You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/29 19:56:21 UTC

[GitHub] [airflow] jhtimmins commented on a change in pull request #13931: Don't load plugins inside Scheduling loop

jhtimmins commented on a change in pull request #13931:
URL: https://github.com/apache/airflow/pull/13931#discussion_r567060866



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -407,35 +408,39 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
     @classmethod
     def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
         """Deserializes an operator from a JSON object."""
-        from airflow import plugins_manager
-
-        plugins_manager.initialize_extra_operators_links_plugins()
-
-        if plugins_manager.operator_extra_links is None:
-            raise AirflowException("Can not load plugins")
         op = SerializedBaseOperator(task_id=encoded_op['task_id'])
 
-        # Extra Operator Links defined in Plugins
-        op_extra_links_from_plugin = {}
-
         if "label" not in encoded_op:
             # Handle deserialization of old data before the introduction of TaskGroup
             encoded_op["label"] = encoded_op["task_id"]
 
-        for ope in plugins_manager.operator_extra_links:
-            for operator in ope.operators:
-                if (
-                    operator.__name__ == encoded_op["_task_type"]
-                    and operator.__module__ == encoded_op["_task_module"]
-                ):
-                    op_extra_links_from_plugin.update({ope.name: ope})
-
-        # If OperatorLinks are defined in Plugins but not in the Operator that is being Serialized
-        # set the Operator links attribute
-        # The case for "If OperatorLinks are defined in the operator that is being Serialized"
-        # is handled in the deserialization loop where it matches k == "_operator_extra_links"
-        if op_extra_links_from_plugin and "_operator_extra_links" not in encoded_op:
-            setattr(op, "operator_extra_links", list(op_extra_links_from_plugin.values()))
+        # Extra Operator Links defined in Plugins
+        op_extra_links_from_plugin = {}
+
+        load_op_links = "AIRFLOW_IN_SCHEDULING_LOOP" not in os.environ
+        # We don't want to load Extra Operator links in Scheduler
+        if load_op_links:  # pylint: disable=too-many-nested-blocks
+            from airflow import plugins_manager
+
+            plugins_manager.initialize_extra_operators_links_plugins()
+
+            if plugins_manager.operator_extra_links is None:
+                raise AirflowException("Can not load plugins")
+
+            for ope in plugins_manager.operator_extra_links:

Review comment:
       Some of these variable names are ambiguous, particularly `ope`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org