Posted to commits@airflow.apache.org by bo...@apache.org on 2018/05/14 07:09:32 UTC

incubator-airflow git commit: [AIRFLOW-2425] Add lineage support

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 042c3f2ae -> 648e1e693


[AIRFLOW-2425] Add lineage support

Add lineage support by having inlets and outlets that are made
available to dependent upstream or downstream tasks.

If configured to do so, Airflow can send lineage data to a backend.
Apache Atlas is supported out of the box.

Closes #3321 from bolkedebruin/lineage_exp


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/648e1e69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/648e1e69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/648e1e69

Branch: refs/heads/master
Commit: 648e1e69308029102e5251815331c701eb614403
Parents: 042c3f2
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon May 14 09:09:25 2018 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon May 14 09:09:25 2018 +0200

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  11 ++
 airflow/contrib/operators/druid_operator.py  |   4 +-
 airflow/lineage/__init__.py                  | 141 ++++++++++++++++++++++
 airflow/lineage/backend/__init__.py          |  31 +++++
 airflow/lineage/backend/atlas/__init__.py    |  98 +++++++++++++++
 airflow/lineage/backend/atlas/typedefs.py    | 110 +++++++++++++++++
 airflow/lineage/datasets.py                  | 140 +++++++++++++++++++++
 airflow/logging_config.py                    |  16 +--
 airflow/models.py                            |  30 ++++-
 airflow/operators/bash_operator.py           |   5 +-
 airflow/utils/module_loading.py              |  18 ++-
 docs/index.rst                               |   1 +
 docs/lineage.rst                             |  85 +++++++++++++
 scripts/ci/requirements.txt                  |   1 +
 setup.py                                     |   4 +-
 tests/__init__.py                            |   5 +-
 tests/lineage/__init__.py                    |  18 +++
 tests/lineage/backend/__init__.py            |  18 +++
 tests/lineage/backend/test_atlas.py          |  90 ++++++++++++++
 tests/lineage/test_lineage.py                | 117 ++++++++++++++++++
 20 files changed, 920 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ee28cc5..60d1156 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -177,6 +177,17 @@ endpoint_url = http://localhost:8080
 # How to authenticate users of the API
 auth_backend = airflow.api.auth.backend.default
 
+[lineage]
+# What lineage backend to use
+backend =
+
+[atlas]
+sasl_enabled = False
+host =
+port = 21000
+username =
+password =
+
 [operators]
 # The default owner assigned to each new operator, unless
 # provided explicitly or passed via `default_args`

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/contrib/operators/druid_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py
index b100816..c0cb09d 100644
--- a/airflow/contrib/operators/druid_operator.py
+++ b/airflow/contrib/operators/druid_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/lineage/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
new file mode 100644
index 0000000..3e2af47
--- /dev/null
+++ b/airflow/lineage/__init__.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import wraps
+
+from airflow import configuration as conf
+from airflow.lineage.datasets import DataSet
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.module_loading import import_string, prepare_classpath
+
+from itertools import chain
+
+PIPELINE_OUTLETS = "pipeline_outlets"
+PIPELINE_INLETS = "pipeline_inlets"
+
+log = LoggingMixin().log
+
+
+def _get_backend():
+    backend = None
+
+    try:
+        _backend_str = conf.get("lineage", "backend")
+        prepare_classpath()
+        backend = import_string(_backend_str)
+    except ImportError as ie:
+        log.debug("Cannot import %s due to %s", _backend_str, ie)
+    except conf.AirflowConfigException:
+        log.debug("Could not find lineage backend key in config")
+
+    return backend
+
+
+def apply_lineage(func):
+    """
+    Saves the lineage to XCom and if configured to do so sends it
+    to the backend.
+    """
+    backend = _get_backend()
+
+    @wraps(func)
+    def wrapper(self, context, *args, **kwargs):
+        self.log.debug("Lineage called with inlets: %s, outlets: %s",
+                       self.inlets, self.outlets)
+        ret_val = func(self, context, *args, **kwargs)
+
+        outlets = [x.as_dict() for x in self.outlets]
+        inlets = [x.as_dict() for x in self.inlets]
+
+        if len(self.outlets) > 0:
+            self.xcom_push(context,
+                           key=PIPELINE_OUTLETS,
+                           value=outlets,
+                           execution_date=context['ti'].execution_date)
+
+        if len(self.inlets) > 0:
+            self.xcom_push(context,
+                           key=PIPELINE_INLETS,
+                           value=inlets,
+                           execution_date=context['ti'].execution_date)
+
+        if backend:
+            backend.send_lineage(operator=self, inlets=self.inlets,
+                                 outlets=self.outlets, context=context)
+
+        return ret_val
+
+    return wrapper
+
+
+def prepare_lineage(func):
+    """
+    Prepares the lineage inlets and outlets. Inlets can be:
+        "auto" -> picks up any outlets from direct upstream tasks that have
+        outlets defined, so that if A -> B -> C and B does not have outlets
+        but A does, these are provided as inlets to C
+        "task_ids" -> a list of task ids whose outlets are picked up
+        "datasets" -> a manually defined list of DataSet objects
+    """
+    @wraps(func)
+    def wrapper(self, context, *args, **kwargs):
+        self.log.debug("Preparing lineage inlets and outlets")
+
+        task_ids = set(self._inlets['task_ids']).intersection(
+            self.get_flat_relative_ids(upstream=True)
+        )
+        if task_ids:
+            inlets = self.xcom_pull(context,
+                                    task_ids=task_ids,
+                                    dag_id=self.dag_id,
+                                    key=PIPELINE_OUTLETS)
+            inlets = [item for sublist in inlets if sublist for item in sublist]
+            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
+                      for i in inlets]
+            self.inlets.extend(inlets)
+
+        if self._inlets['auto']:
+            # don't append twice
+            task_ids = set(self._inlets['task_ids']).symmetric_difference(
+                self.upstream_task_ids
+            )
+            inlets = self.xcom_pull(context,
+                                    task_ids=task_ids,
+                                    dag_id=self.dag_id,
+                                    key=PIPELINE_OUTLETS)
+            inlets = [item for sublist in inlets if sublist for item in sublist]
+            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
+                      for i in inlets]
+            self.inlets.extend(inlets)
+
+        if len(self._inlets['datasets']) > 0:
+            self.inlets.extend(self._inlets['datasets'])
+
+        # outlets
+        if len(self._outlets['datasets']) > 0:
+            self.outlets.extend(self._outlets['datasets'])
+
+        self.log.debug("inlets: %s, outlets: %s", self.inlets, self.outlets)
+
+        for dataset in chain(self.inlets, self.outlets):
+            dataset.set_context(context)
+
+        return func(self, context, *args, **kwargs)
+
+    return wrapper

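These two decorators are the integration points for operator authors:
`prepare_lineage` resolves the declared inlets before a task runs, and
`apply_lineage` pushes the resolved inlets and outlets to XCom, and to the
configured backend, once the task finishes. A minimal sketch of a custom
operator that keeps this behaviour while overriding both hooks (the
operator and its command are made up):

    from airflow.lineage import apply_lineage, prepare_lineage
    from airflow.models import BaseOperator


    class EchoOperator(BaseOperator):
        """Hypothetical operator that overrides the execution hooks."""

        @prepare_lineage
        def pre_execute(self, context):
            # by the time this body runs, self.inlets has been populated
            # from the "auto", "task_ids" and "datasets" keys
            self.log.info("resolved inlets: %s", self.inlets)

        def execute(self, context):
            # anything stored here is forwarded to a backend as the
            # operator's "command" attribute
            self.lineage_data = "echo 1"

        @apply_lineage
        def post_execute(self, context, result=None):
            # the decorator pushes inlets/outlets to XCom, and calls the
            # configured backend, after this body returns
            pass
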
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/lineage/backend/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/lineage/backend/__init__.py b/airflow/lineage/backend/__init__.py
new file mode 100644
index 0000000..7913021
--- /dev/null
+++ b/airflow/lineage/backend/__init__.py
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+class LineageBackend(object):
+    def send_lineage(self,
+                     operator=None, inlets=None, outlets=None, context=None):
+        """
+        Sends lineage metadata to a backend
+
+        :param operator: the operator executing a transformation on the inlets and outlets
+        :param inlets: the inlets to this operator
+        :param outlets: the outlets from this operator
+        :param context: the current context of the task instance
+        """
+        raise NotImplementedError()

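Since `send_lineage` is the only method a backend must implement, a custom
backend is little code. A sketch of one that merely logs what it receives
(the class name is illustrative; it would be referenced from the `backend`
key of the `[lineage]` section):

    import logging

    from airflow.lineage.backend import LineageBackend

    log = logging.getLogger(__name__)


    class LoggingLineageBackend(LineageBackend):
        """Illustrative backend that only logs the lineage it is given."""

        def send_lineage(self,
                         operator=None, inlets=None, outlets=None,
                         context=None):
            log.info("task %s read %s and wrote %s",
                     operator.task_id,
                     [i.qualified_name for i in (inlets or [])],
                     [o.qualified_name for o in (outlets or [])])
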
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/lineage/backend/atlas/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/lineage/backend/atlas/__init__.py b/airflow/lineage/backend/atlas/__init__.py
new file mode 100644
index 0000000..69335be
--- /dev/null
+++ b/airflow/lineage/backend/atlas/__init__.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from airflow import configuration as conf
+from airflow.lineage import datasets
+from airflow.lineage.backend import LineageBackend
+from airflow.lineage.backend.atlas.typedefs import operator_typedef
+from airflow.utils.timezone import convert_to_utc
+
+from atlasclient.client import Atlas
+from atlasclient.exceptions import HttpError
+
+SERIALIZED_DATE_FORMAT_STR = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+_username = conf.get("atlas", "username")
+_password = conf.get("atlas", "password")
+_port = conf.get("atlas", "port")
+_host = conf.get("atlas", "host")
+
+
+class AtlasBackend(LineageBackend):
+    def send_lineage(self, operator, inlets, outlets, context):
+        client = Atlas(_host, port=_port, username=_username, password=_password)
+        try:
+            client.typedefs.create(data=operator_typedef)
+        except HttpError:
+            client.typedefs.update(data=operator_typedef)
+
+        _execution_date = convert_to_utc(context['ti'].execution_date)
+        _start_date = convert_to_utc(context['ti'].start_date)
+        _end_date = convert_to_utc(context['ti'].end_date)
+
+        inlet_list = []
+        if inlets:
+            for entity in inlets:
+                if entity is None:
+                    continue
+
+                entity.set_context(context)
+                client.entity_post.create(data={"entity": entity.as_dict()})
+                inlet_list.append({"typeName": entity.type_name,
+                                   "uniqueAttributes": {
+                                       "qualifiedName": entity.qualified_name
+                                   }})
+
+        outlet_list = []
+        if outlets:
+            for entity in outlets:
+                if not entity:
+                    continue
+
+                entity.set_context(context)
+                client.entity_post.create(data={"entity": entity.as_dict()})
+                outlet_list.append({"typeName": entity.type_name,
+                                    "uniqueAttributes": {
+                                        "qualifiedName": entity.qualified_name
+                                    }})
+
+        operator_name = operator.__class__.__name__
+        name = "{} {} ({})".format(operator.dag_id, operator.task_id, operator_name)
+        qualified_name = "{}_{}_{}@{}".format(operator.dag_id,
+                                              operator.task_id,
+                                              _execution_date,
+                                              operator_name)
+
+        data = {
+            "dag_id": operator.dag_id,
+            "task_id": operator.task_id,
+            "execution_date": _execution_date.strftime(SERIALIZED_DATE_FORMAT_STR),
+            "name": name,
+            "inputs": inlet_list,
+            "outputs": outlet_list,
+            "command": operator.lineage_data,
+        }
+
+        if _start_date:
+            data["start_date"] = _start_date.strftime(SERIALIZED_DATE_FORMAT_STR)
+        if _end_date:
+            data["end_date"] = _end_date.strftime(SERIALIZED_DATE_FORMAT_STR)
+
+        process = datasets.Operator(qualified_name=qualified_name, data=data)
+        client.entity_post.create(data={"entity": process.as_dict()})

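The Atlas backend can also be exercised directly, much as the unit test
further below does, by building a task and a minimal task instance context.
A sketch assuming the `[atlas]` section points at a reachable Atlas
instance (paths and ids are placeholders):

    from airflow.lineage.backend.atlas import AtlasBackend
    from airflow.lineage.datasets import File
    from airflow.models import DAG, TaskInstance
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils import timezone

    EXECUTION_DATE = timezone.datetime(2018, 5, 14)

    inlets = [File("/tmp/source")]   # placeholder datasets
    outlets = [File("/tmp/target")]

    dag = DAG(dag_id='lineage_demo', start_date=EXECUTION_DATE)
    with dag:
        task = DummyOperator(task_id='demo',
                             inlets={"datasets": inlets},
                             outlets={"datasets": outlets})

    context = {"ti": TaskInstance(task=task, execution_date=EXECUTION_DATE)}

    # creates or updates the airflow_operator typedef, posts both fs_path
    # entities, then posts the operator entity that links them
    AtlasBackend().send_lineage(operator=task, inlets=inlets,
                                outlets=outlets, context=context)
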
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/lineage/backend/atlas/typedefs.py
----------------------------------------------------------------------
diff --git a/airflow/lineage/backend/atlas/typedefs.py b/airflow/lineage/backend/atlas/typedefs.py
new file mode 100644
index 0000000..1df22bb
--- /dev/null
+++ b/airflow/lineage/backend/atlas/typedefs.py
@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+operator_typedef = {
+    "enumDefs": [],
+    "structDefs": [],
+    "classificationDefs": [],
+    "entityDefs": [
+        {
+            "superTypes": [
+                "Process"
+            ],
+            "name": "airflow_operator",
+            "description": "Airflow Operator",
+            "createdBy": "airflow",
+            "updatedBy": "airflow",
+            "attributeDefs": [
+                # "name" is set to "<dag_id> <task_id> (<operator class>)"
+                # "qualifiedName" is set to
+                # <dag_id>_<task_id>_<execution_date>@<operator class>
+                {
+                    "name": "dag_id",
+                    "isOptional": False,
+                    "isUnique": False,
+                    "isIndexable": True,
+                    "typeName": "string",
+                    "valuesMaxCount": 1,
+                    "cardinality": "SINGLE",
+                    "valuesMinCount": 0
+                },
+                {
+                    "name": "task_id",
+                    "isOptional": False,
+                    "isUnique": False,
+                    "isIndexable": True,
+                    "typeName": "string",
+                    "valuesMaxCount": 1,
+                    "cardinality": "SINGLE",
+                    "valuesMinCount": 0
+                },
+                {
+                    "name": "command",
+                    "isOptional": True,
+                    "isUnique": False,
+                    "isIndexable": False,
+                    "typeName": "string",
+                    "valuesMaxCount": 1,
+                    "cardinality": "SINGLE",
+                    "valuesMinCount": 0
+                },
+                {
+                    "name": "conn_id",
+                    "isOptional": True,
+                    "isUnique": False,
+                    "isIndexable": False,
+                    "typeName": "string",
+                    "valuesMaxCount": 1,
+                    "cardinality": "SINGLE",
+                    "valuesMinCount": 0
+                },
+                {
+                    "name": "execution_date",
+                    "isOptional": False,
+                    "isUnique": False,
+                    "isIndexable": True,
+                    "typeName": "date",
+                    "valuesMaxCount": 1,
+                    "cardinality": "SINGLE",
+                    "valuesMinCount": 0
+                },
+                {
+                    "name": "start_date",
+                    "isOptional": True,
+                    "isUnique": False,
+                    "isIndexable": False,
+                    "typeName": "date",
+                    "valuesMaxCount": 1,
+                    "cardinality": "SINGLE",
+                    "valuesMinCount": 0
+                },
+                {
+                    "name": "end_date",
+                    "isOptional": True,
+                    "isUnique": False,
+                    "isIndexable": False,
+                    "typeName": "date",
+                    "valuesMaxCount": 1,
+                    "cardinality": "SINGLE",
+                    "valuesMinCount": 0
+                },
+            ],
+        },
+    ],
+}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/lineage/datasets.py
----------------------------------------------------------------------
diff --git a/airflow/lineage/datasets.py b/airflow/lineage/datasets.py
new file mode 100644
index 0000000..40c8edc
--- /dev/null
+++ b/airflow/lineage/datasets.py
@@ -0,0 +1,140 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import six
+
+from jinja2 import Environment
+
+
+def _inherited(cls):
+    return set(cls.__subclasses__()).union(
+        [s for c in cls.__subclasses__() for s in _inherited(c)]
+    )
+
+
+class DataSet(object):
+    attributes = []
+    type_name = "dataSet"
+
+    def __init__(self, qualified_name=None, data=None, **kwargs):
+        self._qualified_name = qualified_name
+        self.context = None
+        self._data = dict()
+
+        self._data.update(dict((key, value) for key, value in six.iteritems(kwargs)
+                               if key in set(self.attributes)))
+
+        if data:
+            if "qualifiedName" in data:
+                self._qualified_name = data.pop("qualifiedName")
+
+            self._data = dict((key, value) for key, value in six.iteritems(data)
+                              if key in set(self.attributes))
+
+    def set_context(self, context):
+        self.context = context
+
+    @property
+    def qualified_name(self):
+        if self.context:
+            env = Environment()
+            return env.from_string(self._qualified_name).render(**self.context)
+
+        return self._qualified_name
+
+    def __getattr__(self, attr):
+        if attr in self.attributes:
+            if self.context:
+                env = Environment()
+                return env.from_string(self._data.get(attr)).render(**self.context)
+
+            return self._data.get(attr)
+
+        raise AttributeError(attr)
+
+    def __getitem__(self, item):
+        return self.__getattr__(item)
+
+    def __iter__(self):
+        for key, value in six.iteritems(self._data):
+            yield (key, value)
+
+    def as_dict(self):
+        attributes = dict(self._data)
+        attributes.update({"qualifiedName": self.qualified_name})
+
+        env = Environment()
+        if self.context:
+            for key, value in six.iteritems(attributes):
+                attributes[key] = env.from_string(value).render(**self.context)
+
+        d = {
+            "typeName": self.type_name,
+            "attributes": attributes,
+        }
+
+        return d
+
+    @staticmethod
+    def map_type(name):
+        for cls in _inherited(DataSet):
+            if cls.type_name == name:
+                return cls
+
+        raise NotImplementedError("No known mapping for {}".format(name))
+
+
+class DataBase(DataSet):
+    type_name = "dbStore"
+    attributes = ["dbStoreType", "storeUse", "source", "description", "userName",
+                  "storeUri", "operation", "startTime", "endTime", "commandlineOpts",
+                  "attribute_db"]
+
+
+class File(DataSet):
+    type_name = "fs_path"
+    attributes = ["name", "path", "isFile", "isSymlink"]
+
+    def __init__(self, name=None, data=None):
+        super(File, self).__init__(name=name, data=data)
+
+        self._qualified_name = 'file://' + self.name
+        self._data['path'] = self.name
+
+
+class HadoopFile(File):
+    cluster_name = "none"
+    attributes = ["name", "path", "clusterName"]
+
+    type_name = "hdfs_file"
+
+    def __init__(self, name=None, data=None):
+        super(HadoopFile, self).__init__(name=name, data=data)
+
+        self._qualified_name = "{}@{}".format(self.name, self.cluster_name)
+        self._data['path'] = self.name
+
+        self._data['clusterName'] = self.cluster_name
+
+
+class Operator(DataSet):
+    type_name = "airflow_operator"
+
+    # TODO: we can derive this from the spec
+    attributes = ["dag_id", "task_id", "command", "conn_id", "name", "execution_date",
+                  "start_date", "end_date", "inputs", "outputs"]

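Because `DataSet` renders its attributes through Jinja against the task
context, a qualified name may reference template fields such as
`ds_nodash`. A sketch of a hypothetical subclass; the `hive_table` type
name is only meaningful to a backend that defines such a type:

    from airflow.lineage.datasets import DataSet


    class HiveTable(DataSet):
        # hypothetical type; any type_name the configured backend
        # understands would do
        type_name = "hive_table"
        attributes = ["name", "db", "description"]


    table = HiveTable(
        qualified_name="analytics.events_{{ ds_nodash }}@prod",
        data={"name": "events", "db": "analytics"})

    # without a context the raw template is returned; once prepare_lineage
    # has called table.set_context(context), the qualified name renders to
    # something like "analytics.events_20180514@prod"
    print(table.as_dict())
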
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/logging_config.py
----------------------------------------------------------------------
diff --git a/airflow/logging_config.py b/airflow/logging_config.py
index cf1a275..33c2dc8 100644
--- a/airflow/logging_config.py
+++ b/airflow/logging_config.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,25 +18,15 @@
 # under the License.
 #
 import logging
-import os
-import sys
 from logging.config import dictConfig
 
 from airflow import configuration as conf
 from airflow.exceptions import AirflowConfigException
-from airflow.utils.module_loading import import_string
+from airflow.utils.module_loading import import_string, prepare_classpath
 
 log = logging.getLogger(__name__)
 
 
-def prepare_classpath():
-    config_path = os.path.join(conf.get('core', 'airflow_home'), 'config')
-    config_path = os.path.expanduser(config_path)
-
-    if config_path not in sys.path:
-        sys.path.append(config_path)
-
-
 def configure_logging():
     logging_class_path = ''
     try:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 5903075..ec4d2bb 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -73,6 +73,7 @@ from airflow.exceptions import (
     AirflowDagCycleException, AirflowException, AirflowSkipException, AirflowTaskTimeout
 )
 from airflow.dag.base_dag import BaseDag, BaseDagBag
+from airflow.lineage import apply_lineage, prepare_lineage
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
@@ -1832,7 +1833,9 @@ class TaskInstance(Base, LoggingMixin):
             'var': {
                 'value': VariableAccessor(),
                 'json': VariableJsonAccessor()
-            }
+            },
+            'inlets': task.inlets,
+            'outlets': task.outlets,
         }
 
     def render_templates(self):
@@ -2282,6 +2285,8 @@ class BaseOperator(LoggingMixin):
             run_as_user=None,
             task_concurrency=None,
             executor_config=None,
+            inlets=None,
+            outlets=None,
             *args,
             **kwargs):
 
@@ -2369,6 +2374,27 @@ class BaseOperator(LoggingMixin):
 
         self._log = logging.getLogger("airflow.task.operators")
 
+        # lineage
+        self.inlets = []
+        self.outlets = []
+        self.lineage_data = None
+
+        self._inlets = {
+            "auto": False,
+            "task_ids": [],
+            "datasets": [],
+        }
+
+        self._outlets = {
+            "datasets": [],
+        }
+
+        if inlets:
+            self._inlets.update(inlets)
+
+        if outlets:
+            self._outlets.update(outlets)
+
         self._comps = {
             'task_id',
             'dag_id',
@@ -2546,6 +2572,7 @@ class BaseOperator(LoggingMixin):
                 self.get_flat_relative_ids(upstream=upstream))
         )
 
+    @prepare_lineage
     def pre_execute(self, context):
         """
         This hook is triggered right before self.execute() is called.
@@ -2561,6 +2588,7 @@ class BaseOperator(LoggingMixin):
         """
         raise NotImplementedError()
 
+    @apply_lineage
     def post_execute(self, context, result=None):
         """
         This hook is triggered right after self.execute() is called.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 4b16d69..7db562e 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -89,6 +89,7 @@ class BashOperator(BaseOperator):
         bash_command = ('export {}={}; '.format(AIRFLOW_HOME_VAR, airflow_home_value) +
                         'export {}={}; '.format(PYTHONPATH_VAR, pythonpath_value) +
                         self.bash_command)
+        self.lineage_data = bash_command
 
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
             with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/airflow/utils/module_loading.py
----------------------------------------------------------------------
diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py
index 846c8e0..6e638b0 100644
--- a/airflow/utils/module_loading.py
+++ b/airflow/utils/module_loading.py
@@ -7,19 +7,33 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os
+import sys
 
+from airflow import configuration as conf
 from importlib import import_module
 
 
+def prepare_classpath():
+    """
+    Ensures that the Airflow home directory is on the classpath
+    """
+    config_path = os.path.join(conf.get('core', 'airflow_home'), 'config')
+    config_path = os.path.expanduser(config_path)
+
+    if config_path not in sys.path:
+        sys.path.append(config_path)
+
+
 def import_string(dotted_path):
     """
     Import a dotted module path and return the attribute/class designated by the

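With `prepare_classpath` moved into this module, callers such as
`_get_backend` in `airflow.lineage` can resolve a backend class that lives
under `$AIRFLOW_HOME/config` as well as one on the regular Python path. A
small sketch of that lookup (the dotted path is a placeholder):

    from airflow.utils.module_loading import import_string, prepare_classpath

    # make $AIRFLOW_HOME/config importable, then resolve the dotted path
    # that would normally come from conf.get("lineage", "backend")
    prepare_classpath()
    backend = import_string("my_lineage_backend.CustomBackend")
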
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/docs/index.rst
----------------------------------------------------------------------
diff --git a/docs/index.rst b/docs/index.rst
index 42349ea..125a1fb 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -86,5 +86,6 @@ Content
     timezone
     api
     integration
+    lineage
     faq
     code

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/docs/lineage.rst
----------------------------------------------------------------------
diff --git a/docs/lineage.rst b/docs/lineage.rst
new file mode 100644
index 0000000..719ef01
--- /dev/null
+++ b/docs/lineage.rst
@@ -0,0 +1,85 @@
+Lineage
+=======
+
+.. note:: Lineage support is very experimental and subject to change.
+
+Airflow can help track the origins of data, what happens to it, and where it moves over time. This aids in
+creating audit trails, supports data governance, and helps with debugging data flows.
+
+Airflow tracks data by means of the inlets and outlets of tasks. Let's work through an example to see how it
+works.
+
+.. code:: python
+
+    import airflow
+    from airflow.operators.bash_operator import BashOperator
+    from airflow.operators.dummy_operator import DummyOperator
+    from airflow.lineage.datasets import File
+    from airflow.models import DAG
+    from datetime import timedelta
+
+    FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
+
+    args = {
+        'owner': 'airflow',
+        'start_date': airflow.utils.dates.days_ago(2)
+    }
+
+    dag = DAG(
+        dag_id='example_lineage', default_args=args,
+        schedule_interval='0 0 * * *',
+        dagrun_timeout=timedelta(minutes=60))
+
+    f_final = File("/tmp/final")
+    run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
+                                  inlets={"auto": True},
+                                  outlets={"datasets": [f_final]})
+
+    f_in = File("/tmp/whole_directory/")
+    outlets = []
+    for category in FILE_CATEGORIES:
+        f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(category))
+        outlets.append(f_out)
+    run_this = BashOperator(
+        task_id='run_me_first', bash_command='echo 1', dag=dag,
+        inlets={"datasets": [f_in]},
+        outlets={"datasets": outlets})
+    run_this.set_downstream(run_this_last)
+
+
+Tasks take the parameters `inlets` and `outlets`. Inlets can be defined manually as a list of datasets
+(`{"datasets": [dataset1, dataset2]}`), configured to look for the outlets of specific upstream tasks
+(`{"task_ids": ["task_id1", "task_id2"]}`), configured to pick up the outlets of all direct upstream tasks
+(`{"auto": True}`), or any combination of these. Outlets are defined as a list of datasets
+(`{"datasets": [dataset1, dataset2]}`). Any fields of a dataset are templated with the context when the task is
+executed.
+
+.. note:: An operator can also add inlets and outlets automatically, if it supports doing so.
+
+In the example DAG, the task `run_me_first` is a BashOperator that takes one inlet (`/tmp/whole_directory/`) and
+produces three outlets generated from the categories `CAT1`, `CAT2` and `CAT3`. Note that `execution_date` is a
+templated field and will be rendered when the task is running.
+
+.. note:: Behind the scenes Airflow prepares the lineage metadata as part of the `pre_execute` method of a task. When the
+          task has finished execution, `post_execute` is called and the lineage metadata is pushed to XCom. Thus, if you are
+          creating your own operators that override either method, make sure to decorate them with `prepare_lineage` and
+          `apply_lineage` respectively.
+
+
+Apache Atlas
+------------
+
+Airflow can send its lineage metadata to Apache Atlas. You need to enable the Atlas backend and configure it
+properly, e.g. in your `airflow.cfg`:
+
+.. code:: ini
+
+    [lineage]
+    backend = airflow.lineage.backend.atlas
+
+    [atlas]
+    username = my_username
+    password = my_password
+    host = host
+    port = 21000
+
+Please make sure to have the `atlasclient` package installed.

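The three inlet forms described in the documentation above can be combined
on a single task. A short sketch reusing the dag and task ids from the
example (paths are illustrative):

    from airflow.lineage.datasets import File
    from airflow.operators.dummy_operator import DummyOperator

    combine = DummyOperator(
        task_id='combine', dag=dag,  # the dag defined in the example above
        inlets={"auto": True,                        # direct upstream outlets
                "task_ids": ["run_me_first"],        # outlets of named tasks
                "datasets": [File("/tmp/manual")]},  # manually declared
        outlets={"datasets": [File("/tmp/combined")]})
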
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index a2a78df..8ab52fc 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -12,6 +12,7 @@
 # limitations under the License.
 
 alembic
+atlasclient
 azure-storage>=0.34.0
 bcrypt
 bleach

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 7e78439..8999695 100644
--- a/setup.py
+++ b/setup.py
@@ -107,6 +107,7 @@ async = [
     'eventlet>= 0.9.7',
     'gevent>=0.13'
 ]
+atlas = ['atlasclient>=0.1.2']
 azure = ['azure-storage>=0.34.0']
 sendgrid = ['sendgrid>=5.2.0']
 celery = [
@@ -212,7 +213,7 @@ devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
 devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle +
              docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog +
              zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
-             druid + pinot + segment + snowflake + elasticsearch)
+             druid + pinot + segment + snowflake + elasticsearch + atlas)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
@@ -279,6 +280,7 @@ def do_setup():
             'all': devel_all,
             'devel_ci': devel_ci,
             'all_dbs': all_dbs,
+            'atlas': atlas,
             'async': async,
             'azure': azure,
             'celery': celery,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index 59c97e5..eff9d4b 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,6 +28,7 @@ from .core import *
 from .executors import *
 from .jobs import *
 from .impersonation import *
+from .lineage import *
 from .models import *
 from .operators import *
 from .security import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/tests/lineage/__init__.py
----------------------------------------------------------------------
diff --git a/tests/lineage/__init__.py b/tests/lineage/__init__.py
new file mode 100644
index 0000000..114d189
--- /dev/null
+++ b/tests/lineage/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/tests/lineage/backend/__init__.py
----------------------------------------------------------------------
diff --git a/tests/lineage/backend/__init__.py b/tests/lineage/backend/__init__.py
new file mode 100644
index 0000000..114d189
--- /dev/null
+++ b/tests/lineage/backend/__init__.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/tests/lineage/backend/test_atlas.py
----------------------------------------------------------------------
diff --git a/tests/lineage/backend/test_atlas.py b/tests/lineage/backend/test_atlas.py
new file mode 100644
index 0000000..9c98334
--- /dev/null
+++ b/tests/lineage/backend/test_atlas.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from airflow import configuration as conf
+from airflow.configuration import AirflowConfigException
+from airflow.lineage.backend.atlas import AtlasBackend
+from airflow.lineage.datasets import File
+from airflow.models import DAG, TaskInstance as TI
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+
+from backports.configparser import DuplicateSectionError
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestAtlas(unittest.TestCase):
+    def setUp(self):
+        conf.load_test_config()
+        try:
+            conf.conf.add_section("atlas")
+        except AirflowConfigException:
+            pass
+        except DuplicateSectionError:
+            pass
+
+        conf.conf.set("atlas", "username", "none")
+        conf.conf.set("atlas", "password", "none")
+        conf.conf.set("atlas", "host", "none")
+        conf.conf.set("atlas", "port", "0")
+
+        self.atlas = AtlasBackend()
+
+    @mock.patch("airflow.lineage.backend.atlas.Atlas")
+    def test_lineage_send(self, atlas_mock):
+        td = mock.MagicMock()
+        en = mock.MagicMock()
+        atlas_mock.return_value = mock.Mock(typedefs=td, entity_post=en)
+
+        dag = DAG(
+            dag_id='test_prepare_lineage',
+            start_date=DEFAULT_DATE
+        )
+
+        f1 = File("/tmp/does_not_exist_1")
+        f2 = File("/tmp/does_not_exist_2")
+
+        inlets_d = [f1, ]
+        outlets_d = [f2, ]
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1',
+                                inlets={"datasets": inlets_d},
+                                outlets={"datasets": outlets_d})
+
+        ctx = {"ti": TI(task=op1, execution_date=DEFAULT_DATE)}
+
+        self.atlas.send_lineage(operator=op1, inlets=inlets_d,
+                                outlets=outlets_d, context=ctx)
+
+        self.assertEqual(td.create.call_count, 1)
+        self.assertTrue(en.create.called)
+        self.assertEqual(len(en.mock_calls), 3)
+
+        # test can be broader

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/648e1e69/tests/lineage/test_lineage.py
----------------------------------------------------------------------
diff --git a/tests/lineage/test_lineage.py b/tests/lineage/test_lineage.py
new file mode 100644
index 0000000..92b74fe
--- /dev/null
+++ b/tests/lineage/test_lineage.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from airflow.lineage import apply_lineage, prepare_lineage
+from airflow.lineage.datasets import File
+from airflow.models import DAG, TaskInstance as TI
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestLineage(unittest.TestCase):
+
+    @mock.patch("airflow.lineage._get_backend")
+    def test_lineage(self, _get_backend):
+        backend = mock.Mock()
+        send_mock = mock.Mock()
+        backend.send_lineage = send_mock
+
+        _get_backend.return_value = backend
+
+        dag = DAG(
+            dag_id='test_prepare_lineage',
+            start_date=DEFAULT_DATE
+        )
+
+        f1 = File("/tmp/does_not_exist_1")
+        f2 = File("/tmp/does_not_exist_2")
+        f3 = File("/tmp/does_not_exist_3")
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1',
+                                inlets={"datasets": [f1, ]},
+                                outlets={"datasets": [f2, ]})
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1',
+                                inlets={"auto": True},
+                                outlets={"datasets": [f3, ]})
+            op4 = DummyOperator(task_id='upstream_level_2')
+            op5 = DummyOperator(task_id='upstream_level_3',
+                                inlets={"task_ids": ["leave1", "upstream_level_1"]})
+
+            op1.set_downstream(op3)
+            op2.set_downstream(op3)
+            op3.set_downstream(op4)
+            op4.set_downstream(op5)
+
+        ctx1 = {"ti": TI(task=op1, execution_date=DEFAULT_DATE)}
+        ctx2 = {"ti": TI(task=op2, execution_date=DEFAULT_DATE)}
+        ctx3 = {"ti": TI(task=op3, execution_date=DEFAULT_DATE)}
+        ctx5 = {"ti": TI(task=op5, execution_date=DEFAULT_DATE)}
+
+        func = mock.Mock()
+        func.__name__ = 'foo'
+
+        # prepare with manual inlets and outlets
+        prep = prepare_lineage(func)
+        prep(op1, ctx1)
+
+        self.assertEqual(len(op1.inlets), 1)
+        self.assertEqual(op1.inlets[0], f1)
+
+        self.assertEqual(len(op1.outlets), 1)
+        self.assertEqual(op1.outlets[0], f2)
+
+        # post process with no backend
+        post = apply_lineage(func)
+        post(op1, ctx1)
+        self.assertEqual(send_mock.call_count, 1)
+        send_mock.reset_mock()
+
+        prep(op2, ctx2)
+        self.assertEqual(len(op2.inlets), 0)
+        post(op2, ctx2)
+        self.assertEqual(send_mock.call_count, 1)
+        send_mock.reset_mock()
+
+        prep(op3, ctx3)
+        self.assertEqual(len(op3.inlets), 1)
+        self.assertEqual(op3.inlets[0].qualified_name, f2.qualified_name)
+        post(op3, ctx3)
+        self.assertEqual(send_mock.call_count, 1)
+        send_mock.reset_mock()
+
+        # skip 4
+
+        prep(op5, ctx5)
+        self.assertEqual(len(op5.inlets), 2)
+        post(op5, ctx5)
+        self.assertEqual(send_mock.call_count, 1)
+        send_mock.reset_mock()