Posted to commits@airflow.apache.org by mo...@apache.org on 2023/06/28 09:20:32 UTC

[airflow] branch main updated: openlineage, docs: tips for OpenLineage method implementation (#31817)

This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d8e214e90 openlineage, docs: tips for OpenLineage method implementation (#31817)
3d8e214e90 is described below

commit 3d8e214e9027221c1f1e9bffdd2756860e60bcfd
Author: Maciej Obuchowski <ob...@gmail.com>
AuthorDate: Wed Jun 28 11:20:22 2023 +0200

    openlineage, docs: tips for OpenLineage method implementation (#31817)
    
    Signed-off-by: Maciej Obuchowski <ob...@gmail.com>
---
 .../guides/developer.rst                           | 81 ++++++++++++++++++++--
 1 file changed, 77 insertions(+), 4 deletions(-)

diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst b/docs/apache-airflow-providers-openlineage/guides/developer.rst
index 0e1d959c1e..364f004830 100644
--- a/docs/apache-airflow-providers-openlineage/guides/developer.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/developer.rst
@@ -20,7 +20,7 @@
 Implementing OpenLineage in Operators
 -------------------------------------
 
-OpenLineage defines few methods for implementation for Operators.
+OpenLineage defines a few methods that Operators can implement. They are referred to as OpenLineage methods.
 
 .. code-block:: python
 
@@ -35,8 +35,7 @@ OpenLineage defines few methods for implementation for Operators.
   def get_openlineage_facets_on_failure(ti: TaskInstance) -> OperatorLineage:
       ...
 
-Those get called respectively when task instance changes state to RUNNING, SUCCESS and FAILED.
-It's required to implement ``on_start`` method.
+OpenLineage methods are called when the task instance changes state to RUNNING, SUCCESS and FAILED, respectively.
 If there's no ``on_complete`` or ``on_failure`` method, ``on_start`` is called instead.
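+
+The fallback rule can be modelled with a short, simplified sketch. ``pick_openlineage_method`` and
+``OnlyStartOperator`` are hypothetical names written for illustration; this is not the provider's actual dispatch code.
+
+.. code-block:: python
+
+    def pick_openlineage_method(operator, state: str):
+        """Hypothetical model: return the OpenLineage method used for a task instance state."""
+        on_start = getattr(operator, "get_openlineage_facets_on_start", None)
+        if state == "RUNNING":
+            return on_start
+        method_names = {
+            "SUCCESS": "get_openlineage_facets_on_complete",
+            "FAILED": "get_openlineage_facets_on_failure",
+        }
+        # Fall back to on_start when the state-specific method is not implemented.
+        return getattr(operator, method_names[state], None) or on_start
+
+
+    class OnlyStartOperator:
+        """Hypothetical operator that implements only the on_start method."""
+
+        def get_openlineage_facets_on_start(self):
+            return "lineage from on_start"
+
+
+    # SUCCESS falls back to on_start because on_complete is missing.
+    method = pick_openlineage_method(OnlyStartOperator(), "SUCCESS")
+    print(method.__name__)  # get_openlineage_facets_on_start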
 
 Instead of returning a complete OpenLineage event, the provider defines the ``OperatorLineage`` structure to be returned by Operators:
@@ -50,4 +49,78 @@ Instead of returning complete OpenLineage event, the provider defines ``Operator
       run_facets: dict[str, BaseFacet] = Factory(dict)
       job_facets: dict[str, BaseFacet] = Factory(dict)
 
-OpenLineage integration takes care to enrich it with things like general Airflow facets, proper event time and type.
+The OpenLineage integration itself enriches it with general Airflow facets and the proper event time and type, creating a proper OpenLineage ``RunEvent``.
+
+How to properly implement OpenLineage methods?
+==============================================
+
+There are a couple of things worth noting when implementing OpenLineage in operators.
+
+First, do not import OpenLineage modules at the top level of your module; import them inside the OpenLineage method itself.
+This allows users to use your provider even if they do not have the OpenLineage provider installed.
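+
+A minimal sketch of this pattern, using a hypothetical ``MyGCSCopyOperator``. It is a plain class so the snippet runs
+without Airflow; a real operator would subclass ``BaseOperator``. Nothing from OpenLineage is imported at module level,
+so defining and executing the operator works even when the OpenLineage provider is absent.
+
+.. code-block:: python
+
+    class MyGCSCopyOperator:
+        """Hypothetical operator; a real one would subclass airflow.models.BaseOperator."""
+
+        def __init__(self, bucket: str, source_object: str, destination_object: str) -> None:
+            self.bucket = bucket
+            self.source_object = source_object
+            self.destination_object = destination_object
+
+        def execute(self, context=None) -> str:
+            # Placeholder for the real copy; no OpenLineage code runs here.
+            return f"gs://{self.bucket}/{self.destination_object}"
+
+        def get_openlineage_facets_on_start(self):
+            # Imported lazily: only needed when OpenLineage actually calls this method.
+            from openlineage.client.run import Dataset
+
+            from airflow.providers.openlineage.extractors import OperatorLineage
+
+            return OperatorLineage(
+                inputs=[Dataset(namespace=f"gs://{self.bucket}", name=self.source_object)],
+                outputs=[Dataset(namespace=f"gs://{self.bucket}", name=self.destination_object)],
+            )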
+
+Second, make sure your provider returns OpenLineage-compliant dataset names.
+This allows OpenLineage consumers to correctly match information about datasets gathered from different sources.
+The naming convention is described in the `OpenLineage docs <https://openlineage.io/docs/spec/naming>`__.
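+
+For GCS objects, the example later in this guide uses ``gs://<bucket>`` as the namespace and the object path
+as the name. The hypothetical helper ``gcs_dataset_id`` below splits a ``gs://`` URI that way; check the naming
+docs for the conventions of other storage systems.
+
+.. code-block:: python
+
+    from urllib.parse import urlparse
+
+
+    def gcs_dataset_id(uri: str) -> tuple[str, str]:
+        """Hypothetical helper: split a gs:// URI into an OpenLineage (namespace, name) pair."""
+        parsed = urlparse(uri)
+        if parsed.scheme != "gs" or not parsed.netloc:
+            raise ValueError(f"Not a GCS URI: {uri!r}")
+        # Namespace is the bucket; name is the object path without the leading slash.
+        return f"gs://{parsed.netloc}", parsed.path.lstrip("/")
+
+
+    print(gcs_dataset_id("gs://my-bucket/dir/file.csv"))  # ('gs://my-bucket', 'dir/file.csv')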
+
+Third, the OpenLineage implementation should not waste the time of users who do not use it.
+This means not doing heavy processing or network calls in the ``execute`` method when ``execute`` itself does not need their results.
+A better option is to save the relevant information in Operator attributes and then use it
+in the OpenLineage method.
+A good example is ``BigQueryExecuteQueryOperator``. It saves the ``job_ids`` of the queries it executed.
+``get_openlineage_facets_on_complete`` can then call the BigQuery API to ask for the lineage of those tables, and transform it into the OpenLineage format.
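+
+The sketch below shows the same idea with hypothetical classes: ``FakeQueryClient`` stands in for a real service
+client (it is not the BigQuery API), and ``MyQueryOperator`` only records job IDs in ``execute``. The lookups
+that OpenLineage needs happen in ``get_openlineage_facets_on_complete``, so users without OpenLineage never pay for them.
+
+.. code-block:: python
+
+    class FakeQueryClient:
+        """Hypothetical client; counts lookups that would be network calls in reality."""
+
+        def __init__(self) -> None:
+            self.lookups = 0
+            self._jobs: dict[str, str] = {}
+
+        def run_query(self, sql: str) -> str:
+            job_id = f"job_{len(self._jobs) + 1}"
+            self._jobs[job_id] = sql
+            return job_id
+
+        def get_job_tables(self, job_id: str) -> list[str]:
+            self.lookups += 1  # would be an API call for a real service
+            return [f"dataset.table_for_{job_id}"]
+
+
+    class MyQueryOperator:
+        """Hypothetical operator that defers lineage-only work to the OpenLineage method."""
+
+        def __init__(self, sql_statements: list[str], client: FakeQueryClient) -> None:
+            self.sql_statements = sql_statements
+            self.client = client
+            self.job_ids: list[str] = []
+
+        def execute(self, context=None) -> None:
+            # Cheap bookkeeping only: remember which jobs ran.
+            for sql in self.sql_statements:
+                self.job_ids.append(self.client.run_query(sql))
+
+        def get_openlineage_facets_on_complete(self, task_instance=None) -> list[str]:
+            # Lineage-only lookups run here; a real operator would return OperatorLineage.
+            return [table for job_id in self.job_ids for table in self.client.get_job_tables(job_id)]
+
+After ``execute()`` the client's ``lookups`` counter is still zero; it only grows when
+``get_openlineage_facets_on_complete`` runs.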
+
+Fourth, it's not necessary to implement all the methods. If all the datasets are known before ``execute`` is
+called, and there's no relevant runtime data, there might be no point in implementing ``get_openlineage_facets_on_complete``;
+the ``get_openlineage_facets_on_start`` method can provide all the data. Conversely, if nothing is known
+before ``execute`` runs, there might be no point in writing the ``get_openlineage_facets_on_start`` method.
+Similarly, if there's no relevant failure data, or the failure conditions are unknown,
+implementing ``get_openlineage_facets_on_failure`` is probably not worth it.
+
+Here's an example of a properly implemented ``get_openlineage_facets_on_complete`` method for ``GCSToGCSOperator``.
+
+.. code-block:: python
+
+    def get_openlineage_facets_on_complete(self, task_instance):
+        """
+        Implementing _on_complete because the execute method preprocesses internal attributes.
+        This means we won't have to normalize self.source_object and self.source_objects,
+        the destination bucket and so on.
+        """
+        from openlineage.client.run import Dataset
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        return OperatorLineage(
+            inputs=[
+                Dataset(namespace=f"gs://{self.source_bucket}", name=source)
+                for source in sorted(self.resolved_source_objects)
+            ],
+            outputs=[
+                Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
+                for target in sorted(self.resolved_target_objects)
+            ],
+        )
+
+
+How to add tests to OpenLineage integration?
+============================================
+
+Unit testing the OpenLineage integration in operators is very similar to testing the operators themselves.
+The objective of those tests is to make sure the ``get_openlineage_*`` methods return a proper ``OperatorLineage``
+data structure with the relevant fields filled. It's recommended to mock any external calls.
+Test authors need to remember that the different OpenLineage methods are called under different conditions.
+``get_openlineage_facets_on_start`` is called before ``execute``, and as such, must not depend on values
+that are set there.
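+
+A sketch of such unit tests, using ``unittest.mock`` and a hypothetical ``MyCopyOperator`` whose OpenLineage methods
+return plain dicts instead of ``OperatorLineage`` so the snippet runs without Airflow. The first test checks that
+``get_openlineage_facets_on_start`` works before ``execute`` has run; the second checks that
+``get_openlineage_facets_on_complete`` uses values set during ``execute``.
+
+.. code-block:: python
+
+    from unittest import mock
+
+
+    class MyCopyOperator:
+        """Hypothetical operator under test."""
+
+        def __init__(self, source: str, destination: str, hook) -> None:
+            self.source = source
+            self.destination = destination
+            self.hook = hook
+            self.copied_objects: list[str] = []  # filled in by execute()
+
+        def execute(self, context=None) -> None:
+            self.copied_objects = self.hook.copy(self.source, self.destination)
+
+        def get_openlineage_facets_on_start(self) -> dict:
+            # Only constructor arguments are available before execute() runs.
+            return {"inputs": [self.source], "outputs": [self.destination]}
+
+        def get_openlineage_facets_on_complete(self, task_instance=None) -> dict:
+            return {"inputs": [self.source], "outputs": list(self.copied_objects)}
+
+
+    def test_on_start_does_not_depend_on_execute():
+        hook = mock.MagicMock()
+        operator = MyCopyOperator("gs://src/in.csv", "gs://dst/out", hook)
+        lineage = operator.get_openlineage_facets_on_start()
+        assert lineage == {"inputs": ["gs://src/in.csv"], "outputs": ["gs://dst/out"]}
+        hook.copy.assert_not_called()  # no external call was needed
+
+
+    def test_on_complete_uses_execute_results():
+        hook = mock.MagicMock()
+        hook.copy.return_value = ["gs://dst/out/part-0"]  # mocked external call
+        operator = MyCopyOperator("gs://src/in.csv", "gs://dst/out", hook)
+        operator.execute()
+        lineage = operator.get_openlineage_facets_on_complete()
+        assert lineage["outputs"] == ["gs://dst/out/part-0"]
+        hook.copy.assert_called_once_with("gs://src/in.csv", "gs://dst/out")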
+
+System testing the OpenLineage integration relies on the existing system test framework.
+There is a special ``VariableTransport`` that gathers OpenLineage events in the Airflow database,
+and an ``OpenLineageTestOperator`` that compares those events to the expected ones. The author
+of an OpenLineage system test needs to provide ``OpenLineageTestOperator`` with a dictionary mapping event keys to expected events.
+
+Event keys identify an event sent from a particular operator and method: they have the structure ``<dag_id>.<task_id>.event.<event_type>``;
+this way, it's always possible to identify a particular event sent from a particular task.
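+
+Building keys with a small hypothetical helper, ``event_key``, keeps them consistent across a test. The DAG and task IDs
+below are made up, and the exact spelling of ``<event_type>`` values (lowercase ``start`` here) is an assumption;
+check existing system tests for the expected form.
+
+.. code-block:: python
+
+    def event_key(dag_id: str, task_id: str, event_type: str) -> str:
+        """Hypothetical helper: build a key with the <dag_id>.<task_id>.event.<event_type> structure."""
+        return f"{dag_id}.{task_id}.event.{event_type}"
+
+
+    # Expected events for a system test, keyed by event key (contents abbreviated).
+    expected_events = {
+        event_key("example_gcs_to_gcs", "copy_single_file", "start"): {"eventType": "START"},
+    }
+    print(list(expected_events))  # ['example_gcs_to_gcs.copy_single_file.event.start']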
+
+The provided event structure does not have to contain all the fields that are in the resulting event.
+Only the fields provided by the test author are compared; this allows checking only the fields a particular
+test cares about. It also allows skipping fields that are (semi-)randomly generated, like ``runId`` or ``eventTime``,
+or that are always the same in the context of OpenLineage in Airflow, like ``producer``.
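+
+A simplified sketch of this partial comparison follows. ``matches_expected`` is a hypothetical function, not
+``OpenLineageTestOperator``'s actual code, and the event contents are made up. Dictionaries are compared only on the
+keys the expected structure contains, lists element by element, and everything else by equality.
+
+.. code-block:: python
+
+    from typing import Any
+
+
+    def matches_expected(expected: Any, actual: Any) -> bool:
+        """Return True if every field present in ``expected`` matches ``actual``."""
+        if isinstance(expected, dict):
+            # Keys absent from ``expected`` (runId, eventTime, producer...) are ignored.
+            return isinstance(actual, dict) and all(
+                key in actual and matches_expected(value, actual[key]) for key, value in expected.items()
+            )
+        if isinstance(expected, list):
+            return (
+                isinstance(actual, list)
+                and len(expected) == len(actual)
+                and all(matches_expected(e, a) for e, a in zip(expected, actual))
+            )
+        return expected == actual
+
+
+    actual_event = {
+        "eventType": "COMPLETE",
+        "eventTime": "2023-06-28T09:20:22Z",
+        "producer": "placeholder-producer",
+        "run": {"runId": "placeholder-run-id"},
+        "job": {"namespace": "default", "name": "my_dag.my_task"},
+        "inputs": [{"namespace": "gs://src", "name": "in.csv"}],
+    }
+    expected_event = {
+        "eventType": "COMPLETE",
+        "job": {"name": "my_dag.my_task"},
+        "inputs": [{"namespace": "gs://src", "name": "in.csv"}],
+    }
+    print(matches_expected(expected_event, actual_event))  # True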