You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/18 23:51:13 UTC

[airflow] 01/07: Log instead of raise an Error for unregistered OperatorLinks (#11959)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5b65c1737c2d4f62107809eb41bae19c3f62a320
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Oct 30 10:39:31 2020 +0000

    Log instead of raise an Error for unregistered OperatorLinks (#11959)
    
    Currently, if a DAG uses OperatorLinks that are not registered,
    the UI breaks when someone clicks on that DAG.
    
    This commit instead logs an error in the Webserver logs, so that
    the DAG can still be viewed in the different Views (graph, tree, etc).
    
    (cherry picked from commit 44f6e6fca59596a5cdf27ca0910e86a9d8150a63)
---
 airflow/serialization/serialized_objects.py   |  3 ++-
 tests/serialization/test_dag_serialization.py | 37 ++++++++++++++++++++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index d959e92..c527ddf 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -477,7 +477,8 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
             elif _operator_link_class_path in registered_operator_link_classes:
                 single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
             else:
-                raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)
+                log.error("Operator Link class %r not registered", _operator_link_class_path)
+                return {}
 
             op_predefined_extra_link = cattr.structure(
                 data, single_op_link_class)    # type: BaseOperatorLink
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 6b714a8..d999cb0 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -33,7 +33,7 @@ from dateutil.relativedelta import relativedelta, FR
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.models import DAG, Connection, DagBag, TaskInstance
-from airflow.models.baseoperator import BaseOperator
+from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
 from airflow.operators.bash_operator import BashOperator
 from airflow.serialization.json_schema import load_dag_schema_dict
 from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
@@ -560,6 +560,41 @@ class TestStringifiedDAGs(unittest.TestCase):
         google_link_from_plugin = simple_task.get_extra_links(test_date, GoogleLink.name)
         self.assertEqual("https://www.google.com", google_link_from_plugin)
 
+    @unittest.skipIf(six.PY2, 'self.assertLogs not available for Python 2')
+    def test_extra_operator_links_logs_error_for_non_registered_extra_links(self):
+        """
+        Assert that when an OperatorLink is neither registered via Plugins nor an inbuilt Operator Link,
+        the DAG can still be deserialized (no exception is raised) and an error is logged instead
+        """
+
+        class TaskStateLink(BaseOperatorLink):
+            """OperatorLink that is neither registered via Plugins nor a built-in OperatorLink"""
+            name = 'My Link'
+
+            def get_link(self, operator, dttm):
+                return 'https://www.google.com'
+
+        class MyOperator(BaseOperator):
+            """Just a dummy operator using the Extra Operator Link defined above"""
+            operator_extra_links = [TaskStateLink()]
+
+            def execute(self, context):
+                pass
+
+        with DAG(dag_id='simple_dag', start_date=datetime(2019, 8, 1)) as dag:
+            MyOperator(task_id='blah')
+
+        serialized_dag = SerializedDAG.to_dict(dag)
+
+        with self.assertLogs("airflow.serialization.serialized_objects", level="ERROR") as log_output:
+            SerializedDAG.from_dict(serialized_dag)
+            received_logs = log_output.output[0]
+            expected_err_msg = (
+                "Operator Link class 'tests.serialization.test_dag_serialization.TaskStateLink' "
+                "not registered"
+            )
+            assert expected_err_msg in received_logs
+
     def test_extra_serialized_field_and_multiple_operator_links(self):
         """
         Assert extra field exists & OperatorLinks defined in Plugins and inbuilt Operator Links.