You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2018/08/27 16:26:54 UTC

[34/51] [partial] incubator-airflow-site git commit: 1.10.0

http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/11437c14/_modules/airflow/contrib/operators/databricks_operator.html
----------------------------------------------------------------------
diff --git a/_modules/airflow/contrib/operators/databricks_operator.html b/_modules/airflow/contrib/operators/databricks_operator.html
index cb38349..c39f769 100644
--- a/_modules/airflow/contrib/operators/databricks_operator.html
+++ b/_modules/airflow/contrib/operators/databricks_operator.html
@@ -91,7 +91,7 @@
 <li class="toctree-l1"><a class="reference internal" href="../../../../start.html">Quick Start</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../installation.html">Installation</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../tutorial.html">Tutorial</a></li>
-<li class="toctree-l1"><a class="reference internal" href="../../../../configuration.html">Configuration</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../howto/index.html">How-to Guides</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../ui.html">UI / Screenshots</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../concepts.html">Concepts</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../profiling.html">Data Profiling</a></li>
@@ -99,8 +99,10 @@
 <li class="toctree-l1"><a class="reference internal" href="../../../../scheduler.html">Scheduling &amp; Triggers</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../plugins.html">Plugins</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../security.html">Security</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../timezone.html">Time zones</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../api.html">Experimental Rest API</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../integration.html">Integration</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../lineage.html">Lineage</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../faq.html">FAQ</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../code.html">API Reference</a></li>
 </ul>
@@ -169,17 +171,22 @@
   <h1>Source code for airflow.contrib.operators.databricks_operator</h1><div class="highlight"><pre>
 <span></span><span class="c1"># -*- coding: utf-8 -*-</span>
 <span class="c1">#</span>
-<span class="c1"># Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);</span>
-<span class="c1"># you may not use this file except in compliance with the License.</span>
-<span class="c1"># You may obtain a copy of the License at</span>
-<span class="c1">#</span>
-<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
-<span class="c1">#</span>
-<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
-<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
-<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
-<span class="c1"># See the License for the specific language governing permissions and</span>
-<span class="c1"># limitations under the License.</span>
+<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
+<span class="c1"># or more contributor license agreements.  See the NOTICE file</span>
+<span class="c1"># distributed with this work for additional information</span>
+<span class="c1"># regarding copyright ownership.  The ASF licenses this file</span>
+<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
+<span class="c1"># &quot;License&quot;); you may not use this file except in compliance</span>
+<span class="c1"># with the License.  You may obtain a copy of the License at</span>
+<span class="c1"># </span>
+<span class="c1">#   http://www.apache.org/licenses/LICENSE-2.0</span>
+<span class="c1"># </span>
+<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
+<span class="c1"># software distributed under the License is distributed on an</span>
+<span class="c1"># &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
+<span class="c1"># KIND, either express or implied.  See the License for the</span>
+<span class="c1"># specific language governing permissions and limitations</span>
+<span class="c1"># under the License.</span>
 <span class="c1">#</span>
 
 <span class="kn">import</span> <span class="nn">six</span>
@@ -190,6 +197,10 @@
 <span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span>
 
 
+<span class="n">XCOM_RUN_ID_KEY</span> <span class="o">=</span> <span class="s1">&#39;run_id&#39;</span>
+<span class="n">XCOM_RUN_PAGE_URL_KEY</span> <span class="o">=</span> <span class="s1">&#39;run_page_url&#39;</span>
+
+
 <div class="viewcode-block" id="DatabricksSubmitRunOperator"><a class="viewcode-back" href="../../../../integration.html#airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator">[docs]</a><span class="k">class</span> <span class="nc">DatabricksSubmitRunOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    Submits a Spark job run to Databricks using the</span>
@@ -307,6 +318,8 @@
 <span class="sd">    :param databricks_retry_limit: Number of times to retry if the Databricks backend is</span>
 <span class="sd">        unreachable. Its value must be greater than or equal to 1.</span>
 <span class="sd">    :type databricks_retry_limit: int</span>
+<span class="sd">    :param do_xcom_push: Whether we should push run_id and run_page_url to xcom.</span>
+<span class="sd">    :type do_xcom_push: boolean</span>
 <span class="sd">    &quot;&quot;&quot;</span>
     <span class="c1"># Used in airflow.models.BaseOperator</span>
     <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">&#39;json&#39;</span><span class="p">,)</span>
