You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/30 10:41:49 UTC

[airflow] branch master updated: Log instead of raise an Error for unregistered OperatorLinks (#11959)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 44f6e6f  Log instead of raise an Error for unregistered OperatorLinks (#11959)
44f6e6f is described below

commit 44f6e6fca59596a5cdf27ca0910e86a9d8150a63
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Oct 30 10:39:31 2020 +0000

    Log instead of raise an Error for unregistered OperatorLinks (#11959)
    
    Currently, if a DAG uses OperatorLinks that are not registered,
    deserializing it raises a KeyError, which breaks the UI when someone
    clicks on that DAG.
    
    This commit instead logs an error in the Webserver logs, so that
    the DAG can still be seen in the different Views (graph, tree, etc.).
---
 airflow/serialization/serialized_objects.py   |  3 ++-
 tests/serialization/test_dag_serialization.py | 36 ++++++++++++++++++++++++++-
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 0a54a8d..2ade55f 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -499,7 +499,8 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
                     _operator_link_class_path
                 ]
             else:
-                raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)
+                log.error("Operator Link class %r not registered", _operator_link_class_path)
+                return {}
 
             op_predefined_extra_link: BaseOperatorLink = cattr.structure(
                 data, single_op_link_class)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 39dd3ba..08a8aaa 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -35,7 +35,7 @@ from parameterized import parameterized
 from airflow.hooks.base_hook import BaseHook
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.models import DAG, Connection, DagBag, TaskInstance
-from airflow.models.baseoperator import BaseOperator
+from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
 from airflow.operators.bash import BashOperator
 from airflow.security import permissions
 from airflow.serialization.json_schema import load_dag_schema_dict
@@ -606,6 +606,40 @@ class TestStringifiedDAGs(unittest.TestCase):
         google_link_from_plugin = simple_task.get_extra_links(test_date, GoogleLink.name)
         self.assertEqual("https://www.google.com", google_link_from_plugin)
 
+    def test_extra_operator_links_logs_error_for_non_registered_extra_links(self):
+        """
+        Assert OperatorLinks not registered via Plugins and if it is not an inbuilt Operator Link,
+        it can still deserialize the DAG (does not error) but just logs an error
+        """
+
+        class TaskStateLink(BaseOperatorLink):
+            """OperatorLink not registered via Plugins nor a built-in OperatorLink"""
+            name = 'My Link'
+
+            def get_link(self, operator, dttm):
+                return 'https://www.google.com'
+
+        class MyOperator(BaseOperator):
+            """Just a DummyOperator using above defined Extra Operator Link"""
+            operator_extra_links = [TaskStateLink()]
+
+            def execute(self, context):
+                pass
+
+        with DAG(dag_id='simple_dag', start_date=datetime(2019, 8, 1)) as dag:
+            MyOperator(task_id='blah')
+
+        serialized_dag = SerializedDAG.to_dict(dag)
+
+        with self.assertLogs("airflow.serialization.serialized_objects", level="ERROR") as log_output:
+            SerializedDAG.from_dict(serialized_dag)
+            received_logs = log_output.output[0]
+            expected_err_msg = (
+                "Operator Link class 'tests.serialization.test_dag_serialization.TaskStateLink' "
+                "not registered"
+            )
+            assert expected_err_msg in received_logs
+
     def test_extra_serialized_field_and_multiple_operator_links(self):
         """
         Assert extra field exists & OperatorLinks defined in Plugins and inbuilt Operator Links.