Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/27 19:39:56 UTC

[GitHub] [airflow] kaxil opened a new pull request #13931: Don't load plugins inside Scheduling loop

kaxil opened a new pull request #13931:
URL: https://github.com/apache/airflow/pull/13931


   closes https://github.com/apache/airflow/issues/13099
   
   Mutually exclusive with 
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #13931: Don't load plugins inside Scheduling loop

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13931:
URL: https://github.com/apache/airflow/pull/13931#discussion_r565589430



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1466,6 +1466,8 @@ def _do_scheduling(self, session) -> int:
         :return: Number of TIs enqueued in this iteration
         :rtype: int
         """
+        os.environ["AIRFLOW_IN_SCHEDULING_LOOP"] = 'True'
+

Review comment:
       Once we decide which PR is better, I will add code comments and tests.







[GitHub] [airflow] kaxil commented on pull request #13931: Don't load plugins inside Scheduling loop

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #13931:
URL: https://github.com/apache/airflow/pull/13931#issuecomment-769155989


   @ephraimbuddy @jhtimmins Can you take a look and review too, please?





[GitHub] [airflow] github-actions[bot] commented on pull request #13931: Don't load plugins inside Scheduling loop

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13931:
URL: https://github.com/apache/airflow/pull/13931#issuecomment-768601858


   [The Workflow run](https://github.com/apache/airflow/actions/runs/516247430) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] kaxil commented on a change in pull request #13931: Don't load plugins inside Scheduling loop

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13931:
URL: https://github.com/apache/airflow/pull/13931#discussion_r567076230



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -407,35 +408,39 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
     @classmethod
     def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
         """Deserializes an operator from a JSON object."""
-        from airflow import plugins_manager
-
-        plugins_manager.initialize_extra_operators_links_plugins()
-
-        if plugins_manager.operator_extra_links is None:
-            raise AirflowException("Can not load plugins")
         op = SerializedBaseOperator(task_id=encoded_op['task_id'])
 
-        # Extra Operator Links defined in Plugins
-        op_extra_links_from_plugin = {}
-
         if "label" not in encoded_op:
             # Handle deserialization of old data before the introduction of TaskGroup
             encoded_op["label"] = encoded_op["task_id"]
 
-        for ope in plugins_manager.operator_extra_links:
-            for operator in ope.operators:
-                if (
-                    operator.__name__ == encoded_op["_task_type"]
-                    and operator.__module__ == encoded_op["_task_module"]
-                ):
-                    op_extra_links_from_plugin.update({ope.name: ope})
-
-        # If OperatorLinks are defined in Plugins but not in the Operator that is being Serialized
-        # set the Operator links attribute
-        # The case for "If OperatorLinks are defined in the operator that is being Serialized"
-        # is handled in the deserialization loop where it matches k == "_operator_extra_links"
-        if op_extra_links_from_plugin and "_operator_extra_links" not in encoded_op:
-            setattr(op, "operator_extra_links", list(op_extra_links_from_plugin.values()))
+        # Extra Operator Links defined in Plugins
+        op_extra_links_from_plugin = {}
+
+        load_op_links = "AIRFLOW_IN_SCHEDULING_LOOP" not in os.environ
+        # We don't want to load Extra Operator links in Scheduler
+        if load_op_links:  # pylint: disable=too-many-nested-blocks
+            from airflow import plugins_manager
+
+            plugins_manager.initialize_extra_operators_links_plugins()
+
+            if plugins_manager.operator_extra_links is None:
+                raise AirflowException("Can not load plugins")
+
+            for ope in plugins_manager.operator_extra_links:

Review comment:
       They indeed are, but this change only indents the code and does not otherwise alter it. Use https://github.com/apache/airflow/pull/13931/files?diff=split&w=1 to hide whitespace changes.
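
For readers following the hunk above, here is a minimal, self-contained sketch of the gating pattern it introduces. Only the variable name `AIRFLOW_IN_SCHEDULING_LOOP` and the presence check come from the diff; `load_plugin_links`, `deserialize`, and the dict-based operator are hypothetical stand-ins, not Airflow APIs.

```python
import os

# Flag name taken from the diff; everything else below is illustrative.
FLAG = "AIRFLOW_IN_SCHEDULING_LOOP"


def load_plugin_links():
    """Hypothetical stand-in for the expensive plugin-loading step."""
    return ["example_link"]


def deserialize(encoded_op, environ=os.environ):
    """Build a toy operator dict, attaching plugin links only when the flag is absent."""
    op = {"task_id": encoded_op["task_id"]}
    # Same test as the diff: the variable's presence matters, not its value.
    if FLAG not in environ:
        op["operator_extra_links"] = load_plugin_links()
    return op


# Outside the scheduling loop the links are loaded.
outside = deserialize({"task_id": "t1"}, environ={})
# Inside the scheduling loop (flag set) plugin loading is skipped.
inside = deserialize({"task_id": "t1"}, environ={FLAG: "True"})
```

Because the check is on presence, any value for the variable, including `'False'`, would disable plugin loading in this sketch.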







[GitHub] [airflow] kaxil closed pull request #13931: Don't load plugins inside Scheduling loop

Posted by GitBox <gi...@apache.org>.
kaxil closed pull request #13931:
URL: https://github.com/apache/airflow/pull/13931


   





[GitHub] [airflow] kaxil commented on pull request #13931: Don't load plugins inside Scheduling loop

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #13931:
URL: https://github.com/apache/airflow/pull/13931#issuecomment-770047879


   Closing this in favor of https://github.com/apache/airflow/pull/13932





[GitHub] [airflow] jhtimmins commented on a change in pull request #13931: Don't load plugins inside Scheduling loop

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on a change in pull request #13931:
URL: https://github.com/apache/airflow/pull/13931#discussion_r567060866



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -407,35 +408,39 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
     @classmethod
     def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
         """Deserializes an operator from a JSON object."""
-        from airflow import plugins_manager
-
-        plugins_manager.initialize_extra_operators_links_plugins()
-
-        if plugins_manager.operator_extra_links is None:
-            raise AirflowException("Can not load plugins")
         op = SerializedBaseOperator(task_id=encoded_op['task_id'])
 
-        # Extra Operator Links defined in Plugins
-        op_extra_links_from_plugin = {}
-
         if "label" not in encoded_op:
             # Handle deserialization of old data before the introduction of TaskGroup
             encoded_op["label"] = encoded_op["task_id"]
 
-        for ope in plugins_manager.operator_extra_links:
-            for operator in ope.operators:
-                if (
-                    operator.__name__ == encoded_op["_task_type"]
-                    and operator.__module__ == encoded_op["_task_module"]
-                ):
-                    op_extra_links_from_plugin.update({ope.name: ope})
-
-        # If OperatorLinks are defined in Plugins but not in the Operator that is being Serialized
-        # set the Operator links attribute
-        # The case for "If OperatorLinks are defined in the operator that is being Serialized"
-        # is handled in the deserialization loop where it matches k == "_operator_extra_links"
-        if op_extra_links_from_plugin and "_operator_extra_links" not in encoded_op:
-            setattr(op, "operator_extra_links", list(op_extra_links_from_plugin.values()))
+        # Extra Operator Links defined in Plugins
+        op_extra_links_from_plugin = {}
+
+        load_op_links = "AIRFLOW_IN_SCHEDULING_LOOP" not in os.environ
+        # We don't want to load Extra Operator links in Scheduler
+        if load_op_links:  # pylint: disable=too-many-nested-blocks
+            from airflow import plugins_manager
+
+            plugins_manager.initialize_extra_operators_links_plugins()
+
+            if plugins_manager.operator_extra_links is None:
+                raise AirflowException("Can not load plugins")
+
+            for ope in plugins_manager.operator_extra_links:

Review comment:
       Some of these variable names are ambiguous, particularly `ope`.
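
One possible way to address the naming concern is sketched below. It restates the matching loop from the diff with more descriptive names; `links_for_operator`, `extra_link`, `operator_class`, and the `DummyOperator` test class are hypothetical names chosen for illustration and are not taken from the PR.

```python
from types import SimpleNamespace


def links_for_operator(extra_links, task_type, task_module):
    """Collect plugin-defined links whose operators match the serialized task."""
    matched = {}
    for extra_link in extra_links:  # called `ope` in the diff
        for operator_class in extra_link.operators:  # called `operator` in the diff
            if (
                operator_class.__name__ == task_type
                and operator_class.__module__ == task_module
            ):
                matched[extra_link.name] = extra_link
    return matched


class DummyOperator:
    """Toy operator class used only to exercise the sketch."""


# A stand-in for a plugin-defined operator link object.
link = SimpleNamespace(name="docs", operators=[DummyOperator])
result = links_for_operator([link], "DummyOperator", DummyOperator.__module__)
```

The behaviour is the same as the original loop; only the identifiers differ.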



