You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/29 13:21:34 UTC
[airflow] 28/37: Make loading plugins from entrypoint
fault-tolerant (#8732)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2e6288daf50807fe967e08642dfe6810b34e8a20
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu May 21 19:10:59 2020 +0100
Make loading plugins from entrypoint fault-tolerant (#8732)
(cherry picked from commit fd6e057aeff3dd4e38192fe4a36d3083200c2f5e)
---
airflow/plugins_manager.py | 19 ++++++++++++-------
tests/plugins/test_plugins_manager_rbac.py | 27 +++++++++++++++++++++++++++
2 files changed, 39 insertions(+), 7 deletions(-)
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index d68f882..b70517b 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -25,6 +25,7 @@ from __future__ import unicode_literals
from builtins import object
import imp
import inspect
+import logging
import os
import re
from typing import Any, Dict, List, Set, Type
@@ -33,9 +34,8 @@ import pkg_resources
from airflow import settings
from airflow.models.baseoperator import BaseOperatorLink
-from airflow.utils.log.logging_mixin import LoggingMixin
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
import_errors = {}
@@ -99,13 +99,18 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
:type airflow_plugins: list[type[airflow.plugins_manager.AirflowPlugin]]
:rtype: list[airflow.plugins_manager.AirflowPlugin]
"""
+ global import_errors # pylint: disable=global-statement
for entry_point in entry_points:
log.debug('Importing entry_point plugin %s', entry_point.name)
- plugin_obj = entry_point.load()
- if is_valid_plugin(plugin_obj, airflow_plugins):
- if callable(getattr(plugin_obj, 'on_load', None)):
- plugin_obj.on_load()
- airflow_plugins.append(plugin_obj)
+ try:
+ plugin_obj = entry_point.load()
+ if is_valid_plugin(plugin_obj, airflow_plugins):
+ if callable(getattr(plugin_obj, 'on_load', None)):
+ plugin_obj.on_load()
+ airflow_plugins.append(plugin_obj)
+ except Exception as e: # pylint: disable=broad-except
+ log.exception("Failed to import plugin %s", entry_point.name)
+ import_errors[entry_point.module_name] = str(e)
return airflow_plugins
diff --git a/tests/plugins/test_plugins_manager_rbac.py b/tests/plugins/test_plugins_manager_rbac.py
index ded8c58..83edcb6 100644
--- a/tests/plugins/test_plugins_manager_rbac.py
+++ b/tests/plugins/test_plugins_manager_rbac.py
@@ -23,7 +23,10 @@ from __future__ import print_function
from __future__ import unicode_literals
import unittest
+import six
+from tests.compat import mock
+import pkg_resources
from airflow.www_rbac import app as application
@@ -71,3 +74,27 @@ class PluginsTestRBAC(unittest.TestCase):
# Blueprint should be present in the app
self.assertTrue('test_plugin' in self.app.blueprints)
self.assertEqual(self.app.blueprints['test_plugin'].name, bp.name)
+
+ @unittest.skipIf(six.PY2, 'self.assertLogs not available for Python 2')
+ @mock.patch('pkg_resources.iter_entry_points')
+ def test_entrypoint_plugin_errors_dont_raise_exceptions(self, mock_ep_plugins):
+ """
+ Test that Airflow does not raise an Error if there is any Exception because of the
+ Plugin.
+ """
+ from airflow.plugins_manager import load_entrypoint_plugins, import_errors
+
+ mock_entrypoint = mock.Mock()
+ mock_entrypoint.name = 'test-entrypoint'
+ mock_entrypoint.module_name = 'test.plugins.test_plugins_manager'
+ mock_entrypoint.load.side_effect = Exception('Version Conflict')
+ mock_ep_plugins.return_value = [mock_entrypoint]
+
+ with self.assertLogs("airflow.plugins_manager", level="ERROR") as log_output:
+ load_entrypoint_plugins(pkg_resources.iter_entry_points('airflow.plugins'), [])
+ received_logs = log_output.output[0]
+ # Assert Traceback is shown too
+ assert "Traceback (most recent call last):" in received_logs
+ assert "Version Conflict" in received_logs
+ assert "Failed to import plugin test-entrypoint" in received_logs
+ assert ('test.plugins.test_plugins_manager', 'Version Conflict') in import_errors.items()