Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/03/07 13:51:34 UTC

[GitHub] [airflow] mik-laj opened a new pull request #7644: [AIRFLOW-7003][WIP] Lazy load all plguins

mik-laj opened a new pull request #7644: [AIRFLOW-7003][WIP] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644
 
 
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389823090
 
 

 ##########
 File path: airflow/serialization/serialized_objects.py
 ##########
 @@ -326,14 +326,15 @@ def serialize_operator(cls, op: BaseOperator) -> dict:
     def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
         """Deserializes an operator from a JSON object.
         """
-        from airflow.plugins_manager import operator_extra_links
+        from airflow import plugins_manager
+        plugins_manager.endure_plugins_loaded()
 
 Review comment:
   ```suggestion
           plugins_manager.ensure_plugins_loaded()
   ```


[GitHub] [airflow] ashb commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389611234
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -34,6 +34,32 @@
 
 import_errors = {}
 
+plugins = None  # type: Optional[List[AirflowPlugin]]
+
+norm_pattern = re.compile(r'[/|.]')
 
 Review comment:
   I don't think this is used anymore. 


[GitHub] [airflow] mik-laj merged pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644
 
 
   


[GitHub] [airflow] ashb commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r392208874
 
 

 ##########
 File path: airflow/serialization/serialized_objects.py
 ##########
 @@ -326,14 +326,15 @@ def serialize_operator(cls, op: BaseOperator) -> dict:
     def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
         """Deserializes an operator from a JSON object.
         """
-        from airflow.plugins_manager import operator_extra_links
+        from airflow import plugins_manager
 
 Review comment:
   It's a security measure:
   
   Operator links can be dynamic (think of a pre-signed S3 URL that is only valid for 15 minutes), so we need to support inflating to "custom" classes. However, we don't want to have to trust the serialized blob, so we only inflate classes that are pre-registered.
   
   This guards against a class of bugs called "Object Injection Attacks": if we trusted the input and de-serialized whatever class was named there, we might end up opening a reverse shell, etc. See https://blog.nelhage.com/2011/03/exploiting-pickle/ for an example. This defense is not perfect, as the plugins are "under user control", but it mostly looks forward to when we will have an API that accepts a serialized DAG blob to run.
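
To make the pre-registration idea concrete, here is a minimal sketch of allowlist-based deserialization. It is not the code from this pull request: `BaseLink`, `S3PresignedLink`, `register_link_class` and `deserialize_link` are hypothetical names, and the `{"<class path>": {<kwargs>}}` payload shape is an assumption made for illustration.

```python
from typing import Any, Dict, Type


class BaseLink:
    """Hypothetical stand-in for an operator link class."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class S3PresignedLink(BaseLink):
    """Hypothetical link type that a trusted plugin would register."""


# Allowlist: fully qualified class name -> class. Only trusted code fills it in.
REGISTERED_LINK_CLASSES: Dict[str, Type[BaseLink]] = {}


def register_link_class(cls: Type[BaseLink]) -> None:
    """Add a class to the allowlist under its module-qualified name."""
    key = "{}.{}".format(cls.__module__, cls.__name__)
    REGISTERED_LINK_CLASSES[key] = cls


def deserialize_link(encoded: Dict[str, Dict[str, Any]]) -> BaseLink:
    """Inflate a link from {"<class path>": {<kwargs>}}.

    The class path is only used as a lookup key in the allowlist. It is
    never imported or evaluated, so an unknown path cannot pull in
    arbitrary code.
    """
    (class_path, kwargs), = encoded.items()
    cls = REGISTERED_LINK_CLASSES.get(class_path)
    if cls is None:
        raise ValueError(
            "Operator link class {!r} is not registered".format(class_path)
        )
    return cls(**kwargs)


register_link_class(S3PresignedLink)
```

The point of the design is that the serialized blob can only select among classes that trusted code has already registered; a blob naming any other class is rejected before anything is instantiated.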


[GitHub] [airflow] kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389821560
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -84,89 +107,82 @@ def on_load(cls, *args, **kwargs):
         """
 
 
-def load_entrypoint_plugins(entry_points, airflow_plugins):
-    """
-    Load AirflowPlugin subclasses from the entrypoints
-    provided. The entry_point group should be 'airflow.plugins'.
-
-    :param entry_points: A collection of entrypoints to search for plugins
-    :type entry_points: Generator[setuptools.EntryPoint, None, None]
-    :param airflow_plugins: A collection of existing airflow plugins to
-        ensure we don't load duplicates
-    :type airflow_plugins: list[type[airflow.plugins_manager.AirflowPlugin]]
-    :rtype: list[airflow.plugins_manager.AirflowPlugin]
-    """
-    for entry_point in entry_points:
-        log.debug('Importing entry_point plugin %s', entry_point.name)
-        plugin_obj = entry_point.load()
-        if is_valid_plugin(plugin_obj, airflow_plugins):
-            if callable(getattr(plugin_obj, 'on_load', None)):
-                plugin_obj.on_load()
-                airflow_plugins.append(plugin_obj)
-    return airflow_plugins
-
-
-def is_valid_plugin(plugin_obj, existing_plugins):
+def is_valid_plugin(plugin_obj):
     """
     Check whether a potential object is a subclass of
     the AirflowPlugin class.
 
     :param plugin_obj: potential subclass of AirflowPlugin
-    :param existing_plugins: Existing list of AirflowPlugin subclasses
     :return: Whether or not the obj is a valid subclass of
         AirflowPlugin
     """
+    global plugins  # pylint: disable=global-statement
+
     if (
         inspect.isclass(plugin_obj) and
         issubclass(plugin_obj, AirflowPlugin) and
         (plugin_obj is not AirflowPlugin)
     ):
         plugin_obj.validate()
-        return plugin_obj not in existing_plugins
+        return plugin_obj not in plugins
     return False
 
 
-plugins = []  # type: List[AirflowPlugin]
-
-norm_pattern = re.compile(r'[/|.]')
-
-if not settings.PLUGINS_FOLDER:
-    raise ValueError("Plugins folder is not set")
-
-# Crawl through the plugins folder to find AirflowPlugin derivatives
-for root, dirs, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True):
-    for f in files:
-        filepath = os.path.join(root, f)
-        try:
-            if not os.path.isfile(filepath):
-                continue
-            mod_name, file_ext = os.path.splitext(
-                os.path.split(filepath)[-1])
-            if file_ext != '.py':
-                continue
-
-            log.debug('Importing plugin module %s', filepath)
-            # normalize root path as namespace
-            namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])
-
-            loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
-            spec = importlib.util.spec_from_loader(mod_name, loader)
-            m = importlib.util.module_from_spec(spec)
-            sys.modules[spec.name] = m
-            loader.exec_module(m)
-            for obj in list(m.__dict__.values()):
-                if is_valid_plugin(obj, plugins):
-                    plugins.append(obj)
-        except Exception as e:  # pylint: disable=broad-except
-            log.exception(e)
-            path = filepath or str(f)
-            log.error('Failed to import plugin %s', path)
-            import_errors[path] = str(e)
-
-plugins = load_entrypoint_plugins(
-    pkg_resources.iter_entry_points('airflow.plugins'),
-    plugins
-)
+def load_entrypoint_plugins():
+    """
+    Load and register plugins AirflowPlugin subclasses from the entrypoints.
+    The entry_point group should be 'airflow.plugins'.
+    """
+    global plugins  # pylint: disable=global-statement
+
+    entry_points = pkg_resources.iter_entry_points('airflow.plugins')
+
+    log.debug("Loading plugins from entrypoints")
+
+    for entry_point in entry_points:
+        log.debug('Importing entry_point plugin %s', entry_point.name)
+        plugin_obj = entry_point.load()
+        if is_valid_plugin(plugin_obj):
+            if callable(getattr(plugin_obj, 'on_load', None)):
+                plugin_obj.on_load()
+                plugins.append(plugin_obj)
+
+
+def load_plugins_from_plugin_directory():
+    """
+    Load and register Airflow Plugin from plugin directory
 
 Review comment:
   ```suggestion
       Load and register Airflow Plugins from plugins directory
   ```
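
For readers following the hunk above, the directory scan that moves into `load_plugins_from_plugin_directory()` can be sketched roughly as below. This is a simplified illustration under stated assumptions, not the function from the pull request: `load_objects_from_folder` and its `is_valid` parameter are hypothetical, and it uses `importlib.util.spec_from_file_location` rather than the `SourceFileLoader` call and `sys.modules` registration shown in the removed lines.

```python
import importlib.util
import os
from typing import Any, Callable, Dict, List, Tuple


def load_objects_from_folder(
    folder: str, is_valid: Callable[[Any], bool]
) -> Tuple[List[Any], Dict[str, str]]:
    """Import every .py file under `folder` and collect the valid objects.

    Returns the collected objects plus a mapping of file path -> error
    message for files that failed to import.
    """
    found: List[Any] = []
    import_errors: Dict[str, str] = {}

    for root, _dirs, files in os.walk(folder, followlinks=True):
        for file_name in sorted(files):
            mod_name, file_ext = os.path.splitext(file_name)
            if file_ext != ".py":
                continue
            filepath = os.path.join(root, file_name)
            try:
                spec = importlib.util.spec_from_file_location(mod_name, filepath)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
            except Exception as err:  # a broken file must not stop the scan
                import_errors[filepath] = str(err)
                continue
            for obj in list(vars(module).values()):
                # Skip duplicates, as is_valid_plugin() does in the diff.
                if is_valid(obj) and obj not in found:
                    found.append(obj)

    return found, import_errors
```

Recording failures in a dictionary instead of raising mirrors the `import_errors` mapping in the removed module-level code, so one faulty plugin file does not prevent the others from loading.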


[GitHub] [airflow] houqp commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r390008524
 
 

 ##########
 File path: airflow/serialization/serialized_objects.py
 ##########
 @@ -326,14 +326,15 @@ def serialize_operator(cls, op: BaseOperator) -> dict:
     def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
         """Deserializes an operator from a JSON object.
         """
-        from airflow.plugins_manager import operator_extra_links
+        from airflow import plugins_manager
 
 Review comment:
   Not related to your change, but I am just curious: do you know the reason for doing a lazy import of plugins_manager here as well as in `_deserialize_operator_extra_links`?


[GitHub] [airflow] ashb commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389611686
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -183,103 +202,116 @@ def make_module(name: str, objects: List[Any]):
 # pylint: enable=protected-access
 
 
-# Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+def load_plugins():
+    """
+    Load plugins from plugins directory and entrypoints.
+    Plugins are only loaded if they have not been previously loaded.
 
 Review comment:
   ```suggestion
   
       Plugins are only loaded if they have not been previously loaded.
   ```


[GitHub] [airflow] kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389823378
 
 

 ##########
 File path: airflow/serialization/serialized_objects.py
 ##########
 @@ -397,7 +398,8 @@ def _deserialize_operator_extra_links(
         :param encoded_op_links: Serialized Operator Link
         :return: De-Serialized Operator Link
         """
-        from airflow.plugins_manager import registered_operator_link_classes
+        from airflow import plugins_manager
+        plugins_manager.endure_plugins_loaded()
 
 Review comment:
   ```suggestion
           plugins_manager.ensure_plugins_loaded()
   ```


[GitHub] [airflow] mik-laj commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#issuecomment-596701039
 
 
   > What about airflow task run or similar?
   
   https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/cli/commands/task_command.py#L170
   https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/utils/cli.py#L155
   https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/models/dagbag.py#L105
   https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/models/dagbag.py#L390
   https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/models/dagbag.py#L195


[GitHub] [airflow] codecov-io commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#issuecomment-596777052
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7644?src=pr&el=h1) Report
   > Merging [#7644](https://codecov.io/gh/apache/airflow/pull/7644?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/f3abd340826289dec23e96b79a6ed9b6a1955027?src=pr&el=desc) will **decrease** coverage by `0.01%`.
   > The diff coverage is `87.12%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7644/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7644?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7644      +/-   ##
   ==========================================
   - Coverage   86.83%   86.82%   -0.02%     
   ==========================================
     Files         897      897              
     Lines       42805    42876      +71     
   ==========================================
   + Hits        37170    37226      +56     
   - Misses       5635     5650      +15
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7644?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9fX2luaXRfXy5weQ==) | `91.3% <ø> (-0.7%)` | :arrow_down: |
   | [airflow/models/dagbag.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnYmFnLnB5) | `89.83% <100%> (+0.08%)` | :arrow_up: |
   | [airflow/www/app.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvYXBwLnB5) | `94.28% <100%> (+0.04%)` | :arrow_up: |
   | [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.26% <100%> (+0.06%)` | :arrow_up: |
   | [airflow/plugins\_manager.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9wbHVnaW5zX21hbmFnZXIucHk=) | `90.14% <85.22%> (+1.01%)` | :arrow_up: |
   | [...rflow/providers/google/cloud/operators/dataflow.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9kYXRhZmxvdy5weQ==) | `90.44% <0%> (-8.65%)` | :arrow_down: |
   | [airflow/providers/postgres/hooks/postgres.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcG9zdGdyZXMvaG9va3MvcG9zdGdyZXMucHk=) | `92.95% <0%> (-1.41%)` | :arrow_down: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.21% <0%> (-0.44%)` | :arrow_down: |
   | [airflow/jobs/backfill\_job.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL2JhY2tmaWxsX2pvYi5weQ==) | `91.87% <0%> (-0.29%)` | :arrow_down: |
   | [airflow/providers/mysql/hooks/mysql.py](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbXlzcWwvaG9va3MvbXlzcWwucHk=) | `92.7% <0%> (-0.07%)` | :arrow_down: |
   | ... and [9 more](https://codecov.io/gh/apache/airflow/pull/7644/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7644?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7644?src=pr&el=footer). Last update [f3abd34...f44a0d1](https://codecov.io/gh/apache/airflow/pull/7644?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] mik-laj commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#issuecomment-596581085
 
 
   > I think we need to call integrate_dag_plugins() from somewhere in the task execution path too
   
   I called this method here: 
   https://github.com/apache/airflow/pull/7644/files#diff-0d282e5c2bb3b60ee0d056a990cf0927R195
   
   Both SchedulerJob and LocalTaskJob use this method.


[GitHub] [airflow] mik-laj commented on issue #7644: [AIRFLOW-7003][WIP] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7644: [AIRFLOW-7003][WIP] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#issuecomment-596128936
 
 
   Travis is green.


[GitHub] [airflow] mik-laj commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389773706
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -183,103 +202,116 @@ def make_module(name: str, objects: List[Any]):
 # pylint: enable=protected-access
 
 
-# Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+def load_plugins():
+    """
+    Load plugins from plugins directory and entrypoints.
+    Plugins are only loaded if they have not been previously loaded.
 
 Review comment:
   Done.


[GitHub] [airflow] kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389823552
 
 

 ##########
 File path: airflow/www/app.py
 ##########
 @@ -173,16 +173,16 @@ def init_views(appbuilder):
 
             def integrate_plugins():
                 """Integrate plugins to the context"""
-                from airflow.plugins_manager import (
-                    flask_appbuilder_views, flask_appbuilder_menu_links
-                )
+                from airflow import plugins_manager
 
-                for v in flask_appbuilder_views:
+                plugins_manager.endure_plugins_loaded()
 
 Review comment:
   ```suggestion
                   plugins_manager.ensure_plugins_loaded()
   ```


[GitHub] [airflow] mik-laj commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389773655
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -183,103 +202,116 @@ def make_module(name: str, objects: List[Any]):
 # pylint: enable=protected-access
 
 
-# Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+def load_plugins():
 
 Review comment:
   Done.


[GitHub] [airflow] ashb commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#issuecomment-598706095
 
 
   > > What about airflow task run or similar?
   > 
   > https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/cli/commands/task_command.py#L170
   > https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/utils/cli.py#L155
   > https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/models/dagbag.py#L105
   > https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/models/dagbag.py#L390
   > https://github.com/PolideaInternal/airflow/blob/AIRFLOW-7003/airflow/models/dagbag.py#L195
   
   Ah yeah. Perfect.


[GitHub] [airflow] mik-laj commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r390029675
 
 

 ##########
 File path: airflow/serialization/serialized_objects.py
 ##########
 @@ -326,14 +326,15 @@ def serialize_operator(cls, op: BaseOperator) -> dict:
     def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
         """Deserializes an operator from a JSON object.
         """
-        from airflow.plugins_manager import operator_extra_links
+        from airflow import plugins_manager
 
 Review comment:
   I have no idea. This code was written by @kaxil together with @ashb. Can you help us?


[GitHub] [airflow] ashb commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#issuecomment-596669436
 
 
   > > I think we need to call integrate_dag_plugins() from somewhere in the task execution path too
   > 
   > I called this method here:
   > https://github.com/apache/airflow/pull/7644/files#diff-0d282e5c2bb3b60ee0d056a990cf0927R195
   > 
   > Both SchedulerJob and LocalTaskJob use this method.
   
   What about `airflow task run` or similar?


[GitHub] [airflow] kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389819402
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -183,103 +199,117 @@ def make_module(name: str, objects: List[Any]):
 # pylint: enable=protected-access
 
 
-# Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+def endure_plugins_loaded():
+    """
+    Load plugins from plugins directory and entrypoints.
 
-# Plugin components to integrate directly
-admin_views: List[Any] = []
-flask_blueprints: List[Any] = []
-menu_links: List[Any] = []
-flask_appbuilder_views: List[Any] = []
-flask_appbuilder_menu_links: List[Any] = []
-global_operator_extra_links: List[Any] = []
-operator_extra_links: List[Any] = []
-registered_operator_link_classes: Dict[str, Type] = {}
-"""Mapping of class names to class of OperatorLinks registered by plugins.
+    Plugins are only loaded if they have not been previously loaded.
+    """
+    global plugins  # pylint: disable=global-statement
+
+    if plugins is not None:
+        log.debug("Plugins are already loaded. Skipping.")
+        return
+
+    if not settings.PLUGINS_FOLDER:
+        raise ValueError("Plugins folder is not set")
+
+    log.debug("Loading plugins")
+
+    plugins = []
+
+    load_plugins_from_plugin_directory()
+    load_entrypoint_plugins()
+
+    initialize_plugins()
+
+
+def initialize_plugins():
+    """Creates modules for loaded extension from plugins"""
+    # pylint: disable=global-statement
+    global plugins
+    global operators_modules
+    global sensors_modules
+    global hooks_modules
+    global executors_modules
+    global macros_modules
+
+    global admin_views
+    global flask_blueprints
+    global menu_links
+    global flask_appbuilder_views
+    global flask_appbuilder_menu_links
+    global global_operator_extra_links
+    global operator_extra_links
+    global registered_operator_link_classes
+    # pylint: enable=global-statement
+
+    log.debug("Initialize plugin modules")
+
+    for plugin in plugins:
+        plugin_name: str = plugin.name
+        operators_modules.append(
+            make_module('airflow.operators.' + plugin_name, plugin.operators + plugin.sensors))
+        sensors_modules.append(
+            make_module('airflow.sensors.' + plugin_name, plugin.sensors)
+        )
+        hooks_modules.append(make_module('airflow.hooks.' + plugin_name, plugin.hooks))
+        executors_modules.append(
+            make_module('airflow.executors.' + plugin_name, plugin.executors))
+        macros_modules.append(make_module('airflow.macros.' + plugin_name, plugin.macros))
+
+        admin_views.extend(plugin.admin_views)
+        menu_links.extend(plugin.menu_links)
+        flask_appbuilder_views.extend(plugin.appbuilder_views)
+        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
+        flask_blueprints.extend([{
+            'name': plugin.name,
+            'blueprint': bp
+        } for bp in plugin.flask_blueprints])
+        global_operator_extra_links.extend(plugin.global_operator_extra_links)
+        operator_extra_links.extend(list(plugin.operator_extra_links))
+
+        registered_operator_link_classes.update({
+            "{}.{}".format(link.__class__.__module__,
+                           link.__class__.__name__): link.__class__
+            for link in plugin.operator_extra_links
+        })
 
-Used by the DAG serialization code to only allow specific classes to be created
-during deserialization
-"""
 
-for p in plugins:
-    if not p.name:
-        raise AirflowPluginException("Plugin name is missing.")
-    plugin_name: str = p.name
-    operators_modules.append(
-        make_module('airflow.operators.' + plugin_name, p.operators + p.sensors))
-    sensors_modules.append(
-        make_module('airflow.sensors.' + plugin_name, p.sensors)
-    )
-    hooks_modules.append(make_module('airflow.hooks.' + plugin_name, p.hooks))
-    executors_modules.append(
-        make_module('airflow.executors.' + plugin_name, p.executors))
-    macros_modules.append(make_module('airflow.macros.' + plugin_name, p.macros))
-
-    admin_views.extend(p.admin_views)
-    menu_links.extend(p.menu_links)
-    flask_appbuilder_views.extend(p.appbuilder_views)
-    flask_appbuilder_menu_links.extend(p.appbuilder_menu_items)
-    flask_blueprints.extend([{
-        'name': p.name,
-        'blueprint': bp
-    } for bp in p.flask_blueprints])
-    global_operator_extra_links.extend(p.global_operator_extra_links)
-    operator_extra_links.extend(list(p.operator_extra_links))
-
-    registered_operator_link_classes.update({
-        "{}.{}".format(link.__class__.__module__,
-                       link.__class__.__name__): link.__class__
-        for link in p.operator_extra_links
-    })
-
-
-def integrate_operator_plugins() -> None:
-    """Integrate operators plugins to the context"""
+def integrate_executor_plugins() -> None:
+    """Integrate executor plugins to the context."""
+    endure_plugins_loaded()
 
 Review comment:
   ```suggestion
       ensure_plugins_loaded()
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389819304
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -183,103 +199,117 @@ def make_module(name: str, objects: List[Any]):
 # pylint: enable=protected-access
 
 
-# Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+def endure_plugins_loaded():
+    """
+    Load plugins from plugins directory and entrypoints.
 
-# Plugin components to integrate directly
-admin_views: List[Any] = []
-flask_blueprints: List[Any] = []
-menu_links: List[Any] = []
-flask_appbuilder_views: List[Any] = []
-flask_appbuilder_menu_links: List[Any] = []
-global_operator_extra_links: List[Any] = []
-operator_extra_links: List[Any] = []
-registered_operator_link_classes: Dict[str, Type] = {}
-"""Mapping of class names to class of OperatorLinks registered by plugins.
+    Plugins are only loaded if they have not been previously loaded.
+    """
+    global plugins  # pylint: disable=global-statement
+
+    if plugins is not None:
+        log.debug("Plugins are already loaded. Skipping.")
+        return
+
+    if not settings.PLUGINS_FOLDER:
+        raise ValueError("Plugins folder is not set")
+
+    log.debug("Loading plugins")
+
+    plugins = []
+
+    load_plugins_from_plugin_directory()
+    load_entrypoint_plugins()
+
+    initialize_plugins()
+
+
+def initialize_plugins():
+    """Creates modules for loaded extension from plugins"""
+    # pylint: disable=global-statement
+    global plugins
+    global operators_modules
+    global sensors_modules
+    global hooks_modules
+    global executors_modules
+    global macros_modules
+
+    global admin_views
+    global flask_blueprints
+    global menu_links
+    global flask_appbuilder_views
+    global flask_appbuilder_menu_links
+    global global_operator_extra_links
+    global operator_extra_links
+    global registered_operator_link_classes
+    # pylint: enable=global-statement
+
+    log.debug("Initialize plugin modules")
+
+    for plugin in plugins:
+        plugin_name: str = plugin.name
+        operators_modules.append(
+            make_module('airflow.operators.' + plugin_name, plugin.operators + plugin.sensors))
+        sensors_modules.append(
+            make_module('airflow.sensors.' + plugin_name, plugin.sensors)
+        )
+        hooks_modules.append(make_module('airflow.hooks.' + plugin_name, plugin.hooks))
+        executors_modules.append(
+            make_module('airflow.executors.' + plugin_name, plugin.executors))
+        macros_modules.append(make_module('airflow.macros.' + plugin_name, plugin.macros))
+
+        admin_views.extend(plugin.admin_views)
+        menu_links.extend(plugin.menu_links)
+        flask_appbuilder_views.extend(plugin.appbuilder_views)
+        flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
+        flask_blueprints.extend([{
+            'name': plugin.name,
+            'blueprint': bp
+        } for bp in plugin.flask_blueprints])
+        global_operator_extra_links.extend(plugin.global_operator_extra_links)
+        operator_extra_links.extend(list(plugin.operator_extra_links))
+
+        registered_operator_link_classes.update({
+            "{}.{}".format(link.__class__.__module__,
+                           link.__class__.__name__): link.__class__
+            for link in plugin.operator_extra_links
+        })
 
-Used by the DAG serialization code to only allow specific classes to be created
-during deserialization
-"""
 
-for p in plugins:
-    if not p.name:
-        raise AirflowPluginException("Plugin name is missing.")
-    plugin_name: str = p.name
-    operators_modules.append(
-        make_module('airflow.operators.' + plugin_name, p.operators + p.sensors))
-    sensors_modules.append(
-        make_module('airflow.sensors.' + plugin_name, p.sensors)
-    )
-    hooks_modules.append(make_module('airflow.hooks.' + plugin_name, p.hooks))
-    executors_modules.append(
-        make_module('airflow.executors.' + plugin_name, p.executors))
-    macros_modules.append(make_module('airflow.macros.' + plugin_name, p.macros))
-
-    admin_views.extend(p.admin_views)
-    menu_links.extend(p.menu_links)
-    flask_appbuilder_views.extend(p.appbuilder_views)
-    flask_appbuilder_menu_links.extend(p.appbuilder_menu_items)
-    flask_blueprints.extend([{
-        'name': p.name,
-        'blueprint': bp
-    } for bp in p.flask_blueprints])
-    global_operator_extra_links.extend(p.global_operator_extra_links)
-    operator_extra_links.extend(list(p.operator_extra_links))
-
-    registered_operator_link_classes.update({
-        "{}.{}".format(link.__class__.__module__,
-                       link.__class__.__name__): link.__class__
-        for link in p.operator_extra_links
-    })
-
-
-def integrate_operator_plugins() -> None:
-    """Integrate operators plugins to the context"""
+def integrate_executor_plugins() -> None:
+    """Integrate executor plugins to the context."""
+    endure_plugins_loaded()
+
+    log.debug("Integrate executor plugins")
+
+    for executors_module in executors_modules:
+        sys.modules[executors_module.__name__] = executors_module
+        # noinspection PyProtectedMember
+        globals()[executors_module._name] = executors_module  # pylint: disable=protected-access
+
+
+def integrate_dag_plugins() -> None:
+    """Integrates operator, sensor, hook, macro plugins."""
+    endure_plugins_loaded()
 
 Review comment:
   ```suggestion
       ensure_plugins_loaded()
   ```


[GitHub] [airflow] kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389818149
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -183,103 +199,117 @@ def make_module(name: str, objects: List[Any]):
 # pylint: enable=protected-access
 
 
-# Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+def endure_plugins_loaded():
 
 Review comment:
   ```suggestion
   def ensure_plugins_loaded():
   ```


[GitHub] [airflow] mik-laj commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389773869
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -34,6 +34,32 @@
 
 import_errors = {}
 
+plugins = None  # type: Optional[List[AirflowPlugin]]
+
+norm_pattern = re.compile(r'[/|.]')
 
 Review comment:
   Good catch. Thanks.


[GitHub] [airflow] ashb commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7644: [AIRFLOW-7003] Lazy load all plguins
URL: https://github.com/apache/airflow/pull/7644#discussion_r389612475
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -183,103 +202,116 @@ def make_module(name: str, objects: List[Any]):
 # pylint: enable=protected-access
 
 
-# Plugin components to integrate as modules
-operators_modules = []
-sensors_modules = []
-hooks_modules = []
-executors_modules = []
-macros_modules = []
+def load_plugins():
 
 Review comment:
   How about this as a name?
   
   ```suggestion
   def ensure_plugins_loaded():
   ```
   
   It makes it clearer at the call site that plugins are loaded only once, not every time this function is called.
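
A minimal sketch of the load-once behaviour the name is meant to convey, not the actual `plugins_manager` code. It assumes a module-level `plugins` variable that starts as `None`, as in the diff; `_discover_plugins` and `load_count` are hypothetical stand-ins added only to make the effect observable.

```python
import logging
from typing import List, Optional

log = logging.getLogger(__name__)

# None means "not loaded yet"; an empty list would mean "loaded, nothing found".
plugins: Optional[List[str]] = None

# Hypothetical counter, present only to show how often discovery runs.
load_count = 0


def _discover_plugins() -> List[str]:
    """Hypothetical stand-in for the plugin directory and entrypoint scans."""
    global load_count
    load_count += 1
    return ["example_plugin"]


def ensure_plugins_loaded() -> None:
    """Load plugins on the first call; return immediately on later calls."""
    global plugins
    if plugins is not None:
        log.debug("Plugins are already loaded. Skipping.")
        return
    plugins = _discover_plugins()


# Calling the function twice triggers discovery only once.
ensure_plugins_loaded()
ensure_plugins_loaded()
```

With a name like `ensure_plugins_loaded`, callers such as the `integrate_*` functions can invoke it unconditionally without worrying about repeated work.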
