You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/10 14:18:47 UTC
incubator-airflow git commit: [AIRFLOW-1878] Fix stderr/stdout
redirection for tasks
Repository: incubator-airflow
Updated Branches:
refs/heads/master 9731ce6fa -> 02ff8ae35
[AIRFLOW-1878] Fix stderr/stdout redirection for tasks
logging.StreamHandler keeps a reference to the
initial stream
it has been assigned. This prevents redirection
after initalization
to a logging facility.
Closes #2836 from bolkedebruin/AIRFLOW-1878
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/02ff8ae3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/02ff8ae3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/02ff8ae3
Branch: refs/heads/master
Commit: 02ff8ae35dd16e6f23d29d7b24a5fb9c09d0b7a4
Parents: 9731ce6
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sun Dec 10 15:18:38 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Dec 10 15:18:38 2017 +0100
----------------------------------------------------------------------
airflow/bin/cli.py | 2 +-
.../config_templates/airflow_local_settings.py | 4 +--
airflow/utils/log/logging_mixin.py | 29 ++++++++++++++++++++
3 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02ff8ae3/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 782e58d..1367362 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -367,7 +367,7 @@ def run(args, dag=None):
hostname = socket.getfqdn()
log.info("Running %s on host %s", ti, hostname)
- with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN):
+ with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
if args.local:
run_job = jobs.LocalTaskJob(
task_instance=ti,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02ff8ae3/airflow/config_templates/airflow_local_settings.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 020df8d..aa5b8da 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -43,9 +43,9 @@ DEFAULT_LOGGING_CONFIG = {
},
'handlers': {
'console': {
- 'class': 'logging.StreamHandler',
+ 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
'formatter': 'airflow.task',
- 'stream': 'ext://sys.stdout'
+ 'stream': 'sys.stdout'
},
'file.task': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/02ff8ae3/airflow/utils/log/logging_mixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index 03437bf..fb3b85f 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -21,8 +21,11 @@ import logging
import sys
import warnings
+import six
+
from builtins import object
from contextlib import contextmanager
+from logging import Handler, StreamHandler
class LoggingMixin(object):
@@ -103,6 +106,32 @@ class StreamLogWriter(object):
return False
+class RedirectStdHandler(StreamHandler):
+ """
+ This class is like a StreamHandler using sys.stderr/stdout, but always uses
+ whatever sys.stderr/stderr is currently set to rather than the value of
+ sys.stderr/stdout at handler construction time.
+ """
+ def __init__(self, stream):
+ if not isinstance(stream, six.string_types):
+ raise Exception("Cannot use file like objects. Use 'stdout' or 'stderr'"
+ " as a str and without 'ext://'.")
+
+ self._use_stderr = True
+ if 'stdout' in stream:
+ self._use_stderr = False
+
+ # StreamHandler tries to set self.stream
+ Handler.__init__(self)
+
+ @property
+ def stream(self):
+ if self._use_stderr:
+ return sys.stderr
+
+ return sys.stdout
+
+
@contextmanager
def redirect_stdout(logger, level):
writer = StreamLogWriter(logger, level)