You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/13 18:10:25 UTC

[airflow] branch v1-10-test updated: [AIRFLOW-6706] Lazy load operator extra links (#7327) (#10318)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new ccbe56a  [AIRFLOW-6706] Lazy load operator extra links (#7327) (#10318)
ccbe56a is described below

commit ccbe56a7c0d06f58798464d68d4a4e11dcf2fb5d
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Thu Aug 13 20:09:44 2020 +0200

    [AIRFLOW-6706] Lazy load operator extra links (#7327) (#10318)
    
    Co-authored-by: Kamil Breguła <mi...@users.noreply.github.com>
    Backported from https://github.com/apache/airflow/pull/7327
    cherry-picked from b180e4b
---
 airflow/operators/__init__.py               |  4 +---
 airflow/plugins_manager.py                  | 29 +----------------------------
 airflow/serialization/serialized_objects.py | 27 ++++++++++++++++++++-------
 3 files changed, 22 insertions(+), 38 deletions(-)

diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index fb5383f..00f34d0 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -101,7 +101,7 @@ if not os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
 
 def _integrate_plugins():
     """Integrate plugins to the context"""
-    from airflow.plugins_manager import operators_modules, register_inbuilt_operator_links
+    from airflow.plugins_manager import operators_modules
     for operators_module in operators_modules:
         sys.modules[operators_module.__name__] = operators_module
         globals()[operators_module._name] = operators_module
@@ -121,5 +121,3 @@ def _integrate_plugins():
                     "import from 'airflow.operators.[plugin_module]' "
                     "instead. Support for direct imports will be dropped "
                     "entirely in Airflow 2.0.".format(i=operator_name))
-
-    register_inbuilt_operator_links()
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index b70517b..5fd680e 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -28,7 +28,7 @@ import inspect
 import logging
 import os
 import re
-from typing import Any, Dict, List, Set, Type
+from typing import Any, Dict, List, Type
 
 import pkg_resources
 
@@ -114,33 +114,6 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
     return airflow_plugins
 
 
-def register_inbuilt_operator_links():
-    """
-    Register all the Operators Links that are already defined for the operators
-    in the "airflow" project. Example: QDSLink (Operator Link for Qubole Operator)
-
-    This is required to populate the "allowed list" of allowed classes when deserializing operator links
-    """
-    inbuilt_operator_links = set()  # type: Set[Type]
-
-    try:
-        from airflow.contrib.operators.bigquery_operator import BigQueryConsoleLink, BigQueryConsoleIndexableLink  # noqa E501 # pylint: disable=R0401,line-too-long
-        inbuilt_operator_links.update([BigQueryConsoleLink, BigQueryConsoleIndexableLink])
-    except ImportError:
-        pass
-
-    try:
-        from airflow.contrib.operators.qubole_operator import QDSLink   # pylint: disable=R0401
-        inbuilt_operator_links.update([QDSLink])
-    except ImportError:
-        pass
-
-    registered_operator_link_classes.update({
-        "{}.{}".format(link.__module__, link.__name__): link
-        for link in inbuilt_operator_links
-    })
-
-
 def is_valid_plugin(plugin_obj, existing_plugins):
     """
     Check whether a potential object is a subclass of
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 34372db..d959e92 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -23,7 +23,7 @@ import datetime
 import enum
 import logging
 import six
-from typing import TYPE_CHECKING, Optional, Union, Dict
+from typing import TYPE_CHECKING, Optional, Union, Dict, List
 
 import cattr
 import pendulum
@@ -36,6 +36,7 @@ from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.helpers import serialize_template_field
 from airflow.serialization.json_schema import Validator, load_dag_schema
 from airflow.settings import json
+from airflow.utils.module_loading import import_string
 from airflow.www.utils import get_python_source
 
 try:
@@ -49,6 +50,17 @@ if TYPE_CHECKING:
 log = logging.getLogger(__name__)
 
 
+BUILTIN_OPERATOR_EXTRA_LINKS = [
+    "airflow.contrib.operators.bigquery_operator.BigQueryConsoleLink",
+    "airflow.contrib.operators.bigquery_operator.BigQueryConsoleIndexableLink",
+    "airflow.contrib.operators.qubole_operator.QDSLink",
+    # providers new paths
+    "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
+    "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
+    "airflow.providers.qubole.operators.qubole.QDSLink"
+]  # type: List[str]
+
+
 class BaseSerialization:
     """BaseSerialization provides utils for serialization."""
 
@@ -459,15 +471,16 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
             #   list(_operator_links_source.items())[0] =
             #   ('airflow.gcp.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0})
 
-            _operator_link_class, data = list(_operator_links_source.items())[0]
-
-            if _operator_link_class in registered_operator_link_classes:
-                single_op_link_class_name = registered_operator_link_classes[_operator_link_class]
+            _operator_link_class_path, data = list(_operator_links_source.items())[0]
+            if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS:
+                single_op_link_class = import_string(_operator_link_class_path)
+            elif _operator_link_class_path in registered_operator_link_classes:
+                single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
             else:
-                raise KeyError("Operator Link class %r not registered" % _operator_link_class)
+                raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)
 
             op_predefined_extra_link = cattr.structure(
-                data, single_op_link_class_name)    # type: BaseOperatorLink
+                data, single_op_link_class)    # type: BaseOperatorLink
 
             op_predefined_extra_links.update(
                 {op_predefined_extra_link.name: op_predefined_extra_link}