Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/02 17:45:10 UTC

[GitHub] kaxil closed pull request #4253: [AIRFLOW-3414] Fix reload_module in DagFileProcessorAgent
URL: https://github.com/apache/incubator-airflow/pull/4253

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below
for the sake of provenance:

diff --git a/airflow/logging_config.py b/airflow/logging_config.py
index 84850b4f82..4865f0567a 100644
--- a/airflow/logging_config.py
+++ b/airflow/logging_config.py
@@ -57,9 +57,9 @@ def configure_logging():
                 .format(logging_class_path, err)
             )
     else:
-        from airflow.config_templates.airflow_local_settings import (
-            DEFAULT_LOGGING_CONFIG as logging_config
-        )
+        logging_class_path = 'airflow.config_templates.' \
+                             'airflow_local_settings.DEFAULT_LOGGING_CONFIG'
+        logging_config = import_string(logging_class_path)
         log.debug('Unable to load custom logging, using default config instead')
 
     try:
@@ -73,7 +73,7 @@ def configure_logging():
 
     validate_logging_config(logging_config)
 
-    return logging_config
+    return logging_class_path
 
 
 def validate_logging_config(logging_config):
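
The logging_config.py hunk above makes configure_logging() return the dotted
path of the active logging config rather than the config dict itself, loading
the default through import_string() the same way a custom
core.logging_config_class is loaded. A minimal sketch of that resolution
pattern, with a simplified stand-in for Airflow's import_string helper:

    from importlib import import_module

    def import_string(dotted_path):
        # Resolve 'package.module.ATTRIBUTE' to the object it names
        # (simplified stand-in for Airflow's import_string helper).
        module_path, attr_name = dotted_path.rsplit('.', 1)
        return getattr(import_module(module_path), attr_name)

    logging_class_path = ('airflow.config_templates.'
                          'airflow_local_settings.DEFAULT_LOGGING_CONFIG')
    logging_config = import_string(logging_class_path)  # the LOGGING_CONFIG dict

Returning the path (a plain string) rather than the dict lets callers record
which module the config came from, which the dag_processing change below
relies on.
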
diff --git a/airflow/settings.py b/airflow/settings.py
index 8f8420ea22..8691fe4e75 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -261,7 +261,7 @@ def configure_action_logging():
 except Exception:
     pass
 
-configure_logging()
+logging_class_path = configure_logging()
 configure_vars()
 configure_adapters()
 # The webservers import this file from models.py with the default settings.
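
Because settings.py now stores the return value in a module-level
logging_class_path, any module that imports airflow.settings can look up
where the active logging config came from. A short consumer-side sketch:

    from airflow.settings import logging_class_path

    # Drop the attribute name to recover the module that holds the config,
    # e.g. 'airflow.config_templates.airflow_local_settings'.
    module_name = logging_class_path.rsplit('.', 1)[0]
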
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 0fe9e917ef..6425f77745 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -34,6 +34,7 @@
 from collections import defaultdict
 from collections import namedtuple
 from datetime import timedelta
+from importlib import import_module
 
 import psutil
 from six.moves import range, reload_module
@@ -45,6 +46,7 @@
 from airflow import configuration as conf
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
+from airflow.settings import logging_class_path
 from airflow.utils import timezone
 from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -539,7 +541,9 @@ def helper():
             # e.g. RotatingFileHandler. And it can cause connection corruption if we
             # do not recreate the SQLA connection pool.
             os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
-            reload_module(airflow.config_templates.airflow_local_settings)
+            # Replicating the behavior of how the logging module was loaded
+            # in logging_config.py
+            reload_module(import_module(logging_class_path.rsplit('.', 1)[0]))
             reload_module(airflow.settings)
             del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
             processor_manager = DagFileProcessorManager(dag_directory,
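
This dag_processing.py hunk is the actual fix for AIRFLOW-3414: the child
process previously hard-coded a reload of
airflow.config_templates.airflow_local_settings, so a custom
core.logging_config_class was effectively ignored when logging was reloaded.
Reloading whatever module logging_class_path names keeps the custom config in
effect. A minimal sketch of the pattern, assuming a hypothetical custom
dotted path:

    from importlib import import_module
    from six.moves import reload_module

    # A hypothetical custom value of core.logging_config_class:
    logging_class_path = 'my_company.log_settings.LOGGING_CONFIG'
    module_name = logging_class_path.rsplit('.', 1)[0]  # 'my_company.log_settings'
    reload_module(import_module(module_name))
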
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 4043caec56..9c85b66c57 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -18,12 +18,15 @@
 # under the License.
 
 import os
+import sys
+import tempfile
 import unittest
 from datetime import timedelta
 
 from mock import MagicMock
 
 from airflow import configuration as conf
+from airflow.configuration import mkdir_p
 from airflow.jobs import DagFileProcessor
 from airflow.jobs import LocalTaskJob as LJ
 from airflow.models import DagBag, TaskInstance as TI
@@ -38,6 +41,96 @@
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
+SETTINGS_FILE_VALID = """
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s'
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+        'task': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+    },
+    'loggers': {
+        'airflow': {
+            'handlers': ['console'],
+            'level': 'INFO',
+            'propagate': False
+        },
+        'airflow.task': {
+            'handlers': ['task'],
+            'level': 'INFO',
+            'propagate': False,
+        },
+    }
+}
+"""
+
+SETTINGS_DEFAULT_NAME = 'custom_airflow_local_settings'
+
+
+class settings_context(object):
+    """
+    Sets up a settings file and puts it on the Python path (sys.path)
+
+    :param content:
+          The content of the settings file
+    """
+
+    def __init__(self, content, dir=None, name='LOGGING_CONFIG'):
+        self.content = content
+        self.settings_root = tempfile.mkdtemp()
+        filename = "{}.py".format(SETTINGS_DEFAULT_NAME)
+
+        if dir:
+            # Replace slashes by dots
+            self.module = dir.replace('/', '.') + '.' + SETTINGS_DEFAULT_NAME + '.' + name
+
+            # Create the directory structure
+            dir_path = os.path.join(self.settings_root, dir)
+            mkdir_p(dir_path)
+
+            # Add the __init__ for the directories
+            # This is required for Python 2.7
+            basedir = self.settings_root
+            for part in dir.split('/'):
+                open(os.path.join(basedir, '__init__.py'), 'w').close()
+                basedir = os.path.join(basedir, part)
+            open(os.path.join(basedir, '__init__.py'), 'w').close()
+
+            self.settings_file = os.path.join(dir_path, filename)
+        else:
+            self.module = SETTINGS_DEFAULT_NAME + '.' + name
+            self.settings_file = os.path.join(self.settings_root, filename)
+
+    def __enter__(self):
+        with open(self.settings_file, 'w') as handle:
+            handle.writelines(self.content)
+        sys.path.append(self.settings_root)
+        conf.set(
+            'core',
+            'logging_config_class',
+            self.module
+        )
+        return self.settings_file
+
+    def __exit__(self, *exc_info):
+        # shutil.rmtree(self.settings_root)
+        # Reset config
+        conf.set('core', 'logging_config_class', '')
+        sys.path.remove(self.settings_root)
+
 
 class TestDagFileProcessorManager(unittest.TestCase):
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
@@ -123,6 +216,54 @@ def test_find_zombies(self):
 
 
 class TestDagFileProcessorAgent(unittest.TestCase):
+    def test_reload_module(self):
+        """
+        Configure the context to have core.logging_config_class set to a fake
+        logging class path, so that when the logging module is reloaded the
+        airflow.processor_manager logger is not configured.
+        """
+        with settings_context(SETTINGS_FILE_VALID):
+            # Launch a process through DagFileProcessorAgent, which will try
+            # to reload the logging module.
+            def processor_factory(file_path, zombies):
+                return DagFileProcessor(file_path,
+                                        False,
+                                        [],
+                                        zombies)
+
+            test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py')
+            async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+
+            log_file_loc = conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+            try:
+                os.remove(log_file_loc)
+            except OSError:
+                pass
+
+            # Starting dag processing with 0 max_runs to avoid redundant operations.
+            processor_agent = DagFileProcessorAgent(test_dag_path,
+                                                    [],
+                                                    0,
+                                                    processor_factory,
+                                                    async_mode)
+            manager_process = \
+                processor_agent._launch_process(processor_agent._dag_directory,
+                                                processor_agent._file_paths,
+                                                processor_agent._max_runs,
+                                                processor_agent._processor_factory,
+                                                processor_agent._child_signal_conn,
+                                                processor_agent._stat_queue,
+                                                processor_agent._result_queue,
+                                                processor_agent._async_mode)
+            if not async_mode:
+                processor_agent.heartbeat()
+
+            manager_process.join()
+
+            # Since the reloaded logging config does not create this file,
+            # we expect it not to exist.
+            self.assertFalse(os.path.isfile(log_file_loc))
+
     def test_parse_once(self):
         def processor_factory(file_path, zombies):
             return DagFileProcessor(file_path,
@@ -164,7 +305,7 @@ def processor_factory(file_path, zombies):
         except OSError:
             pass
 
-        # Starting dag processing with 0 max_runs to avoid redundent operations.
+        # Starting dag processing with 0 max_runs to avoid redundant operations.
         processor_agent = DagFileProcessorAgent(test_dag_path,
                                                 [],
                                                 0,
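
The new settings_context helper above writes a temporary settings module,
puts its directory on sys.path, and points core.logging_config_class at it
for the duration of the block; test_reload_module then asserts that the
processor-manager log file is never created, which only holds if the custom
config survived the reload. A usage sketch, with a hypothetical body:

    with settings_context(SETTINGS_FILE_VALID):
        # core.logging_config_class now names the temporary module, so any
        # code that reloads the logging config picks up its LOGGING_CONFIG
        # instead of the Airflow default.
        run_code_under_test()  # hypothetical placeholder
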


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services