@@ -327,6 +340,7 @@
             <span class="n">databricks_conn_id</span><span class="o">=</span><span class="s1">&#39;databricks_default&#39;</span><span class="p">,</span>
             <span class="n">polling_period_seconds</span><span class="o">=</span><span class="mi">30</span><span class="p">,</span>
             <span class="n">databricks_retry_limit</span><span class="o">=</span><span class="mi">3</span><span class="p">,</span>
+            <span class="n">do_xcom_push</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
             <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">        Creates a new ``DatabricksSubmitRunOperator``.</span>
@@ -356,6 +370,7 @@
         <span class="bp">self</span><span class="o">.</span><span class="n">json</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_deep_string_coerce</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">json</span><span class="p">)</span>
         <span class="c1"># This variable will be used in case our task gets killed.</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">run_id</span> <span class="o">=</span> <span class="kc">None</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">do_xcom_push</span> <span class="o">=</span> <span class="n">do_xcom_push</span>
 
     <span class="k">def</span> <span class="nf">_deep_string_coerce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">json_path</span><span class="o">=</span><span class="s1">&#39;json&#39;</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;</span>
@@ -393,8 +408,12 @@
     <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
         <span class="n">hook</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">run_id</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">submit_run</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">json</span><span class="p">)</span>
-        <span class="n">run_page_url</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">get_run_page_url</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">do_xcom_push</span><span class="p">:</span>
+            <span class="n">context</span><span class="p">[</span><span class="s1">&#39;ti&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="n">XCOM_RUN_ID_KEY</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Run submitted with run_id: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
+        <span class="n">run_page_url</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">get_run_page_url</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">do_xcom_push</span><span class="p">:</span>
+            <span class="n">context</span><span class="p">[</span><span class="s1">&#39;ti&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="n">XCOM_RUN_PAGE_URL_KEY</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="n">run_page_url</span><span class="p">)</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">_log_run_page_url</span><span class="p">(</span><span class="n">run_page_url</span><span class="p">)</span>
         <span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
             <span class="n">run_state</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">get_run_state</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>

http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/11437c14/_modules/airflow/contrib/operators/dataflow_operator.html
----------------------------------------------------------------------
diff --git a/_modules/airflow/contrib/operators/dataflow_operator.html b/_modules/airflow/contrib/operators/dataflow_operator.html
index 6807e24..d5d4041 100644
--- a/_modules/airflow/contrib/operators/dataflow_operator.html
+++ b/_modules/airflow/contrib/operators/dataflow_operator.html
@@ -91,7 +91,7 @@
 <li class="toctree-l1"><a class="reference internal" href="../../../../start.html">Quick Start</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../installation.html">Installation</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../tutorial.html">Tutorial</a></li>
-<li class="toctree-l1"><a class="reference internal" href="../../../../configuration.html">Configuration</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../howto/index.html">How-to Guides</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../ui.html">UI / Screenshots</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../concepts.html">Concepts</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../profiling.html">Data Profiling</a></li>
@@ -99,8 +99,10 @@
 <li class="toctree-l1"><a class="reference internal" href="../../../../scheduler.html">Scheduling &amp; Triggers</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../plugins.html">Plugins</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../security.html">Security</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../timezone.html">Time zones</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../api.html">Experimental Rest API</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../integration.html">Integration</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../lineage.html">Lineage</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../faq.html">FAQ</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../../../code.html">API Reference</a></li>
 </ul>
@@ -169,25 +171,31 @@
   <h1>Source code for airflow.contrib.operators.dataflow_operator</h1><div class="highlight"><pre>
 <span></span><span class="c1"># -*- coding: utf-8 -*-</span>
 <span class="c1">#</span>
-<span class="c1"># Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);</span>
-<span class="c1"># you may not use this file except in compliance with the License.</span>
-<span class="c1"># You may obtain a copy of the License at</span>
-<span class="c1">#</span>
-<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
-<span class="c1">#</span>
-<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
-<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
-<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
-<span class="c1"># See the License for the specific language governing permissions and</span>
-<span class="c1"># limitations under the License.</span>
+<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
+<span class="c1"># or more contributor license agreements.  See the NOTICE file</span>
+<span class="c1"># distributed with this work for additional information</span>
+<span class="c1"># regarding copyright ownership.  The ASF licenses this file</span>
+<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
+<span class="c1"># &quot;License&quot;); you may not use this file except in compliance</span>
+<span class="c1"># with the License.  You may obtain a copy of the License at</span>
+<span class="c1"># </span>
+<span class="c1">#   http://www.apache.org/licenses/LICENSE-2.0</span>
+<span class="c1"># </span>
+<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
+<span class="c1"># software distributed under the License is distributed on an</span>
+<span class="c1"># &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
+<span class="c1"># KIND, either express or implied.  See the License for the</span>
+<span class="c1"># specific language governing permissions and limitations</span>
+<span class="c1"># under the License.</span>
 
-<span class="kn">import</span> <span class="nn">copy</span>
 <span class="kn">import</span> <span class="nn">re</span>
 <span class="kn">import</span> <span class="nn">uuid</span>
+<span class="kn">import</span> <span class="nn">copy</span>
 
 <span class="kn">from</span> <span class="nn">airflow.contrib.hooks.gcs_hook</span> <span class="k">import</span> <span class="n">GoogleCloudStorageHook</span>
 <span class="kn">from</span> <span class="nn">airflow.contrib.hooks.gcp_dataflow_hook</span> <span class="k">import</span> <span class="n">DataFlowHook</span>
 <span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span>
+<span class="kn">from</span> <span class="nn">airflow.version</span> <span class="k">import</span> <span class="n">version</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span>
 
 
@@ -199,32 +207,35 @@
 <span class="sd">    It&#39;s a good practice to define dataflow_* parameters in the default_args of the dag</span>
 <span class="sd">    like the project, zone and staging location.</span>
 
-<span class="sd">    ```</span>
-<span class="sd">    default_args = {</span>
-<span class="sd">        &#39;dataflow_default_options&#39;: {</span>
-<span class="sd">            &#39;project&#39;: &#39;my-gcp-project&#39;,</span>
-<span class="sd">            &#39;zone&#39;: &#39;europe-west1-d&#39;,</span>
-<span class="sd">            &#39;stagingLocation&#39;: &#39;gs://my-staging-bucket/staging/&#39;</span>
-<span class="sd">        }</span>
-<span class="sd">    }</span>
-<span class="sd">    ```</span>
+<span class="sd">    .. code-block:: python</span>
+
+<span class="sd">       default_args = {</span>
+<span class="sd">           &#39;dataflow_default_options&#39;: {</span>
+<span class="sd">               &#39;project&#39;: &#39;my-gcp-project&#39;,</span>
+<span class="sd">               &#39;zone&#39;: &#39;europe-west1-d&#39;,</span>
+<span class="sd">               &#39;stagingLocation&#39;: &#39;gs://my-staging-bucket/staging/&#39;</span>
+<span class="sd">           }</span>
+<span class="sd">       }</span>
 
 <span class="sd">    You need to pass the path to your dataflow as a file reference with the ``jar``</span>
-<span class="sd">    parameter, the jar needs to be a self executing jar. Use ``options`` to pass on</span>
-<span class="sd">    options to your job.</span>
-
-<span class="sd">    ```</span>
-<span class="sd">    t1 = DataFlowOperation(</span>
-<span class="sd">        task_id=&#39;datapflow_example&#39;,</span>
-<span class="sd">        jar=&#39;{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar&#39;,</span>
-<span class="sd">        options={</span>
-<span class="sd">            &#39;autoscalingAlgorithm&#39;: &#39;BASIC&#39;,</span>
-<span class="sd">            &#39;maxNumWorkers&#39;: &#39;50&#39;,</span>
-<span class="sd">            &#39;start&#39;: &#39;{{ds}}&#39;,</span>
-<span class="sd">            &#39;partitionType&#39;: &#39;DAY&#39;</span>
-<span class="sd">        },</span>
-<span class="sd">        dag=my-dag)</span>
-<span class="sd">    ```</span>
+<span class="sd">    parameter, the jar needs to be a self executing jar (see documentation here:</span>
+<span class="sd">    https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar).</span>
+<span class="sd">    Use ``options`` to pass on options to your job.</span>
+
+<span class="sd">    .. code-block:: python</span>
+
+<span class="sd">       t1 = DataFlowJavaOperator(</span>
+<span class="sd">           task_id=&#39;dataflow_example&#39;,</span>
+<span class="sd">           jar=&#39;{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar&#39;,</span>
+<span class="sd">           options={</span>
+<span class="sd">               &#39;autoscalingAlgorithm&#39;: &#39;BASIC&#39;,</span>
+<span class="sd">               &#39;maxNumWorkers&#39;: &#39;50&#39;,</span>
+<span class="sd">               &#39;start&#39;: &#39;{{ds}}&#39;,</span>
+<span class="sd">               &#39;partitionType&#39;: &#39;DAY&#39;,</span>
+<span class="sd">               &#39;labels&#39;: {&#39;foo&#39; : &#39;bar&#39;}</span>
+<span class="sd">           },</span>
+<span class="sd">           gcp_conn_id=&#39;gcp-airflow-service-account&#39;,</span>
+<span class="sd">           dag=my-dag)</span>
 
 <span class="sd">    Both ``jar`` and ``options`` are templated so you can use variables in them.</span>
 <span class="sd">    &quot;&quot;&quot;</span>
@@ -239,6 +250,8 @@
             <span class="n">options</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
             <span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">&#39;google_cloud_default&#39;</span><span class="p">,</span>
             <span class="n">delegate_to</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+            <span class="n">poll_sleep</span><span class="o">=</span><span class="mi">10</span><span class="p">,</span>
+            <span class="n">job_class</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
             <span class="o">*</span><span class="n">args</span><span class="p">,</span>
             <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;</span>
@@ -248,9 +261,10 @@
 <span class="sd">        high-level options, for instances, project and zone information, which</span>
 <span class="sd">        apply to all dataflow operators in the DAG.</span>
 
-<span class="sd">        For more detail on job submission have a look at the reference:</span>
 
-<span class="sd">        https://cloud.google.com/dataflow/pipelines/specifying-exec-params</span>
+<span class="sd">        .. seealso::</span>
+<span class="sd">            For more detail on job submission have a look at the reference:</span>
+<span class="sd">            https://cloud.google.com/dataflow/pipelines/specifying-exec-params</span>
 
 <span class="sd">        :param jar: The reference to a self executing DataFlow jar.</span>
 <span class="sd">        :type jar: string</span>
@@ -265,29 +279,147 @@
 <span class="sd">            For this to work, the service account making the request must have</span>
 <span class="sd">            domain-wide delegation enabled.</span>
 <span class="sd">        :type delegate_to: string</span>
+<span class="sd">        :param poll_sleep: The time in seconds to sleep between polling Google</span>
+<span class="sd">            Cloud Platform for the dataflow job status while the job is in the</span>
+<span class="sd">            JOB_STATE_RUNNING state.</span>
+<span class="sd">        :type poll_sleep: int</span>
+<span class="sd">        :param job_class: The name of the dataflow job class to be executed, it</span>
+<span class="sd">            is often not the main class configured in the dataflow jar file.</span>
+<span class="sd">        :type job_class: string</span>
 <span class="sd">        &quot;&quot;&quot;</span>
         <span class="nb">super</span><span class="p">(</span><span class="n">DataFlowJavaOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
 
         <span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span> <span class="ow">or</span> <span class="p">{}</span>
         <span class="n">options</span> <span class="o">=</span> <span class="n">options</span> <span class="ow">or</span> <span class="p">{}</span>
-
+        <span class="n">options</span><span class="o">.</span><span class="n">setdefault</span><span class="p">(</span><span class="s1">&#39;labels&#39;</span><span class="p">,</span> <span class="p">{})</span><span class="o">.</span><span class="n">update</span><span class="p">(</span>
+            <span class="p">{</span><span class="s1">&#39;airflow-version&#39;</span><span class="p">:</span> <span class="s1">&#39;v&#39;</span> <span class="o">+</span> <span class="n">version</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;.&#39;</span><span class="p">,</span> <span class="s1">&#39;-&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;+&#39;</span><span class="p">,</span> <span class="s1">&#39;-&#39;</span><span class="p">)})</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span> <span class="o">=</span> <span class="n">gcp_conn_id</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span> <span class="o">=</span> <span class="n">delegate_to</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">jar</span> <span class="o">=</span> <span class="n">jar</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span> <span class="o">=</span> <span class="n">poll_sleep</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">job_class</span> <span class="o">=</span> <span class="n">job_class</span>
 
     <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
         <span class="n">bucket_helper</span> <span class="o">=</span> <span class="n">GoogleCloudBucketHelper</span><span class="p">(</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">)</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">jar</span> <span class="o">=</span> <span class="n">bucket_helper</span><span class="o">.</span><span class="n">google_cloud_to_local</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">jar</span><span class="p">)</span>
         <span class="n">hook</span> <span class="o">=</span> <span class="n">DataFlowHook</span><span class="p">(</span><span class="n">gcp_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span>
-                            <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">)</span>
+                            <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">,</span>
+                            <span class="n">poll_sleep</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span><span class="p">)</span>
 
         <span class="n">dataflow_options</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span><span class="p">)</span>
         <span class="n">dataflow_options</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">)</span>
 
-        <span class="n">hook</span><span class="o">.</span><span class="n">start_java_dataflow</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">dataflow_options</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">jar</span><span class="p">)</span></div>
+        <span class="n">hook</span><span class="o">.</span><span class="n">start_java_dataflow</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">dataflow_options</span><span class="p">,</span>
+                                 <span class="bp">self</span><span class="o">.</span><span class="n">jar</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">job_class</span><span class="p">)</span></div>
+
+
+<div class="viewcode-block" id="DataflowTemplateOperator"><a class="viewcode-back" href="../../../../integration.html#airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator">[docs]</a><span class="k">class</span> <span class="nc">DataflowTemplateOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Start a Templated Cloud DataFlow batch job. The parameters of the operation</span>
+<span class="sd">    will be passed to the job.</span>
+<span class="sd">    It&#39;s a good practice to define dataflow_* parameters in the default_args of the dag</span>
+<span class="sd">    like the project, zone and staging location.</span>
+
+<span class="sd">    .. seealso::</span>
+<span class="sd">        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters</span>
+<span class="sd">        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment</span>
+
+<span class="sd">    .. code-block:: python</span>
+
+<span class="sd">       default_args = {</span>
+<span class="sd">           &#39;dataflow_default_options&#39;: {</span>
+<span class="sd">               &#39;project&#39;: &#39;my-gcp-project&#39;,</span>
+<span class="sd">               &#39;zone&#39;: &#39;europe-west1-d&#39;,</span>
+<span class="sd">               &#39;tempLocation&#39;: &#39;gs://my-staging-bucket/staging/&#39;</span>
+<span class="sd">               }</span>
+<span class="sd">           }</span>
+<span class="sd">       }</span>
+
+<span class="sd">    You need to pass the path to your dataflow template as a file reference with the</span>
+<span class="sd">    ``template`` parameter. Use ``parameters`` to pass on parameters to your job.</span>
+<span class="sd">    Use ``environment`` to pass on runtime environment variables to your job.</span>
+
+<span class="sd">    .. code-block:: python</span>
+
+<span class="sd">       t1 = DataflowTemplateOperator(</span>
+<span class="sd">           task_id=&#39;dataflow_example&#39;,</span>
+<span class="sd">           template=&#39;{{var.value.gcp_dataflow_base}}&#39;,</span>
+<span class="sd">           parameters={</span>
+<span class="sd">               &#39;inputFile&#39;: &quot;gs://bucket/input/my_input.txt&quot;,</span>
+<span class="sd">               &#39;outputFile&#39;: &quot;gs://bucket/output/my_output.txt&quot;</span>
+<span class="sd">           },</span>
+<span class="sd">           gcp_conn_id=&#39;gcp-airflow-service-account&#39;,</span>
+<span class="sd">           dag=my_dag)</span>
+
+<span class="sd">    ``template``, ``dataflow_default_options`` and ``parameters`` are templated so you can</span>
+<span class="sd">    use variables in them.</span>
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="n">template_fields</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;parameters&#39;</span><span class="p">,</span> <span class="s1">&#39;dataflow_default_options&#39;</span><span class="p">,</span> <span class="s1">&#39;template&#39;</span><span class="p">]</span>
+    <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">&#39;#0273d4&#39;</span>
+
+    <span class="nd">@apply_defaults</span>
+    <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
+            <span class="bp">self</span><span class="p">,</span>
+            <span class="n">template</span><span class="p">,</span>
+            <span class="n">dataflow_default_options</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+            <span class="n">parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+            <span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">&#39;google_cloud_default&#39;</span><span class="p">,</span>
+            <span class="n">delegate_to</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+            <span class="n">poll_sleep</span><span class="o">=</span><span class="mi">10</span><span class="p">,</span>
+            <span class="o">*</span><span class="n">args</span><span class="p">,</span>
+            <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Create a new DataflowTemplateOperator. Note that</span>
+<span class="sd">        dataflow_default_options is expected to save high-level options</span>
+<span class="sd">        for project information, which apply to all dataflow operators in the DAG.</span>
+
+<span class="sd">        .. seealso::</span>
+<span class="sd">            https://cloud.google.com/dataflow/docs/reference/rest/v1b3</span>
+<span class="sd">            /LaunchTemplateParameters</span>
+<span class="sd">            https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment</span>
+<span class="sd">            For more detail on job template execution have a look at the reference:</span>
+<span class="sd">            https://cloud.google.com/dataflow/docs/templates/executing-templates</span>
+
+<span class="sd">        :param template: The reference to the DataFlow template.</span>
+<span class="sd">        :type template: string</span>
+<span class="sd">        :param dataflow_default_options: Map of default job environment options.</span>
+<span class="sd">        :type dataflow_default_options: dict</span>
+<span class="sd">        :param parameters: Map of job specific parameters for the template.</span>
+<span class="sd">        :type parameters: dict</span>
+<span class="sd">        :param gcp_conn_id: The connection ID to use connecting to Google Cloud</span>
+<span class="sd">            Platform.</span>
+<span class="sd">        :type gcp_conn_id: string</span>
+<span class="sd">        :param delegate_to: The account to impersonate, if any.</span>
+<span class="sd">            For this to work, the service account making the request must have</span>
+<span class="sd">            domain-wide delegation enabled.</span>
+<span class="sd">        :type delegate_to: string</span>
+<span class="sd">        :param poll_sleep: The time in seconds to sleep between polling Google</span>
+<span class="sd">            Cloud Platform for the dataflow job status while the job is in the</span>
+<span class="sd">            JOB_STATE_RUNNING state.</span>
+<span class="sd">        :type poll_sleep: int</span>
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="nb">super</span><span class="p">(</span><span class="n">DataflowTemplateOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+
+        <span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span> <span class="ow">or</span> <span class="p">{}</span>
+        <span class="n">parameters</span> <span class="o">=</span> <span class="n">parameters</span> <span class="ow">or</span> <span class="p">{}</span>
+
+        <span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span> <span class="o">=</span> <span class="n">gcp_conn_id</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span> <span class="o">=</span> <span class="n">delegate_to</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span> <span class="o">=</span> <span class="n">poll_sleep</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">template</span> <span class="o">=</span> <span class="n">template</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">parameters</span> <span class="o">=</span> <span class="n">parameters</span>
+
+    <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
+        <span class="n">hook</span> <span class="o">=</span> <span class="n">DataFlowHook</span><span class="p">(</span><span class="n">gcp_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span>
+                            <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">,</span>
+                            <span class="n">poll_sleep</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span><span class="p">)</span>
+
+        <span class="n">hook</span><span class="o">.</span><span class="n">start_template_dataflow</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span><span class="p">,</span>
+                                     <span class="bp">self</span><span class="o">.</span><span class="n">parameters</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">template</span><span class="p">)</span></div>
 
 
 <div class="viewcode-block" id="DataFlowPythonOperator"><a class="viewcode-back" href="../../../../integration.html#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator">[docs]</a><span class="k">class</span> <span class="nc">DataFlowPythonOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
@@ -303,6 +435,7 @@
             <span class="n">options</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
             <span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">&#39;google_cloud_default&#39;</span><span class="p">,</span>
             <span class="n">delegate_to</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+            <span class="n">poll_sleep</span><span class="o">=</span><span class="mi">10</span><span class="p">,</span>
             <span class="o">*</span><span class="n">args</span><span class="p">,</span>
             <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;</span>
@@ -312,12 +445,12 @@
 <span class="sd">        high-level options, for instance, project and zone information, which</span>
 <span class="sd">        apply to all dataflow operators in the DAG.</span>
 
-<span class="sd">        For more detail on job submission have a look at the reference:</span>
+<span class="sd">        .. seealso::</span>
+<span class="sd">            For more detail on job submission have a look at the reference:</span>
+<span class="sd">            https://cloud.google.com/dataflow/pipelines/specifying-exec-params</span>
 
-<span class="sd">        https://cloud.google.com/dataflow/pipelines/specifying-exec-params</span>
-
-<span class="sd">        :param py_file: Reference to the python dataflow pipleline file, e.g.,</span>
-<span class="sd">            /some/local/file/path/to/your/python/pipeline/file.py.</span>
+<span class="sd">        :param py_file: Reference to the python dataflow pipeline file, e.g.,</span>
+<span class="sd">            /some/local/file/path/to/your/python/pipeline/file.py.</span>
 <span class="sd">        :type py_file: string</span>
 <span class="sd">        :param py_options: Additional python options.</span>
 <span class="sd">        :type py_options: list of strings, e.g., [&quot;-m&quot;, &quot;-v&quot;].</span>
@@ -332,6 +465,10 @@
 <span class="sd">            For this to work, the service account making the request must have</span>
 <span class="sd">            domain-wide  delegation enabled.</span>
 <span class="sd">        :type delegate_to: string</span>
+<span class="sd">        :param poll_sleep: The time in seconds to sleep between polling Google</span>
+<span class="sd">            Cloud Platform for the dataflow job status while the job is in the</span>
+<span class="sd">            JOB_STATE_RUNNING state.</span>
+<span class="sd">        :type poll_sleep: int</span>
 <span class="sd">        &quot;&quot;&quot;</span>
         <span class="nb">super</span><span class="p">(</span><span class="n">DataFlowPythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
 
@@ -339,16 +476,20 @@
         <span class="bp">self</span><span class="o">.</span><span class="n">py_options</span> <span class="o">=</span> <span class="n">py_options</span> <span class="ow">or</span> <span class="p">[]</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span> <span class="ow">or</span> <span class="p">{}</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span> <span class="ow">or</span> <span class="p">{}</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">setdefault</span><span class="p">(</span><span class="s1">&#39;labels&#39;</span><span class="p">,</span> <span class="p">{})</span><span class="o">.</span><span class="n">update</span><span class="p">(</span>
+            <span class="p">{</span><span class="s1">&#39;airflow-version&#39;</span><span class="p">:</span> <span class="s1">&#39;v&#39;</span> <span class="o">+</span> <span class="n">version</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;.&#39;</span><span class="p">,</span> <span class="s1">&#39;-&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;+&#39;</span><span class="p">,</span> <span class="s1">&#39;-&#39;</span><span class="p">)})</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span> <span class="o">=</span> <span class="n">gcp_conn_id</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span> <span class="o">=</span> <span class="n">delegate_to</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span> <span class="o">=</span> <span class="n">poll_sleep</span>
 
-    <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
+<div class="viewcode-block" id="DataFlowPythonOperator.execute"><a class="viewcode-back" href="../../../../integration.html#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator.execute">[docs]</a>    <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;Execute the python dataflow job.&quot;&quot;&quot;</span>
         <span class="n">bucket_helper</span> <span class="o">=</span> <span class="n">GoogleCloudBucketHelper</span><span class="p">(</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">)</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">py_file</span> <span class="o">=</span> <span class="n">bucket_helper</span><span class="o">.</span><span class="n">google_cloud_to_local</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">py_file</span><span class="p">)</span>
         <span class="n">hook</span> <span class="o">=</span> <span class="n">DataFlowHook</span><span class="p">(</span><span class="n">gcp_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span>
-                            <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">)</span>
+                            <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">,</span>
+                            <span class="n">poll_sleep</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span><span class="p">)</span>
         <span class="n">dataflow_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
         <span class="n">dataflow_options</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">)</span>
         <span class="c1"># Convert argument names from lowerCamelCase to snake case.</span>
@@ -358,7 +499,7 @@
                              <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">dataflow_options</span><span class="p">}</span>
         <span class="n">hook</span><span class="o">.</span><span class="n">start_python_dataflow</span><span class="p">(</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">formatted_options</span><span class="p">,</span>
-            <span class="bp">self</span><span class="o">.</span><span class="n">py_file</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">py_options</span><span class="p">)</span></div>
+            <span class="bp">self</span><span class="o">.</span><span class="n">py_file</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">py_options</span><span class="p">)</span></div></div>
 
 
 <span class="k">class</span> <span class="nc">GoogleCloudBucketHelper</span><span class="p">():</span>