You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/10 23:04:01 UTC

[GitHub] feng-tao closed pull request #4412: [AIRFLOW-3605] Load plugins from entry_points

feng-tao closed pull request #4412: [AIRFLOW-3605] Load plugins from entry_points
URL: https://github.com/apache/airflow/pull/4412
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 0eee5f8f93..d0efe62da9 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -28,6 +28,7 @@
 import os
 import re
 import sys
+import pkg_resources
 
 from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -59,6 +60,59 @@ def validate(cls):
         if not cls.name:
             raise AirflowPluginException("Your plugin needs a name.")
 
+    @classmethod
+    def on_load(cls, *args, **kwargs):
+        """
+        Executed when the plugin is loaded.
+        This method is only called once during runtime.
+
+        :param args: If future arguments are passed in on call.
+        :param kwargs: If future arguments are passed in on call.
+        """
+        pass
+
+
+def load_entrypoint_plugins(entry_points, airflow_plugins):
+    """
+    Load AirflowPlugin subclasses from the entrypoints
+    provided. The entry_point group should be 'airflow.plugins'.
+
+    :param entry_points: A collection of entrypoints to search for plugins
+    :type entry_points: Generator[setuptools.EntryPoint, None, None]
+    :param airflow_plugins: A collection of existing airflow plugins to
+        ensure we don't load duplicates
+    :type airflow_plugins: List[AirflowPlugin]
+    :return: List[Type[AirflowPlugin]]
+    """
+    for entry_point in entry_points:
+        log.debug('Importing entry_point plugin %s', entry_point.name)
+        plugin_obj = entry_point.load()
+        if is_valid_plugin(plugin_obj, airflow_plugins):
+            if callable(getattr(plugin_obj, 'on_load', None)):
+                plugin_obj.on_load()
+                airflow_plugins.append(plugin_obj)
+    return airflow_plugins
+
+
+def is_valid_plugin(plugin_obj, existing_plugins):
+    """
+    Check whether a potential object is a subclass of
+    the AirflowPlugin class.
+
+    :param plugin_obj: potential subclass of AirflowPlugin
+    :param existing_plugins: Existing list of AirflowPlugin subclasses
+    :return: Whether or not the obj is a valid subclass of
+        AirflowPlugin
+    """
+    if (
+        inspect.isclass(plugin_obj) and
+        issubclass(plugin_obj, AirflowPlugin) and
+        (plugin_obj is not AirflowPlugin)
+    ):
+        plugin_obj.validate()
+        return plugin_obj not in existing_plugins
+    return False
+
 
 plugins_folder = configuration.conf.get('core', 'plugins_folder')
 if not plugins_folder:
@@ -90,19 +144,19 @@ def validate(cls):
 
             m = imp.load_source(namespace, filepath)
             for obj in list(m.__dict__.values()):
-                if (
-                        inspect.isclass(obj) and
-                        issubclass(obj, AirflowPlugin) and
-                        obj is not AirflowPlugin):
-                    obj.validate()
-                    if obj not in plugins:
-                        plugins.append(obj)
+                if is_valid_plugin(obj, plugins):
+                    plugins.append(obj)
 
         except Exception as e:
             log.exception(e)
             log.error('Failed to import plugin %s', filepath)
             import_errors[filepath] = str(e)
 
+plugins = load_entrypoint_plugins(
+    pkg_resources.iter_entry_points('airflow.plugins'),
+    plugins
+)
+
 
 def make_module(name, objects):
     log.debug('Creating module %s', name)
diff --git a/docs/plugins.rst b/docs/plugins.rst
index e2f37774de..3e72903a55 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -220,3 +220,56 @@ Note on role based views
 Airflow 1.10 introduced role based views using FlaskAppBuilder. You can configure which UI is used by setting
 rbac = True. To support plugin views and links for both versions of the UI and maintain backwards compatibility,
 the fields appbuilder_views and appbuilder_menu_items were added to the AirflowTestPlugin class.
