You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2018/08/27 16:26:54 UTC
[34/51] [partial] incubator-airflow-site git commit: 1.10.0
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/11437c14/_modules/airflow/contrib/operators/databricks_operator.html
----------------------------------------------------------------------
diff --git a/_modules/airflow/contrib/operators/databricks_operator.html b/_modules/airflow/contrib/operators/databricks_operator.html
index cb38349..c39f769 100644
--- a/_modules/airflow/contrib/operators/databricks_operator.html
+++ b/_modules/airflow/contrib/operators/databricks_operator.html
@@ -91,7 +91,7 @@
<li class="toctree-l1"><a class="reference internal" href="../../../../start.html">Quick Start</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../tutorial.html">Tutorial</a></li>
-<li class="toctree-l1"><a class="reference internal" href="../../../../configuration.html">Configuration</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../howto/index.html">How-to Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../ui.html">UI / Screenshots</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../concepts.html">Concepts</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../profiling.html">Data Profiling</a></li>
@@ -99,8 +99,10 @@
<li class="toctree-l1"><a class="reference internal" href="../../../../scheduler.html">Scheduling & Triggers</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../plugins.html">Plugins</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../security.html">Security</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../timezone.html">Time zones</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../api.html">Experimental Rest API</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../integration.html">Integration</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../lineage.html">Lineage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../faq.html">FAQ</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../code.html">API Reference</a></li>
</ul>
@@ -169,17 +171,22 @@
<h1>Source code for airflow.contrib.operators.databricks_operator</h1><div class="highlight"><pre>
<span></span><span class="c1"># -*- coding: utf-8 -*-</span>
<span class="c1">#</span>
-<span class="c1"># Licensed under the Apache License, Version 2.0 (the "License");</span>
-<span class="c1"># you may not use this file except in compliance with the License.</span>
-<span class="c1"># You may obtain a copy of the License at</span>
-<span class="c1">#</span>
-<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
-<span class="c1">#</span>
-<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
-<span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span>
-<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
-<span class="c1"># See the License for the specific language governing permissions and</span>
-<span class="c1"># limitations under the License.</span>
+<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
+<span class="c1"># or more contributor license agreements. See the NOTICE file</span>
+<span class="c1"># distributed with this work for additional information</span>
+<span class="c1"># regarding copyright ownership. The ASF licenses this file</span>
+<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
+<span class="c1"># "License"); you may not use this file except in compliance</span>
+<span class="c1"># with the License. You may obtain a copy of the License at</span>
+<span class="c1"># </span>
+<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
+<span class="c1"># </span>
+<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
+<span class="c1"># software distributed under the License is distributed on an</span>
+<span class="c1"># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
+<span class="c1"># KIND, either express or implied. See the License for the</span>
+<span class="c1"># specific language governing permissions and limitations</span>
+<span class="c1"># under the License.</span>
<span class="c1">#</span>
<span class="kn">import</span> <span class="nn">six</span>
@@ -190,6 +197,10 @@
<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span>
+<span class="n">XCOM_RUN_ID_KEY</span> <span class="o">=</span> <span class="s1">'run_id'</span>
+<span class="n">XCOM_RUN_PAGE_URL_KEY</span> <span class="o">=</span> <span class="s1">'run_page_url'</span>
+
+
<div class="viewcode-block" id="DatabricksSubmitRunOperator"><a class="viewcode-back" href="../../../../integration.html#airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator">[docs]</a><span class="k">class</span> <span class="nc">DatabricksSubmitRunOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> Submits a Spark job run to Databricks using the</span>
@@ -307,6 +318,8 @@
<span class="sd"> :param databricks_retry_limit: Amount of times retry if the Databricks backend is</span>
<span class="sd"> unreachable. Its value must be greater than or equal to 1.</span>
<span class="sd"> :type databricks_retry_limit: int</span>
+<span class="sd"> :param do_xcom_push: Whether we should push run_id and run_page_url to xcom.</span>
+<span class="sd"> :type do_xcom_push: boolean</span>
<span class="sd"> """</span>
<span class="c1"># Used in airflow.models.BaseOperator</span>
<span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'json'</span><span class="p">,)</span>
@@ -327,6 +340,7 @@
<span class="n">databricks_conn_id</span><span class="o">=</span><span class="s1">'databricks_default'</span><span class="p">,</span>
<span class="n">polling_period_seconds</span><span class="o">=</span><span class="mi">30</span><span class="p">,</span>
<span class="n">databricks_retry_limit</span><span class="o">=</span><span class="mi">3</span><span class="p">,</span>
+ <span class="n">do_xcom_push</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> Creates a new ``DatabricksSubmitRunOperator``.</span>
@@ -356,6 +370,7 @@
<span class="bp">self</span><span class="o">.</span><span class="n">json</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_deep_string_coerce</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">json</span><span class="p">)</span>
<span class="c1"># This variable will be used in case our task gets killed.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_id</span> <span class="o">=</span> <span class="kc">None</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">do_xcom_push</span> <span class="o">=</span> <span class="n">do_xcom_push</span>
<span class="k">def</span> <span class="nf">_deep_string_coerce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">json_path</span><span class="o">=</span><span class="s1">'json'</span><span class="p">):</span>
<span class="sd">"""</span>
@@ -393,8 +408,12 @@
<span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="n">hook</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_id</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">submit_run</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">json</span><span class="p">)</span>
- <span class="n">run_page_url</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">get_run_page_url</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">do_xcom_push</span><span class="p">:</span>
+ <span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="n">XCOM_RUN_ID_KEY</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Run submitted with run_id: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
+ <span class="n">run_page_url</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">get_run_page_url</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">do_xcom_push</span><span class="p">:</span>
+ <span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="n">XCOM_RUN_PAGE_URL_KEY</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="n">run_page_url</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_log_run_page_url</span><span class="p">(</span><span class="n">run_page_url</span><span class="p">)</span>
<span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
<span class="n">run_state</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">get_run_state</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">run_id</span><span class="p">)</span>
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/11437c14/_modules/airflow/contrib/operators/dataflow_operator.html
----------------------------------------------------------------------
diff --git a/_modules/airflow/contrib/operators/dataflow_operator.html b/_modules/airflow/contrib/operators/dataflow_operator.html
index 6807e24..d5d4041 100644
--- a/_modules/airflow/contrib/operators/dataflow_operator.html
+++ b/_modules/airflow/contrib/operators/dataflow_operator.html
@@ -91,7 +91,7 @@
<li class="toctree-l1"><a class="reference internal" href="../../../../start.html">Quick Start</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../tutorial.html">Tutorial</a></li>
-<li class="toctree-l1"><a class="reference internal" href="../../../../configuration.html">Configuration</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../howto/index.html">How-to Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../ui.html">UI / Screenshots</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../concepts.html">Concepts</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../profiling.html">Data Profiling</a></li>
@@ -99,8 +99,10 @@
<li class="toctree-l1"><a class="reference internal" href="../../../../scheduler.html">Scheduling & Triggers</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../plugins.html">Plugins</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../security.html">Security</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../timezone.html">Time zones</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../api.html">Experimental Rest API</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../integration.html">Integration</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../../../../lineage.html">Lineage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../faq.html">FAQ</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../code.html">API Reference</a></li>
</ul>
@@ -169,25 +171,31 @@
<h1>Source code for airflow.contrib.operators.dataflow_operator</h1><div class="highlight"><pre>
<span></span><span class="c1"># -*- coding: utf-8 -*-</span>
<span class="c1">#</span>
-<span class="c1"># Licensed under the Apache License, Version 2.0 (the "License");</span>
-<span class="c1"># you may not use this file except in compliance with the License.</span>
-<span class="c1"># You may obtain a copy of the License at</span>
-<span class="c1">#</span>
-<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
-<span class="c1">#</span>
-<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
-<span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span>
-<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
-<span class="c1"># See the License for the specific language governing permissions and</span>
-<span class="c1"># limitations under the License.</span>
+<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
+<span class="c1"># or more contributor license agreements. See the NOTICE file</span>
+<span class="c1"># distributed with this work for additional information</span>
+<span class="c1"># regarding copyright ownership. The ASF licenses this file</span>
+<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
+<span class="c1"># "License"); you may not use this file except in compliance</span>
+<span class="c1"># with the License. You may obtain a copy of the License at</span>
+<span class="c1"># </span>
+<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
+<span class="c1"># </span>
+<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
+<span class="c1"># software distributed under the License is distributed on an</span>
+<span class="c1"># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
+<span class="c1"># KIND, either express or implied. See the License for the</span>
+<span class="c1"># specific language governing permissions and limitations</span>
+<span class="c1"># under the License.</span>
-<span class="kn">import</span> <span class="nn">copy</span>
<span class="kn">import</span> <span class="nn">re</span>
<span class="kn">import</span> <span class="nn">uuid</span>
+<span class="kn">import</span> <span class="nn">copy</span>
<span class="kn">from</span> <span class="nn">airflow.contrib.hooks.gcs_hook</span> <span class="k">import</span> <span class="n">GoogleCloudStorageHook</span>
<span class="kn">from</span> <span class="nn">airflow.contrib.hooks.gcp_dataflow_hook</span> <span class="k">import</span> <span class="n">DataFlowHook</span>
<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span>
+<span class="kn">from</span> <span class="nn">airflow.version</span> <span class="k">import</span> <span class="n">version</span>
<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span>
@@ -199,32 +207,35 @@
<span class="sd"> It's a good practice to define dataflow_* parameters in the default_args of the dag</span>
<span class="sd"> like the project, zone and staging location.</span>
-<span class="sd"> ```</span>
-<span class="sd"> default_args = {</span>
-<span class="sd"> 'dataflow_default_options': {</span>
-<span class="sd"> 'project': 'my-gcp-project',</span>
-<span class="sd"> 'zone': 'europe-west1-d',</span>
-<span class="sd"> 'stagingLocation': 'gs://my-staging-bucket/staging/'</span>
-<span class="sd"> }</span>
-<span class="sd"> }</span>
-<span class="sd"> ```</span>
+<span class="sd"> .. code-block:: python</span>
+
+<span class="sd"> default_args = {</span>
+<span class="sd"> 'dataflow_default_options': {</span>
+<span class="sd"> 'project': 'my-gcp-project',</span>
+<span class="sd"> 'zone': 'europe-west1-d',</span>
+<span class="sd"> 'stagingLocation': 'gs://my-staging-bucket/staging/'</span>
+<span class="sd"> }</span>
+<span class="sd"> }</span>
<span class="sd"> You need to pass the path to your dataflow as a file reference with the ``jar``</span>
-<span class="sd"> parameter, the jar needs to be a self executing jar. Use ``options`` to pass on</span>
-<span class="sd"> options to your job.</span>
-
-<span class="sd"> ```</span>
-<span class="sd"> t1 = DataFlowOperation(</span>
-<span class="sd"> task_id='datapflow_example',</span>
-<span class="sd"> jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',</span>
-<span class="sd"> options={</span>
-<span class="sd"> 'autoscalingAlgorithm': 'BASIC',</span>
-<span class="sd"> 'maxNumWorkers': '50',</span>
-<span class="sd"> 'start': '{{ds}}',</span>
-<span class="sd"> 'partitionType': 'DAY'</span>
-<span class="sd"> },</span>
-<span class="sd"> dag=my-dag)</span>
-<span class="sd"> ```</span>
+<span class="sd"> parameter, the jar needs to be a self executing jar (see documentation here:</span>
+<span class="sd"> https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar).</span>
+<span class="sd"> Use ``options`` to pass on options to your job.</span>
+
+<span class="sd"> .. code-block:: python</span>
+
+<span class="sd"> t1 = DataFlowJavaOperator(</span>
+<span class="sd"> task_id='dataflow_example',</span>
+<span class="sd"> jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',</span>
+<span class="sd"> options={</span>
+<span class="sd"> 'autoscalingAlgorithm': 'BASIC',</span>
+<span class="sd"> 'maxNumWorkers': '50',</span>
+<span class="sd"> 'start': '{{ds}}',</span>
+<span class="sd"> 'partitionType': 'DAY',</span>
+<span class="sd"> 'labels': {'foo' : 'bar'}</span>
+<span class="sd"> },</span>
+<span class="sd"> gcp_conn_id='gcp-airflow-service-account',</span>
+<span class="sd"> dag=my-dag)</span>
<span class="sd"> Both ``jar`` and ``options`` are templated so you can use variables in them.</span>
<span class="sd"> """</span>
@@ -239,6 +250,8 @@
<span class="n">options</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">'google_cloud_default'</span><span class="p">,</span>
<span class="n">delegate_to</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+ <span class="n">poll_sleep</span><span class="o">=</span><span class="mi">10</span><span class="p">,</span>
+ <span class="n">job_class</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">"""</span>
@@ -248,9 +261,10 @@
<span class="sd"> high-level options, for instances, project and zone information, which</span>
<span class="sd"> apply to all dataflow operators in the DAG.</span>
-<span class="sd"> For more detail on job submission have a look at the reference:</span>
-<span class="sd"> https://cloud.google.com/dataflow/pipelines/specifying-exec-params</span>
+<span class="sd"> .. seealso::</span>
+<span class="sd"> For more detail on job submission have a look at the reference:</span>
+<span class="sd"> https://cloud.google.com/dataflow/pipelines/specifying-exec-params</span>
<span class="sd"> :param jar: The reference to a self executing DataFlow jar.</span>
<span class="sd"> :type jar: string</span>
@@ -265,29 +279,147 @@
<span class="sd"> For this to work, the service account making the request must have</span>
<span class="sd"> domain-wide delegation enabled.</span>
<span class="sd"> :type delegate_to: string</span>
+<span class="sd"> :param poll_sleep: The time in seconds to sleep between polling Google</span>
+<span class="sd"> Cloud Platform for the dataflow job status while the job is in the</span>
+<span class="sd"> JOB_STATE_RUNNING state.</span>
+<span class="sd"> :type poll_sleep: int</span>
+<span class="sd"> :param job_class: The name of the dataflow job class to be executed, it</span>
+<span class="sd"> is often not the main class configured in the dataflow jar file.</span>
+<span class="sd"> :type job_class: string</span>
<span class="sd"> """</span>
<span class="nb">super</span><span class="p">(</span><span class="n">DataFlowJavaOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="n">options</span> <span class="o">=</span> <span class="n">options</span> <span class="ow">or</span> <span class="p">{}</span>
-
+ <span class="n">options</span><span class="o">.</span><span class="n">setdefault</span><span class="p">(</span><span class="s1">'labels'</span><span class="p">,</span> <span class="p">{})</span><span class="o">.</span><span class="n">update</span><span class="p">(</span>
+ <span class="p">{</span><span class="s1">'airflow-version'</span><span class="p">:</span> <span class="s1">'v'</span> <span class="o">+</span> <span class="n">version</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'.'</span><span class="p">,</span> <span class="s1">'-'</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'+'</span><span class="p">,</span> <span class="s1">'-'</span><span class="p">)})</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span> <span class="o">=</span> <span class="n">gcp_conn_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span> <span class="o">=</span> <span class="n">delegate_to</span>
<span class="bp">self</span><span class="o">.</span><span class="n">jar</span> <span class="o">=</span> <span class="n">jar</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span> <span class="o">=</span> <span class="n">poll_sleep</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">job_class</span> <span class="o">=</span> <span class="n">job_class</span>
<span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="n">bucket_helper</span> <span class="o">=</span> <span class="n">GoogleCloudBucketHelper</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">jar</span> <span class="o">=</span> <span class="n">bucket_helper</span><span class="o">.</span><span class="n">google_cloud_to_local</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">jar</span><span class="p">)</span>
<span class="n">hook</span> <span class="o">=</span> <span class="n">DataFlowHook</span><span class="p">(</span><span class="n">gcp_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span>
- <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">)</span>
+ <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">,</span>
+ <span class="n">poll_sleep</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span><span class="p">)</span>
<span class="n">dataflow_options</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span><span class="p">)</span>
<span class="n">dataflow_options</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">)</span>
- <span class="n">hook</span><span class="o">.</span><span class="n">start_java_dataflow</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">dataflow_options</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">jar</span><span class="p">)</span></div>
+ <span class="n">hook</span><span class="o">.</span><span class="n">start_java_dataflow</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">dataflow_options</span><span class="p">,</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">jar</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">job_class</span><span class="p">)</span></div>
+
+
+<div class="viewcode-block" id="DataflowTemplateOperator"><a class="viewcode-back" href="../../../../integration.html#airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator">[docs]</a><span class="k">class</span> <span class="nc">DataflowTemplateOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
+ <span class="sd">"""</span>
+<span class="sd"> Start a Templated Cloud DataFlow batch job. The parameters of the operation</span>
+<span class="sd"> will be passed to the job.</span>
+<span class="sd"> It's a good practice to define dataflow_* parameters in the default_args of the dag</span>
+<span class="sd"> like the project, zone and staging location.</span>
+
+<span class="sd"> .. seealso::</span>
+<span class="sd"> https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters</span>
+<span class="sd"> https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment</span>
+
+<span class="sd"> .. code-block:: python</span>
+
+<span class="sd"> default_args = {</span>
+<span class="sd"> 'dataflow_default_options': {</span>
+<span class="sd"> 'project': 'my-gcp-project',</span>
+<span class="sd"> 'zone': 'europe-west1-d',</span>
+<span class="sd"> 'tempLocation': 'gs://my-staging-bucket/staging/'</span>
+<span class="sd"> }</span>
+<span class="sd"> }</span>
+<span class="sd"> }</span>
+
+<span class="sd"> You need to pass the path to your dataflow template as a file reference with the</span>
+<span class="sd"> ``template`` parameter. Use ``parameters`` to pass on parameters to your job.</span>
+<span class="sd"> Use ``environment`` to pass on runtime environment variables to your job.</span>
+
+<span class="sd"> .. code-block:: python</span>
+
+<span class="sd"> t1 = DataflowTemplateOperator(</span>
+<span class="sd"> task_id='dataflow_example',</span>
+<span class="sd"> template='{{var.value.gcp_dataflow_base}}',</span>
+<span class="sd"> parameters={</span>
+<span class="sd"> 'inputFile': "gs://bucket/input/my_input.txt",</span>
+<span class="sd"> 'outputFile': "gs://bucket/output/my_output.txt"</span>
+<span class="sd"> },</span>
+<span class="sd"> gcp_conn_id='gcp-airflow-service-account',</span>
+<span class="sd"> dag=my-dag)</span>
+
+<span class="sd"> ``template``, ``dataflow_default_options`` and ``parameters`` are templated so you can</span>
+<span class="sd"> use variables in them.</span>
+<span class="sd"> """</span>
+ <span class="n">template_fields</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'parameters'</span><span class="p">,</span> <span class="s1">'dataflow_default_options'</span><span class="p">,</span> <span class="s1">'template'</span><span class="p">]</span>
+ <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#0273d4'</span>
+
+ <span class="nd">@apply_defaults</span>
+ <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
+ <span class="bp">self</span><span class="p">,</span>
+ <span class="n">template</span><span class="p">,</span>
+ <span class="n">dataflow_default_options</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+ <span class="n">parameters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+ <span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">'google_cloud_default'</span><span class="p">,</span>
+ <span class="n">delegate_to</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+ <span class="n">poll_sleep</span><span class="o">=</span><span class="mi">10</span><span class="p">,</span>
+ <span class="o">*</span><span class="n">args</span><span class="p">,</span>
+ <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
+ <span class="sd">"""</span>
+<span class="sd"> Create a new DataflowTemplateOperator. Note that</span>
+<span class="sd"> dataflow_default_options is expected to save high-level options</span>
+<span class="sd"> for project information, which apply to all dataflow operators in the DAG.</span>
+
+<span class="sd"> .. seealso::</span>
+<span class="sd"> https://cloud.google.com/dataflow/docs/reference/rest/v1b3</span>
+<span class="sd"> /LaunchTemplateParameters</span>
+<span class="sd"> https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment</span>
+<span class="sd"> For more detail on job template execution have a look at the reference:</span>
+<span class="sd"> https://cloud.google.com/dataflow/docs/templates/executing-templates</span>
+
+<span class="sd"> :param template: The reference to the DataFlow template.</span>
+<span class="sd"> :type template: string</span>
+<span class="sd"> :param dataflow_default_options: Map of default job environment options.</span>
+<span class="sd"> :type dataflow_default_options: dict</span>
+<span class="sd"> :param parameters: Map of job specific parameters for the template.</span>
+<span class="sd"> :type parameters: dict</span>
+<span class="sd"> :param gcp_conn_id: The connection ID to use connecting to Google Cloud</span>
+<span class="sd"> Platform.</span>
+<span class="sd"> :type gcp_conn_id: string</span>
+<span class="sd"> :param delegate_to: The account to impersonate, if any.</span>
+<span class="sd"> For this to work, the service account making the request must have</span>
+<span class="sd"> domain-wide delegation enabled.</span>
+<span class="sd"> :type delegate_to: string</span>
+<span class="sd"> :param poll_sleep: The time in seconds to sleep between polling Google</span>
+<span class="sd"> Cloud Platform for the dataflow job status while the job is in the</span>
+<span class="sd"> JOB_STATE_RUNNING state.</span>
+<span class="sd"> :type poll_sleep: int</span>
+<span class="sd"> """</span>
+ <span class="nb">super</span><span class="p">(</span><span class="n">DataflowTemplateOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+
+ <span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span> <span class="ow">or</span> <span class="p">{}</span>
+ <span class="n">parameters</span> <span class="o">=</span> <span class="n">parameters</span> <span class="ow">or</span> <span class="p">{}</span>
+
+ <span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span> <span class="o">=</span> <span class="n">gcp_conn_id</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span> <span class="o">=</span> <span class="n">delegate_to</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span> <span class="o">=</span> <span class="n">poll_sleep</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">template</span> <span class="o">=</span> <span class="n">template</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">parameters</span> <span class="o">=</span> <span class="n">parameters</span>
+
+ <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
+ <span class="n">hook</span> <span class="o">=</span> <span class="n">DataFlowHook</span><span class="p">(</span><span class="n">gcp_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span>
+ <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">,</span>
+ <span class="n">poll_sleep</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span><span class="p">)</span>
+
+ <span class="n">hook</span><span class="o">.</span><span class="n">start_template_dataflow</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span><span class="p">,</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">parameters</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">template</span><span class="p">)</span></div>
<div class="viewcode-block" id="DataFlowPythonOperator"><a class="viewcode-back" href="../../../../integration.html#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator">[docs]</a><span class="k">class</span> <span class="nc">DataFlowPythonOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
@@ -303,6 +435,7 @@
<span class="n">options</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">gcp_conn_id</span><span class="o">=</span><span class="s1">'google_cloud_default'</span><span class="p">,</span>
<span class="n">delegate_to</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+ <span class="n">poll_sleep</span><span class="o">=</span><span class="mi">10</span><span class="p">,</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="sd">"""</span>
@@ -312,12 +445,12 @@
<span class="sd"> high-level options, for instances, project and zone information, which</span>
<span class="sd"> apply to all dataflow operators in the DAG.</span>
-<span class="sd"> For more detail on job submission have a look at the reference:</span>
+<span class="sd"> .. seealso::</span>
+<span class="sd"> For more detail on job submission have a look at the reference:</span>
+<span class="sd"> https://cloud.google.com/dataflow/pipelines/specifying-exec-params</span>
-<span class="sd"> https://cloud.google.com/dataflow/pipelines/specifying-exec-params</span>
-
-<span class="sd"> :param py_file: Reference to the python dataflow pipleline file, e.g.,</span>
-<span class="sd"> /some/local/file/path/to/your/python/pipeline/file.py.</span>
+<span class="sd"> :param py_file: Reference to the python dataflow pipeline file, e.g.,</span>
+<span class="sd"> /some/local/file/path/to/your/python/pipeline/file.py.</span>
<span class="sd"> :type py_file: string</span>
<span class="sd"> :param py_options: Additional python options.</span>
<span class="sd"> :type pyt_options: list of strings, e.g., ["-m", "-v"].</span>
@@ -332,6 +465,10 @@
<span class="sd"> For this to work, the service account making the request must have</span>
<span class="sd"> domain-wide delegation enabled.</span>
<span class="sd"> :type delegate_to: string</span>
+<span class="sd"> :param poll_sleep: The time in seconds to sleep between polling Google</span>
+<span class="sd"> Cloud Platform for the dataflow job status while the job is in the</span>
+<span class="sd"> JOB_STATE_RUNNING state.</span>
+<span class="sd"> :type poll_sleep: int</span>
<span class="sd"> """</span>
<span class="nb">super</span><span class="p">(</span><span class="n">DataFlowPythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
@@ -339,16 +476,20 @@
<span class="bp">self</span><span class="o">.</span><span class="n">py_options</span> <span class="o">=</span> <span class="n">py_options</span> <span class="ow">or</span> <span class="p">[]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span> <span class="o">=</span> <span class="n">dataflow_default_options</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">options</span> <span class="o">=</span> <span class="n">options</span> <span class="ow">or</span> <span class="p">{}</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="o">.</span><span class="n">setdefault</span><span class="p">(</span><span class="s1">'labels'</span><span class="p">,</span> <span class="p">{})</span><span class="o">.</span><span class="n">update</span><span class="p">(</span>
+ <span class="p">{</span><span class="s1">'airflow-version'</span><span class="p">:</span> <span class="s1">'v'</span> <span class="o">+</span> <span class="n">version</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'.'</span><span class="p">,</span> <span class="s1">'-'</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'+'</span><span class="p">,</span> <span class="s1">'-'</span><span class="p">)})</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span> <span class="o">=</span> <span class="n">gcp_conn_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span> <span class="o">=</span> <span class="n">delegate_to</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span> <span class="o">=</span> <span class="n">poll_sleep</span>
- <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
+<div class="viewcode-block" id="DataFlowPythonOperator.execute"><a class="viewcode-back" href="../../../../integration.html#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator.execute">[docs]</a> <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="sd">"""Execute the python dataflow job."""</span>
<span class="n">bucket_helper</span> <span class="o">=</span> <span class="n">GoogleCloudBucketHelper</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">py_file</span> <span class="o">=</span> <span class="n">bucket_helper</span><span class="o">.</span><span class="n">google_cloud_to_local</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">py_file</span><span class="p">)</span>
<span class="n">hook</span> <span class="o">=</span> <span class="n">DataFlowHook</span><span class="p">(</span><span class="n">gcp_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">gcp_conn_id</span><span class="p">,</span>
- <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">)</span>
+ <span class="n">delegate_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delegate_to</span><span class="p">,</span>
+ <span class="n">poll_sleep</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">poll_sleep</span><span class="p">)</span>
<span class="n">dataflow_options</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dataflow_default_options</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">dataflow_options</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">options</span><span class="p">)</span>
<span class="c1"># Convert argument names from lowerCamelCase to snake case.</span>
@@ -358,7 +499,7 @@
<span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">dataflow_options</span><span class="p">}</span>
<span class="n">hook</span><span class="o">.</span><span class="n">start_python_dataflow</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">formatted_options</span><span class="p">,</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">py_file</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">py_options</span><span class="p">)</span></div>
+ <span class="bp">self</span><span class="o">.</span><span class="n">py_file</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">py_options</span><span class="p">)</span></div></div>
<span class="k">class</span> <span class="nc">GoogleCloudBucketHelper</span><span class="p">():</span>