You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/04/15 12:07:48 UTC

[airflow] 29/36: Restore base lineage backend (#14146)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ac00eab33446171c2776d2822621fa8845a4bfcd
Author: João Ponte <JP...@users.noreply.github.com>
AuthorDate: Sat Apr 3 10:26:59 2021 +0200

    Restore base lineage backend (#14146)
    
    This adds back the base lineage backend which can be extended to send lineage metadata to any custom backend.
    closes: #14106
    
    Co-authored-by: Joao Ponte <jp...@plista.com>
    Co-authored-by: Tomek Urbaszek <tu...@gmail.com>
    (cherry picked from commit af2d11e36ed43b0103a54780640493b8ae46d70e)
---
 airflow/lineage/__init__.py     | 22 ++++++++++++++++++
 airflow/lineage/backend.py      | 47 +++++++++++++++++++++++++++++++++++++++
 docs/apache-airflow/lineage.rst | 21 ++++++++++++++++++
 tests/lineage/test_lineage.py   | 49 ++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
index 65f19ef..905eb00 100644
--- a/airflow/lineage/__init__.py
+++ b/airflow/lineage/__init__.py
@@ -25,6 +25,8 @@ import attr
 import jinja2
 from cattr import structure, unstructure
 
+from airflow.configuration import conf
+from airflow.lineage.backend import LineageBackend
 from airflow.utils.module_loading import import_string
 
 ENV = jinja2.Environment()
@@ -45,6 +47,22 @@ class Metadata:
     data: Dict = attr.ib()
 
 
+def get_backend() -> Optional[LineageBackend]:
+    """Gets the lineage backend if defined in the configs"""
+    clazz = conf.getimport("lineage", "backend", fallback=None)
+
+    if clazz:
+        if not issubclass(clazz, LineageBackend):
+            raise TypeError(
+                f"Your custom Lineage class `{clazz.__name__}` "
+                f"is not a subclass of `{LineageBackend.__name__}`."
+            )
+        else:
+            return clazz()
+
+    return None
+
+
 def _get_instance(meta: Metadata):
     """Instantiate an object from Metadata"""
     cls = import_string(meta.type_name)
@@ -82,6 +100,7 @@ def apply_lineage(func: T) -> T:
     Saves the lineage to XCom and if configured to do so sends it
     to the backend.
     """
+    _backend = get_backend()
 
     @wraps(func)
     def wrapper(self, context, *args, **kwargs):
@@ -101,6 +120,9 @@ def apply_lineage(func: T) -> T:
                 context, key=PIPELINE_INLETS, value=inlets, execution_date=context['ti'].execution_date
             )
 
+        if _backend:
+            _backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)
+
         return ret_val
 
     return cast(T, wrapper)
diff --git a/airflow/lineage/backend.py b/airflow/lineage/backend.py
new file mode 100644
index 0000000..edfbe0e
--- /dev/null
+++ b/airflow/lineage/backend.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Sends lineage metadata to a backend"""
+from typing import TYPE_CHECKING, Optional
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator  # pylint: disable=cyclic-import
+
+
+class LineageBackend:
+    """Sends lineage metadata to a backend"""
+
+    def send_lineage(
+        self,
+        operator: 'BaseOperator',
+        inlets: Optional[list] = None,
+        outlets: Optional[list] = None,
+        context: Optional[dict] = None,
+    ):
+        """
+        Sends lineage metadata to a backend
+
+        :param operator: the operator executing a transformation on the inlets and outlets
+        :type operator: airflow.models.baseoperator.BaseOperator
+        :param inlets: the inlets to this operator
+        :type inlets: list
+        :param outlets: the outlets from this operator
+        :type outlets: list
+        :param context: the current context of the task instance
+        :type context: dict
+        """
+        raise NotImplementedError()
diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst
index a29f042..362d3e6 100644
--- a/docs/apache-airflow/lineage.rst
+++ b/docs/apache-airflow/lineage.rst
@@ -95,3 +95,24 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box sup
     f_in > run_this | (run_this_last > outlets)
 
 .. _precedence: https://docs.python.org/3/reference/expressions.html
+
+
+Lineage Backend
+---------------
+
+It's possible to push the lineage metadata to a custom backend by providing the import path of a ``LineageBackend`` subclass in the config:
+
+.. code-block:: ini
+
+  [lineage]
+  backend = my.lineage.CustomBackend
+
+The backend should inherit from ``airflow.lineage.backend.LineageBackend``.
+
+.. code-block:: python
+
+  from airflow.lineage.backend import LineageBackend
+
+  class ExampleBackend(LineageBackend):
+    def send_lineage(self, operator, inlets=None, outlets=None, context=None):
+      ...  # Send the info to some external service
diff --git a/tests/lineage/test_lineage.py b/tests/lineage/test_lineage.py
index 350a8be..b5ebbea 100644
--- a/tests/lineage/test_lineage.py
+++ b/tests/lineage/test_lineage.py
@@ -16,16 +16,24 @@
 # specific language governing permissions and limitations
 # under the License.
 import unittest
+from unittest import mock
 
-from airflow.lineage import AUTO
+from airflow.lineage import AUTO, apply_lineage, get_backend, prepare_lineage
+from airflow.lineage.backend import LineageBackend
 from airflow.lineage.entities import File
 from airflow.models import DAG, TaskInstance as TI
 from airflow.operators.dummy import DummyOperator
 from airflow.utils import timezone
+from tests.test_utils.config import conf_vars
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
+class CustomLineageBackend(LineageBackend):
+    def send_lineage(self, operator, inlets=None, outlets=None, context=None):
+        pass
+
+
 class TestLineage(unittest.TestCase):
     def test_lineage(self):
         dag = DAG(dag_id='test_prepare_lineage', start_date=DEFAULT_DATE)
@@ -111,3 +119,42 @@ class TestLineage(unittest.TestCase):
         op1.pre_execute(ctx1)
         assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
         assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
+
+    @mock.patch("airflow.lineage.get_backend")
+    def test_lineage_is_sent_to_backend(self, mock_get_backend):
+        class TestBackend(LineageBackend):
+            def send_lineage(self, operator, inlets=None, outlets=None, context=None):
+                assert len(inlets) == 1
+                assert len(outlets) == 1
+
+        func = mock.Mock()
+        func.__name__ = 'foo'
+
+        mock_get_backend.return_value = TestBackend()
+
+        dag = DAG(dag_id='test_lineage_is_sent_to_backend', start_date=DEFAULT_DATE)
+
+        with dag:
+            op1 = DummyOperator(task_id='task1')
+
+        file1 = File("/tmp/some_file")
+
+        op1.inlets.append(file1)
+        op1.outlets.append(file1)
+
+        ctx1 = {"ti": TI(task=op1, execution_date=DEFAULT_DATE), "execution_date": DEFAULT_DATE}
+
+        prep = prepare_lineage(func)
+        prep(op1, ctx1)
+        post = apply_lineage(func)
+        post(op1, ctx1)
+
+    def test_empty_lineage_backend(self):
+        backend = get_backend()
+        assert backend is None
+
+    @conf_vars({("lineage", "backend"): "tests.lineage.test_lineage.CustomLineageBackend"})
+    def test_resolve_lineage_class(self):
+        backend = get_backend()
+        assert issubclass(backend.__class__, LineageBackend)
+        assert isinstance(backend, CustomLineageBackend)