You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/30 19:12:47 UTC

[airflow] branch master updated: Support properties in plugins (#9002)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b1dc1b  Support properties in plugins (#9002)
2b1dc1b is described below

commit 2b1dc1b8e163ca255da03cad1380d81e13b6b002
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Sat May 30 21:12:21 2020 +0200

    Support properties in plugins (#9002)
---
 airflow/plugins_manager.py            | 18 ++++++++++--------
 docs/plugins.rst                      |  5 +++--
 tests/plugins/test_plugins_manager.py | 33 +++++++++++++++++++++++++++++++++
 3 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 7e3b4ae..7c2b116 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -145,11 +145,12 @@ def load_entrypoint_plugins():
     for entry_point in entry_points:  # pylint: disable=too-many-nested-blocks
         log.debug('Importing entry_point plugin %s', entry_point.name)
         try:
-            plugin_obj = entry_point.load()
-            if is_valid_plugin(plugin_obj):
-                if callable(getattr(plugin_obj, 'on_load', None)):
-                    plugin_obj.on_load()
-                    plugins.append(plugin_obj)
+            plugin_class = entry_point.load()
+            if is_valid_plugin(plugin_class):
+                plugin_instance = plugin_class()
+                if callable(getattr(plugin_instance, 'on_load', None)):
+                    plugin_instance.on_load()
+                    plugins.append(plugin_instance)
         except Exception as e:  # pylint: disable=broad-except
             log.exception("Failed to import plugin %s", entry_point.name)
             import_errors[entry_point.module_name] = str(e)
@@ -182,9 +183,10 @@ def load_plugins_from_plugin_directory():
                 mod = importlib.util.module_from_spec(spec)
                 sys.modules[spec.name] = mod
                 loader.exec_module(mod)
-                for obj in list(mod.__dict__.values()):
-                    if is_valid_plugin(obj):
-                        plugins.append(obj)
+                for mod_attr_value in list(mod.__dict__.values()):
+                    if is_valid_plugin(mod_attr_value):
+                        plugin_instance = mod_attr_value()
+                        plugins.append(plugin_instance)
             except Exception as e:  # pylint: disable=broad-except
                 log.exception(e)
                 path = filepath or str(f)
diff --git a/docs/plugins.rst b/docs/plugins.rst
index fc13e9a..e56f36c 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -115,8 +115,9 @@ looks like:
         # buttons.
         operator_extra_links = []
 
-You can derive it by inheritance (please refer to the example below).
-Please note ``name`` inside this class must be specified.
+You can derive it by inheritance (please refer to the example below). In the example, all options have been
+defined as class attributes, but you can also define them as properties if you need to perform
+additional initialization. Please note ``name`` inside this class must be specified.
 
 After the plugin is imported into Airflow,
 you can invoke it using statement like
diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py
index 47495ac..78da6ba 100644
--- a/tests/plugins/test_plugins_manager.py
+++ b/tests/plugins/test_plugins_manager.py
@@ -19,6 +19,8 @@
 import unittest
 from unittest import mock
 
+from airflow.hooks.base_hook import BaseHook
+from airflow.plugins_manager import AirflowPlugin
 from airflow.www import app as application
 
 
@@ -93,3 +95,34 @@ class TestPluginsRBAC(unittest.TestCase):
             assert "Version Conflict" in received_logs
             assert "Failed to import plugin test-entrypoint" in received_logs
             assert "Version Conflict", "test.plugins.test_plugins_manager" in import_errors.items()
+
+
+class TestPluginsManager(unittest.TestCase):
+    class AirflowTestPropertyPlugin(AirflowPlugin):
+        name = "test_property_plugin"
+
+        @property
+        def operators(self):
+            from airflow.models.baseoperator import BaseOperator
+
+            class PluginPropertyOperator(BaseOperator):
+                pass
+
+            return [PluginPropertyOperator]
+
+        class TestNonPropertyHook(BaseHook):
+            pass
+
+        hooks = [TestNonPropertyHook]
+
+    @mock.patch('airflow.plugins_manager.plugins', [AirflowTestPropertyPlugin()])
+    @mock.patch('airflow.plugins_manager.operators_modules', None)
+    @mock.patch('airflow.plugins_manager.sensors_modules', None)
+    @mock.patch('airflow.plugins_manager.hooks_modules', None)
+    @mock.patch('airflow.plugins_manager.macros_modules', None)
+    def test_should_load_plugins_from_property(self):
+        from airflow import plugins_manager
+        plugins_manager.integrate_dag_plugins()
+        self.assertIn('TestPluginsManager.AirflowTestPropertyPlugin', str(plugins_manager.plugins))
+        self.assertIn('PluginPropertyOperator', str(plugins_manager.operators_modules[0].__dict__))
+        self.assertIn("TestNonPropertyHook", str(plugins_manager.hooks_modules[0].__dict__))