+
+
+Plugins as Python packages
+--------------------------
+
+It is possible to load plugins via the `setuptools entrypoint <https://packaging.python.org/guides/creating-and-discovering-plugins/#using-package-metadata>`_ mechanism. To do this, link
+your plugin using an entrypoint in your package. If the package is installed, Airflow
+will automatically load the registered plugins from the entrypoint list.
+
+**Note**: Neither the entrypoint name (e.g., ``my_plugin``) nor the name of the
+plugin class will contribute towards the module and class name of the plugin
+itself. The structure is determined by
+``airflow.plugins_manager.AirflowPlugin.name`` and the class name of the plugin
+component with the pattern ``airflow.{component}.{name}.{component_class_name}``.
+
+.. code-block:: python
+
+    # my_package/my_plugin.py
+    from airflow.plugins_manager import AirflowPlugin
+    from airflow.models import BaseOperator
+    from airflow.hooks.base_hook import BaseHook
+
+    class MyOperator(BaseOperator):
+      pass
+
+    class MyHook(BaseHook):
+      pass
+
+    class MyAirflowPlugin(AirflowPlugin):
+      name = 'my_namespace'
+      operators = [MyOperator]
+      hooks = [MyHook]
+
+
+.. code-block:: python
+
+    from setuptools import setup
+
+    setup(
+        name="my-package",
+        ...
+        entry_points = {
+            'airflow.plugins': [
+                'my_plugin = my_package.my_plugin:MyAirflowPlugin'
+            ]
+        }
+    )
+
+
+This will create a hook and an operator accessible at:
+
+- ``airflow.hooks.my_namespace.MyHook``
+- ``airflow.operators.my_namespace.MyOperator``
diff --git a/tests/plugins/test_plugin.py b/tests/plugins/test_plugin.py
index 68aa989a2e..8bf0416b22 100644
--- a/tests/plugins/test_plugin.py
+++ b/tests/plugins/test_plugin.py
@@ -117,3 +117,15 @@ class AirflowTestPlugin(AirflowPlugin):
     menu_links = [ml]
     appbuilder_views = [v_appbuilder_package]
     appbuilder_menu_items = [appbuilder_mitem]
+
+
+class MockPluginA(AirflowPlugin):
+    name = 'plugin-a'
+
+
+class MockPluginB(AirflowPlugin):
+    name = 'plugin-b'
+
+
+class MockPluginC(AirflowPlugin):
+    name = 'plugin-c'
diff --git a/tests/plugins/test_plugins_manager_www.py b/tests/plugins/test_plugins_manager_www.py
index 130bd32c73..d47feb6e76 100644
--- a/tests/plugins/test_plugins_manager_www.py
+++ b/tests/plugins/test_plugins_manager_www.py
@@ -23,15 +23,18 @@
 from __future__ import unicode_literals
 
 import unittest
+from mock import MagicMock, PropertyMock
 
 from flask.blueprints import Blueprint
 from flask_admin.menu import MenuLink, MenuView
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.models import BaseOperator
+from airflow.plugins_manager import load_entrypoint_plugins, is_valid_plugin
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.executors.base_executor import BaseExecutor
 from airflow.www.app import create_app
+from tests.plugins.test_plugin import MockPluginA, MockPluginB, MockPluginC
 
 
 class PluginsTest(unittest.TestCase):
@@ -83,3 +86,54 @@ def test_menu_links(self):
         [menu_link] = [ml for ml in category.get_children()
                        if isinstance(ml, MenuLink)]
         self.assertEqual('Test Menu Link', menu_link.name)
+
+
+class PluginsTestEntrypointLoad(unittest.TestCase):
+
+    def setUp(self):
+        self.expected = [MockPluginA, MockPluginB, MockPluginC]
+        self.entrypoints = [
+            self._build_mock(plugin_obj)
+            for plugin_obj in self.expected
+        ]
+
+    def _build_mock(self, plugin_obj):
+        m = MagicMock(**{
+            'load.return_value': plugin_obj
+        })
+        type(m).name = PropertyMock(return_value='plugin-' + plugin_obj.name)
+        return m
+
+    def test_load_entrypoint_plugins(self):
+        self.assertListEqual(
+            load_entrypoint_plugins(self.entrypoints, []),
+            self.expected
+        )
+
+    def test_failed_load_entrpoint_plugins(self):
+        self.assertListEqual(
+            load_entrypoint_plugins(
+                self.entrypoints[:2] + [self._build_mock(
+                    MagicMock(name='other')
+                )], []),
+            self.expected[:2]
+        )
+
+
+class PluginsTestValidator(unittest.TestCase):
+    def setUp(self):
+        self.existing = [MockPluginA, MockPluginB]
+
+    def test_valid_plugin(self):
+        c = MockPluginC
+        self.assertTrue(
+            is_valid_plugin(
+                c, self.existing
+            ))
+
+    def test_invalid_plugin(self):
+        c = MockPluginC
+        self.assertFalse(
+            is_valid_plugin(
+                c, self.existing + [c]
+            ))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services