You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 03:32:47 UTC
[airflow] 12/47: [AIRFLOW-6706] Lazy load operator extra links
(#7327) (#10318)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ea32d0d83ce915798ba9779dbf7c1df9faf7c241
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Thu Aug 13 20:09:44 2020 +0200
[AIRFLOW-6706] Lazy load operator extra links (#7327) (#10318)
Co-authored-by: Kamil Breguła <mi...@users.noreply.github.com>
Backported from https://github.com/apache/airflow/pull/7327
cherry-picked from b180e4b
---
airflow/operators/__init__.py | 4 +---
airflow/plugins_manager.py | 29 +----------------------------
airflow/serialization/serialized_objects.py | 27 ++++++++++++++++++++-------
3 files changed, 22 insertions(+), 38 deletions(-)
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index fb5383f..00f34d0 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -101,7 +101,7 @@ if not os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
def _integrate_plugins():
"""Integrate plugins to the context"""
- from airflow.plugins_manager import operators_modules, register_inbuilt_operator_links
+ from airflow.plugins_manager import operators_modules
for operators_module in operators_modules:
sys.modules[operators_module.__name__] = operators_module
globals()[operators_module._name] = operators_module
@@ -121,5 +121,3 @@ def _integrate_plugins():
"import from 'airflow.operators.[plugin_module]' "
"instead. Support for direct imports will be dropped "
"entirely in Airflow 2.0.".format(i=operator_name))
-
- register_inbuilt_operator_links()
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index b70517b..5fd680e 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -28,7 +28,7 @@ import inspect
import logging
import os
import re
-from typing import Any, Dict, List, Set, Type
+from typing import Any, Dict, List, Type
import pkg_resources
@@ -114,33 +114,6 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
return airflow_plugins
-def register_inbuilt_operator_links():
- """
- Register all the Operators Links that are already defined for the operators
- in the "airflow" project. Example: QDSLink (Operator Link for Qubole Operator)
-
- This is required to populate the "allowed list" of allowed classes when deserializing operator links
- """
- inbuilt_operator_links = set() # type: Set[Type]
-
- try:
- from airflow.contrib.operators.bigquery_operator import BigQueryConsoleLink, BigQueryConsoleIndexableLink # noqa E501 # pylint: disable=R0401,line-too-long
- inbuilt_operator_links.update([BigQueryConsoleLink, BigQueryConsoleIndexableLink])
- except ImportError:
- pass
-
- try:
- from airflow.contrib.operators.qubole_operator import QDSLink # pylint: disable=R0401
- inbuilt_operator_links.update([QDSLink])
- except ImportError:
- pass
-
- registered_operator_link_classes.update({
- "{}.{}".format(link.__module__, link.__name__): link
- for link in inbuilt_operator_links
- })
-
-
def is_valid_plugin(plugin_obj, existing_plugins):
"""
Check whether a potential object is a subclass of
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 34372db..d959e92 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -23,7 +23,7 @@ import datetime
import enum
import logging
import six
-from typing import TYPE_CHECKING, Optional, Union, Dict
+from typing import TYPE_CHECKING, Optional, Union, Dict, List
import cattr
import pendulum
@@ -36,6 +36,7 @@ from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.helpers import serialize_template_field
from airflow.serialization.json_schema import Validator, load_dag_schema
from airflow.settings import json
+from airflow.utils.module_loading import import_string
from airflow.www.utils import get_python_source
try:
@@ -49,6 +50,17 @@ if TYPE_CHECKING:
log = logging.getLogger(__name__)
+BUILTIN_OPERATOR_EXTRA_LINKS = [
+ "airflow.contrib.operators.bigquery_operator.BigQueryConsoleLink",
+ "airflow.contrib.operators.bigquery_operator.BigQueryConsoleIndexableLink",
+ "airflow.contrib.operators.qubole_operator.QDSLink",
+ # providers new paths
+ "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
+ "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
+ "airflow.providers.qubole.operators.qubole.QDSLink"
+] # type: List[str]
+
+
class BaseSerialization:
"""BaseSerialization provides utils for serialization."""
@@ -459,15 +471,16 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
# list(_operator_links_source.items())[0] =
# ('airflow.gcp.operators.bigquery.BigQueryConsoleIndexableLink', {'index': 0})
- _operator_link_class, data = list(_operator_links_source.items())[0]
-
- if _operator_link_class in registered_operator_link_classes:
- single_op_link_class_name = registered_operator_link_classes[_operator_link_class]
+ _operator_link_class_path, data = list(_operator_links_source.items())[0]
+ if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS:
+ single_op_link_class = import_string(_operator_link_class_path)
+ elif _operator_link_class_path in registered_operator_link_classes:
+ single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
else:
- raise KeyError("Operator Link class %r not registered" % _operator_link_class)
+ raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)
op_predefined_extra_link = cattr.structure(
- data, single_op_link_class_name) # type: BaseOperatorLink
+ data, single_op_link_class) # type: BaseOperatorLink
op_predefined_extra_links.update(
{op_predefined_extra_link.name: op_predefined_extra_link}