You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/29 13:21:34 UTC

[airflow] 28/37: Make loading plugins from entrypoint fault-tolerant (#8732)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2e6288daf50807fe967e08642dfe6810b34e8a20
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu May 21 19:10:59 2020 +0100

    Make loading plugins from entrypoint fault-tolerant (#8732)
    
    (cherry picked from commit fd6e057aeff3dd4e38192fe4a36d3083200c2f5e)
---
 airflow/plugins_manager.py                 | 19 ++++++++++++-------
 tests/plugins/test_plugins_manager_rbac.py | 27 +++++++++++++++++++++++++++
 2 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index d68f882..b70517b 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -25,6 +25,7 @@ from __future__ import unicode_literals
 from builtins import object
 import imp
 import inspect
+import logging
 import os
 import re
 from typing import Any, Dict, List, Set, Type
@@ -33,9 +34,8 @@ import pkg_resources
 
 from airflow import settings
 from airflow.models.baseoperator import BaseOperatorLink
-from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
 
 import_errors = {}
 
@@ -99,13 +99,18 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
     :type airflow_plugins: list[type[airflow.plugins_manager.AirflowPlugin]]
     :rtype: list[airflow.plugins_manager.AirflowPlugin]
     """
+    global import_errors  # pylint: disable=global-statement
     for entry_point in entry_points:
         log.debug('Importing entry_point plugin %s', entry_point.name)
-        plugin_obj = entry_point.load()
-        if is_valid_plugin(plugin_obj, airflow_plugins):
-            if callable(getattr(plugin_obj, 'on_load', None)):
-                plugin_obj.on_load()
-                airflow_plugins.append(plugin_obj)
+        try:
+            plugin_obj = entry_point.load()
+            if is_valid_plugin(plugin_obj, airflow_plugins):
+                if callable(getattr(plugin_obj, 'on_load', None)):
+                    plugin_obj.on_load()
+                    airflow_plugins.append(plugin_obj)
+        except Exception as e:  # pylint: disable=broad-except
+            log.exception("Failed to import plugin %s", entry_point.name)
+            import_errors[entry_point.module_name] = str(e)
     return airflow_plugins
 
 
diff --git a/tests/plugins/test_plugins_manager_rbac.py b/tests/plugins/test_plugins_manager_rbac.py
index ded8c58..83edcb6 100644
--- a/tests/plugins/test_plugins_manager_rbac.py
+++ b/tests/plugins/test_plugins_manager_rbac.py
@@ -23,7 +23,10 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import unittest
+import six
+from tests.compat import mock
 
+import pkg_resources
 
 from airflow.www_rbac import app as application
 
@@ -71,3 +74,27 @@ class PluginsTestRBAC(unittest.TestCase):
         # Blueprint should be present in the app
         self.assertTrue('test_plugin' in self.app.blueprints)
         self.assertEqual(self.app.blueprints['test_plugin'].name, bp.name)
+
+    @unittest.skipIf(six.PY2, 'self.assertLogs not available for Python 2')
+    @mock.patch('pkg_resources.iter_entry_points')
+    def test_entrypoint_plugin_errors_dont_raise_exceptions(self, mock_ep_plugins):
+        """
+        Test that Airflow does not raise an Error if there is any Exception because of the
+        Plugin.
+        """
+        from airflow.plugins_manager import load_entrypoint_plugins, import_errors
+
+        mock_entrypoint = mock.Mock()
+        mock_entrypoint.name = 'test-entrypoint'
+        mock_entrypoint.module_name = 'test.plugins.test_plugins_manager'
+        mock_entrypoint.load.side_effect = Exception('Version Conflict')
+        mock_ep_plugins.return_value = [mock_entrypoint]
+
+        with self.assertLogs("airflow.plugins_manager", level="ERROR") as log_output:
+            load_entrypoint_plugins(pkg_resources.iter_entry_points('airflow.plugins'), [])
+            received_logs = log_output.output[0]
+            # Assert Traceback is shown too
+            assert "Traceback (most recent call last):" in received_logs
+            assert "Version Conflict" in received_logs
+            assert "Failed to import plugin test-entrypoint" in received_logs
+            assert ('test.plugins.test_plugins_manager', 'Version Conflict') in import_errors.items()