Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/19 19:20:00 UTC

[jira] [Commented] (AIRFLOW-2554) Inlets and outlets should be available in templates by their fully_qualified name or name

    [ https://issues.apache.org/jira/browse/AIRFLOW-2554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725287#comment-16725287 ] 

ASF GitHub Bot commented on AIRFLOW-2554:
-----------------------------------------

stale[bot] closed pull request #3453: [AIRFLOW-2554] Enable convenience access to in/outlets in templates
URL: https://github.com/apache/incubator-airflow/pull/3453
 
 
   

As this is a foreign pull request (from a fork) and GitHub hides the original
diff once the PR is closed, it is reproduced below for the sake of provenance:

diff --git a/airflow/models.py b/airflow/models.py
index eda480832b..0f9e1eb131 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1808,6 +1808,10 @@ def get_template_context(self, session=None):
             session.expunge_all()
             session.commit()
 
+        # create convenience names for inlets and outlets
+        inlets = dict((i.qualified_name, i) for i in task.inlets)
+        outlets = dict((o.qualified_name, o) for o in task.outlets)
+
         if task.params:
             params.update(task.params)
 
@@ -1844,7 +1848,9 @@ def __getattr__(self, item):
             def __repr__(self):
                 return str(self.var)
 
-        return {
+        # make sure built-in (dag-level) context entries overwrite colliding inlets/outlets
+        context = dict(inlets, **outlets)
+        context.update({
             'dag': task.dag,
             'ds': ds,
             'next_ds': next_ds,
@@ -1877,9 +1883,11 @@ def __repr__(self):
                 'value': VariableAccessor(),
                 'json': VariableJsonAccessor()
             },
-            'inlets': task.inlets,
-            'outlets': task.outlets,
-        }
+            'inlets': inlets,
+            'outlets': outlets,
+        })
+
+        return context
 
     def overwrite_params_with_dag_run_conf(self, params, dag_run):
         if dag_run and dag_run.conf:
diff --git a/docs/lineage.rst b/docs/lineage.rst
index 719ef0115e..3a9c87e4ff 100644
--- a/docs/lineage.rst
+++ b/docs/lineage.rst
@@ -16,30 +16,30 @@ works.
     from airflow.lineage.datasets import File
     from airflow.models import DAG
     from datetime import timedelta
-    
+
     FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
-    
+
     args = {
         'owner': 'airflow',
         'start_date': airflow.utils.dates.days_ago(2)
     }
-    
+
     dag = DAG(
         dag_id='example_lineage', default_args=args,
         schedule_interval='0 0 * * *',
         dagrun_timeout=timedelta(minutes=60))
-    
+
     f_final = File("/tmp/final")
-    run_this_last = DummyOperator(task_id='run_this_last', dag=dag, 
+    run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
         inlets={"auto": True},
         outlets={"datasets": [f_final,]})
-    
+
     f_in = File("/tmp/whole_directory/")
     outlets = []
     for file in FILE_CATEGORIES:
         f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
         outlets.append(f_out)
-    run_this = BashOperator(    
+    run_this = BashOperator(
         task_id='run_me_first', bash_command='echo 1', dag=dag,
         inlets={"datasets": [f_in,]},
         outlets={"datasets": outlets}
@@ -49,25 +49,39 @@ works.
 
 Tasks take the parameters `inlets` and `outlets`. Inlets can be manually defined by a list of dataset `{"datasets":
 [dataset1, dataset2]}` or can be configured to look for outlets from upstream tasks `{"task_ids": ["task_id1", "task_id2"]}`
-or can be configured to pick up outlets from direct upstream tasks `{"auto": True}` or a combination of them. Outlets 
-are defined as list of dataset `{"datasets": [dataset1, dataset2]}`. Any fields for the dataset are templated with 
-the context when the task is being executed. 
+or can be configured to pick up outlets from direct upstream tasks `{"auto": True}` or a combination of them. Outlets
+are defined as list of dataset `{"datasets": [dataset1, dataset2]}`. Any fields for the dataset are templated with
+the context when the task is being executed.
 
 .. note:: Operators can add inlets and outlets automatically if the operator supports it.
 
-In the example DAG task `run_me_first` is a BashOperator that takes 3 inlets: `CAT1`, `CAT2`, `CAT3`, that are 
+In the example DAG task `run_me_first` is a BashOperator that takes 3 inlets: `CAT1`, `CAT2`, `CAT3`, that are
 generated from a list. Note that `execution_date` is a templated field and will be rendered when the task is running.
 
 .. note:: Behind the scenes Airflow prepares the lineage metadata as part of the `pre_execute` method of a task. When the task
-          has finished execution `post_execute` is called and lineage metadata is pushed into XCOM. Thus if you are creating 
+          has finished execution `post_execute` is called and lineage metadata is pushed into XCOM. Thus if you are creating
           your own operators that override this method make sure to decorate your method with `prepare_lineage` and `apply_lineage`
           respectively.
 
+Templating
+----------
+
+Inlets and outlets are available as template variables by referring to their (qualified) name. If that name collides
+with one of the built-in names (i.e. 'dag') the built-in name takes precedence and the inlet or outlet will only be
+available by using `{{ inlets['name'] }}` in your template.
+
+.. code:: python
+
+    my_file = File(name="my_file")
+    run_this = BashOperator(
+        task_id='run_me_first', bash_command="echo {{ my_file.name }}; echo {{ inlets['my_file'].name }}", dag=dag,
+        inlets={"datasets": [my_file,]}
+        )
 
 Apache Atlas
 ------------
 
-Airflow can send its lineage metadata to Apache Atlas. You need to enable the `atlas` backend and configure it 
+Airflow can send its lineage metadata to Apache Atlas. You need to enable the `atlas` backend and configure it
 properly, e.g. in your `airflow.cfg`:
 
 .. code:: python
@@ -80,6 +94,6 @@ properly, e.g. in your `airflow.cfg`:
     password = my_password
     host = host
     port = 21000
-    
+
 
 Please make sure to have the `atlasclient` package installed.
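
For reference, here is a minimal sketch (not part of the PR) of what the
proposed convenience names would look like from a DAG author's point of view.
The dataset and task names are made up, and it assumes, as the PR's own
documentation example does, that a File's qualified name is simply the string
it was constructed with and that its `name` attribute renders in templates:

    from airflow.lineage.datasets import File
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.dates import days_ago

    # Hypothetical DAG illustrating the convenience names added by this patch.
    dag = DAG(
        dag_id='example_lineage_templating',
        default_args={'owner': 'airflow', 'start_date': days_ago(2)},
        schedule_interval='0 0 * * *')

    # Assumption: the qualified name of this File is simply "my_report".
    my_report = File("my_report")

    run_this = BashOperator(
        task_id='echo_inlet',
        # With the patch applied both forms should render: the top-level
        # convenience name and the 'inlets' dict keyed by qualified name.
        # Built-in context names (e.g. 'dag') still win on collision because
        # context.update() runs after the dict(inlets, **outlets) merge.
        bash_command="echo {{ my_report.name }}; echo {{ inlets['my_report'].name }}",
        inlets={"datasets": [my_report]},
        dag=dag)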

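The docs note about `pre_execute`/`post_execute` also deserves a quick
illustration: a hedged sketch (not from the PR) of an operator that overrides
both hooks and keeps lineage working by re-applying the `prepare_lineage` and
`apply_lineage` decorators from `airflow.lineage`. The operator name and log
messages are invented; only the decorator placement is the point:

    from airflow.lineage import apply_lineage, prepare_lineage
    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults


    class MyLineageAwareOperator(BaseOperator):
        """Hypothetical operator that overrides the lineage hooks."""

        @apply_defaults
        def __init__(self, *args, **kwargs):
            super(MyLineageAwareOperator, self).__init__(*args, **kwargs)

        @prepare_lineage
        def pre_execute(self, context):
            # prepare_lineage resolves self.inlets/self.outlets from the
            # {"datasets"/"task_ids"/"auto"} configuration before this body runs.
            self.log.info("resolved inlets: %s", self.inlets)

        def execute(self, context):
            pass

        @apply_lineage
        def post_execute(self, context, result=None):
            # apply_lineage pushes the lineage metadata to XCom once this returns.
            self.log.info("resolved outlets: %s", self.outlets)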

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Inlets and outlets should be available in templates by their fully_qualified name or name
> ----------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2554
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2554
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Bolke de Bruin
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)