You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/02 17:45:10 UTC
[GitHub] kaxil closed pull request #4253: [AIRFLOW-3414] Fix reload_module in DagFileProcessorAgent
kaxil closed pull request #4253: [AIRFLOW-3414] Fix reload_module in DagFileProcessorAgent
URL: https://github.com/apache/incubator-airflow/pull/4253
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/logging_config.py b/airflow/logging_config.py
index 84850b4f82..4865f0567a 100644
--- a/airflow/logging_config.py
+++ b/airflow/logging_config.py
@@ -57,9 +57,9 @@ def configure_logging():
.format(logging_class_path, err)
)
else:
- from airflow.config_templates.airflow_local_settings import (
- DEFAULT_LOGGING_CONFIG as logging_config
- )
+ logging_class_path = 'airflow.config_templates.' \
+ 'airflow_local_settings.DEFAULT_LOGGING_CONFIG'
+ logging_config = import_string(logging_class_path)
log.debug('Unable to load custom logging, using default config instead')
try:
@@ -73,7 +73,7 @@ def configure_logging():
validate_logging_config(logging_config)
- return logging_config
+ return logging_class_path
def validate_logging_config(logging_config):
diff --git a/airflow/settings.py b/airflow/settings.py
index 8f8420ea22..8691fe4e75 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -261,7 +261,7 @@ def configure_action_logging():
except Exception:
pass
-configure_logging()
+logging_class_path = configure_logging()
configure_vars()
configure_adapters()
# The webservers import this file from models.py with the default settings.
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 0fe9e917ef..6425f77745 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -34,6 +34,7 @@
from collections import defaultdict
from collections import namedtuple
from datetime import timedelta
+from importlib import import_module
import psutil
from six.moves import range, reload_module
@@ -45,6 +46,7 @@
from airflow import configuration as conf
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
+from airflow.settings import logging_class_path
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -539,7 +541,9 @@ def helper():
# e.g. RotatingFileHandler. And it can cause connection corruption if we
# do not recreate the SQLA connection pool.
os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
- reload_module(airflow.config_templates.airflow_local_settings)
+ # Replicating the behavior of how logging module was loaded
+ # in logging_config.py
+ reload_module(import_module(logging_class_path.rsplit('.', 1)[0]))
reload_module(airflow.settings)
del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
processor_manager = DagFileProcessorManager(dag_directory,
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 4043caec56..9c85b66c57 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -18,12 +18,15 @@
# under the License.
import os
+import sys
+import tempfile
import unittest
from datetime import timedelta
from mock import MagicMock
from airflow import configuration as conf
+from airflow.configuration import mkdir_p
from airflow.jobs import DagFileProcessor
from airflow.jobs import LocalTaskJob as LJ
from airflow.models import DagBag, TaskInstance as TI
@@ -38,6 +41,96 @@
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+SETTINGS_FILE_VALID = """
+LOGGING_CONFIG = {
+ 'version': 1,
+ 'disable_existing_loggers': False,
+ 'formatters': {
+ 'airflow.task': {
+ 'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s'
+ },
+ },
+ 'handlers': {
+ 'console': {
+ 'class': 'logging.StreamHandler',
+ 'formatter': 'airflow.task',
+ 'stream': 'ext://sys.stdout'
+ },
+ 'task': {
+ 'class': 'logging.StreamHandler',
+ 'formatter': 'airflow.task',
+ 'stream': 'ext://sys.stdout'
+ },
+ },
+ 'loggers': {
+ 'airflow': {
+ 'handlers': ['console'],
+ 'level': 'INFO',
+ 'propagate': False
+ },
+ 'airflow.task': {
+ 'handlers': ['task'],
+ 'level': 'INFO',
+ 'propagate': False,
+ },
+ }
+}
+"""
+
+SETTINGS_DEFAULT_NAME = 'custom_airflow_local_settings'
+
+
+class settings_context(object):
+ """
+ Creates a settings file and adds its directory to the Python module search path (sys.path)
+
+ :param content:
+ The content of the settings file
+ """
+
+ def __init__(self, content, dir=None, name='LOGGING_CONFIG'):
+ self.content = content
+ self.settings_root = tempfile.mkdtemp()
+ filename = "{}.py".format(SETTINGS_DEFAULT_NAME)
+
+ if dir:
+ # Replace slashes by dots
+ self.module = dir.replace('/', '.') + '.' + SETTINGS_DEFAULT_NAME + '.' + name
+
+ # Create the directory structure
+ dir_path = os.path.join(self.settings_root, dir)
+ mkdir_p(dir_path)
+
+ # Add the __init__ for the directories
+ # This is required for Python 2.7
+ basedir = self.settings_root
+ for part in dir.split('/'):
+ open(os.path.join(basedir, '__init__.py'), 'w').close()
+ basedir = os.path.join(basedir, part)
+ open(os.path.join(basedir, '__init__.py'), 'w').close()
+
+ self.settings_file = os.path.join(dir_path, filename)
+ else:
+ self.module = SETTINGS_DEFAULT_NAME + '.' + name
+ self.settings_file = os.path.join(self.settings_root, filename)
+
+ def __enter__(self):
+ with open(self.settings_file, 'w') as handle:
+ handle.writelines(self.content)
+ sys.path.append(self.settings_root)
+ conf.set(
+ 'core',
+ 'logging_config_class',
+ self.module
+ )
+ return self.settings_file
+
+ def __exit__(self, *exc_info):
+ # shutil.rmtree(self.settings_root)
+ # Reset config
+ conf.set('core', 'logging_config_class', '')
+ sys.path.remove(self.settings_root)
+
class TestDagFileProcessorManager(unittest.TestCase):
def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
@@ -123,6 +216,54 @@ def test_find_zombies(self):
class TestDagFileProcessorAgent(unittest.TestCase):
+ def test_reload_module(self):
+ """
+ Configure the context to have core.logging_config_class set to a fake logging
+ class path, so that when the logging module is reloaded the
+ airflow.processor_manager logger is not configured.
+ """
+ with settings_context(SETTINGS_FILE_VALID):
+ # Launch a process through DagFileProcessorAgent, which will try to
+ # reload the logging module.
+ def processor_factory(file_path, zombies):
+ return DagFileProcessor(file_path,
+ False,
+ [],
+ zombies)
+
+ test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py')
+ async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+
+ log_file_loc = conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+ try:
+ os.remove(log_file_loc)
+ except OSError:
+ pass
+
+ # Starting dag processing with 0 max_runs to avoid redundant operations.
+ processor_agent = DagFileProcessorAgent(test_dag_path,
+ [],
+ 0,
+ processor_factory,
+ async_mode)
+ manager_process = \
+ processor_agent._launch_process(processor_agent._dag_directory,
+ processor_agent._file_paths,
+ processor_agent._max_runs,
+ processor_agent._processor_factory,
+ processor_agent._child_signal_conn,
+ processor_agent._stat_queue,
+ processor_agent._result_queue,
+ processor_agent._async_mode)
+ if not async_mode:
+ processor_agent.heartbeat()
+
+ manager_process.join()
+
+ # Since the logging config we are reloading does not create this file,
+ # we should expect it to be nonexistent.
+ self.assertFalse(os.path.isfile(log_file_loc))
+
def test_parse_once(self):
def processor_factory(file_path, zombies):
return DagFileProcessor(file_path,
@@ -164,7 +305,7 @@ def processor_factory(file_path, zombies):
except OSError:
pass
- # Starting dag processing with 0 max_runs to avoid redundent operations.
+ # Starting dag processing with 0 max_runs to avoid redundant operations.
processor_agent = DagFileProcessorAgent(test_dag_path,
[],
0,
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services