You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/04/15 12:07:48 UTC
[airflow] 29/36: Restore base lineage backend (#14146)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ac00eab33446171c2776d2822621fa8845a4bfcd
Author: João Ponte <JP...@users.noreply.github.com>
AuthorDate: Sat Apr 3 10:26:59 2021 +0200
Restore base lineage backend (#14146)
This adds back the base lineage backend which can be extended to send lineage metadata to any custom backend.
closes: #14106
Co-authored-by: Joao Ponte <jp...@plista.com>
Co-authored-by: Tomek Urbaszek <tu...@gmail.com>
(cherry picked from commit af2d11e36ed43b0103a54780640493b8ae46d70e)
---
airflow/lineage/__init__.py | 22 ++++++++++++++++++
airflow/lineage/backend.py | 47 +++++++++++++++++++++++++++++++++++++++
docs/apache-airflow/lineage.rst | 21 ++++++++++++++++++
tests/lineage/test_lineage.py | 49 ++++++++++++++++++++++++++++++++++++++++-
4 files changed, 138 insertions(+), 1 deletion(-)
diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
index 65f19ef..905eb00 100644
--- a/airflow/lineage/__init__.py
+++ b/airflow/lineage/__init__.py
@@ -25,6 +25,8 @@ import attr
import jinja2
from cattr import structure, unstructure
+from airflow.configuration import conf
+from airflow.lineage.backend import LineageBackend
from airflow.utils.module_loading import import_string
ENV = jinja2.Environment()
@@ -45,6 +47,22 @@ class Metadata:
data: Dict = attr.ib()
+def get_backend() -> Optional[LineageBackend]:
+ """Gets the lineage backend if defined in the configs"""
+ clazz = conf.getimport("lineage", "backend", fallback=None)
+
+ if clazz:
+ if not issubclass(clazz, LineageBackend):
+ raise TypeError(
+ f"Your custom Lineage class `{clazz.__name__}` "
+ f"is not a subclass of `{LineageBackend.__name__}`."
+ )
+ else:
+ return clazz()
+
+ return None
+
+
def _get_instance(meta: Metadata):
"""Instantiate an object from Metadata"""
cls = import_string(meta.type_name)
@@ -82,6 +100,7 @@ def apply_lineage(func: T) -> T:
Saves the lineage to XCom and if configured to do so sends it
to the backend.
"""
+ _backend = get_backend()
@wraps(func)
def wrapper(self, context, *args, **kwargs):
@@ -101,6 +120,9 @@ def apply_lineage(func: T) -> T:
context, key=PIPELINE_INLETS, value=inlets, execution_date=context['ti'].execution_date
)
+ if _backend:
+ _backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)
+
return ret_val
return cast(T, wrapper)
diff --git a/airflow/lineage/backend.py b/airflow/lineage/backend.py
new file mode 100644
index 0000000..edfbe0e
--- /dev/null
+++ b/airflow/lineage/backend.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Sends lineage metadata to a backend"""
+from typing import TYPE_CHECKING, Optional
+
+if TYPE_CHECKING:
+ from airflow.models.baseoperator import BaseOperator # pylint: disable=cyclic-import
+
+
+class LineageBackend:
+ """Sends lineage metadata to a backend"""
+
+ def send_lineage(
+ self,
+ operator: 'BaseOperator',
+ inlets: Optional[list] = None,
+ outlets: Optional[list] = None,
+ context: Optional[dict] = None,
+ ):
+ """
+ Sends lineage metadata to a backend
+
+ :param operator: the operator executing a transformation on the inlets and outlets
+ :type operator: airflow.models.baseoperator.BaseOperator
+ :param inlets: the inlets to this operator
+ :type inlets: list
+ :param outlets: the outlets from this operator
+ :type outlets: list
+ :param context: the current context of the task instance
+ :type context: dict
+ """
+ raise NotImplementedError()
diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst
index a29f042..362d3e6 100644
--- a/docs/apache-airflow/lineage.rst
+++ b/docs/apache-airflow/lineage.rst
@@ -95,3 +95,24 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box sup
f_in > run_this | (run_this_last > outlets)
.. _precedence: https://docs.python.org/3/reference/expressions.html
+
+
+Lineage Backend
+---------------
+
+It's possible to push the lineage metadata to a custom backend by providing the import path of a ``LineageBackend`` subclass in the config:
+
+.. code-block:: ini
+
+    [lineage]
+    backend = my.lineage.CustomBackend
+
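+The backend should inherit from ``airflow.lineage.backend.LineageBackend`` and override its ``send_lineage`` method, which raises ``NotImplementedError`` in the base class.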
+
+.. code-block:: python
+
+    from airflow.lineage.backend import LineageBackend
+
+    class CustomBackend(LineageBackend):
+        def send_lineage(self, operator, inlets=None, outlets=None, context=None):
+            ...  # Send the info to some external service
diff --git a/tests/lineage/test_lineage.py b/tests/lineage/test_lineage.py
index 350a8be..b5ebbea 100644
--- a/tests/lineage/test_lineage.py
+++ b/tests/lineage/test_lineage.py
@@ -16,16 +16,24 @@
# specific language governing permissions and limitations
# under the License.
import unittest
+from unittest import mock
-from airflow.lineage import AUTO
+from airflow.lineage import AUTO, apply_lineage, get_backend, prepare_lineage
+from airflow.lineage.backend import LineageBackend
from airflow.lineage.entities import File
from airflow.models import DAG, TaskInstance as TI
from airflow.operators.dummy import DummyOperator
from airflow.utils import timezone
+from tests.test_utils.config import conf_vars
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+class CustomLineageBackend(LineageBackend):
+ def send_lineage(self, operator, inlets=None, outlets=None, context=None):
+ pass
+
+
class TestLineage(unittest.TestCase):
def test_lineage(self):
dag = DAG(dag_id='test_prepare_lineage', start_date=DEFAULT_DATE)
@@ -111,3 +119,42 @@ class TestLineage(unittest.TestCase):
op1.pre_execute(ctx1)
assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
+
+ @mock.patch("airflow.lineage.get_backend")
+ def test_lineage_is_sent_to_backend(self, mock_get_backend):
+ class TestBackend(LineageBackend):
+ def send_lineage(self, operator, inlets=None, outlets=None, context=None):
+ assert len(inlets) == 1
+ assert len(outlets) == 1
+
+ func = mock.Mock()
+ func.__name__ = 'foo'
+
+ mock_get_backend.return_value = TestBackend()
+
+ dag = DAG(dag_id='test_lineage_is_sent_to_backend', start_date=DEFAULT_DATE)
+
+ with dag:
+ op1 = DummyOperator(task_id='task1')
+
+ file1 = File("/tmp/some_file")
+
+ op1.inlets.append(file1)
+ op1.outlets.append(file1)
+
+ ctx1 = {"ti": TI(task=op1, execution_date=DEFAULT_DATE), "execution_date": DEFAULT_DATE}
+
+ prep = prepare_lineage(func)
+ prep(op1, ctx1)
+ post = apply_lineage(func)
+ post(op1, ctx1)
+
+ def test_empty_lineage_backend(self):
+ backend = get_backend()
+ assert backend is None
+
+ @conf_vars({("lineage", "backend"): "tests.lineage.test_lineage.CustomLineageBackend"})
+ def test_resolve_lineage_class(self):
+ backend = get_backend()
+ assert issubclass(backend.__class__, LineageBackend)
+ assert isinstance(backend, CustomLineageBackend)