You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/30 19:12:47 UTC
[airflow] branch master updated: Support properties in plugins
(#9002)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 2b1dc1b Support properties in plugins (#9002)
2b1dc1b is described below
commit 2b1dc1b8e163ca255da03cad1380d81e13b6b002
Author: Kamil BreguĊa <mi...@users.noreply.github.com>
AuthorDate: Sat May 30 21:12:21 2020 +0200
Support properties in plugins (#9002)
---
airflow/plugins_manager.py | 18 ++++++++++--------
docs/plugins.rst | 5 +++--
tests/plugins/test_plugins_manager.py | 33 +++++++++++++++++++++++++++++++++
3 files changed, 46 insertions(+), 10 deletions(-)
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 7e3b4ae..7c2b116 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -145,11 +145,12 @@ def load_entrypoint_plugins():
for entry_point in entry_points: # pylint: disable=too-many-nested-blocks
log.debug('Importing entry_point plugin %s', entry_point.name)
try:
- plugin_obj = entry_point.load()
- if is_valid_plugin(plugin_obj):
- if callable(getattr(plugin_obj, 'on_load', None)):
- plugin_obj.on_load()
- plugins.append(plugin_obj)
+ plugin_class = entry_point.load()
+ if is_valid_plugin(plugin_class):
+ plugin_instance = plugin_class()
+ if callable(getattr(plugin_instance, 'on_load', None)):
+ plugin_instance.on_load()
+ plugins.append(plugin_instance)
except Exception as e: # pylint: disable=broad-except
log.exception("Failed to import plugin %s", entry_point.name)
import_errors[entry_point.module_name] = str(e)
@@ -182,9 +183,10 @@ def load_plugins_from_plugin_directory():
mod = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = mod
loader.exec_module(mod)
- for obj in list(mod.__dict__.values()):
- if is_valid_plugin(obj):
- plugins.append(obj)
+ for mod_attr_value in list(mod.__dict__.values()):
+ if is_valid_plugin(mod_attr_value):
+ plugin_instance = mod_attr_value()
+ plugins.append(plugin_instance)
except Exception as e: # pylint: disable=broad-except
log.exception(e)
path = filepath or str(f)
diff --git a/docs/plugins.rst b/docs/plugins.rst
index fc13e9a..e56f36c 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -115,8 +115,9 @@ looks like:
# buttons.
operator_extra_links = []
-You can derive it by inheritance (please refer to the example below).
-Please note ``name`` inside this class must be specified.
+You can derive it by inheritance (please refer to the example below). In the example, all options have been
+defined as class attributes, but you can also define them as properties if you need to perform
+additional initialization. Please note ``name`` inside this class must be specified.
After the plugin is imported into Airflow,
you can invoke it using statement like
diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py
index 47495ac..78da6ba 100644
--- a/tests/plugins/test_plugins_manager.py
+++ b/tests/plugins/test_plugins_manager.py
@@ -19,6 +19,8 @@
import unittest
from unittest import mock
+from airflow.hooks.base_hook import BaseHook
+from airflow.plugins_manager import AirflowPlugin
from airflow.www import app as application
@@ -93,3 +95,34 @@ class TestPluginsRBAC(unittest.TestCase):
assert "Version Conflict" in received_logs
assert "Failed to import plugin test-entrypoint" in received_logs
assert "Version Conflict", "test.plugins.test_plugins_manager" in import_errors.items()
+
+
+class TestPluginsManager(unittest.TestCase):
+ class AirflowTestPropertyPlugin(AirflowPlugin):
+ name = "test_property_plugin"
+
+ @property
+ def operators(self):
+ from airflow.models.baseoperator import BaseOperator
+
+ class PluginPropertyOperator(BaseOperator):
+ pass
+
+ return [PluginPropertyOperator]
+
+ class TestNonPropertyHook(BaseHook):
+ pass
+
+ hooks = [TestNonPropertyHook]
+
+ @mock.patch('airflow.plugins_manager.plugins', [AirflowTestPropertyPlugin()])
+ @mock.patch('airflow.plugins_manager.operators_modules', None)
+ @mock.patch('airflow.plugins_manager.sensors_modules', None)
+ @mock.patch('airflow.plugins_manager.hooks_modules', None)
+ @mock.patch('airflow.plugins_manager.macros_modules', None)
+ def test_should_load_plugins_from_property(self):
+ from airflow import plugins_manager
+ plugins_manager.integrate_dag_plugins()
+ self.assertIn('TestPluginsManager.AirflowTestPropertyPlugin', str(plugins_manager.plugins))
+ self.assertIn('PluginPropertyOperator', str(plugins_manager.operators_modules[0].__dict__))
+ self.assertIn("TestNonPropertyHook", str(plugins_manager.hooks_modules[0].__dict__))