You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/07/01 19:35:16 UTC
aurora git commit: Fix Process log configuration handling.
Repository: aurora
Updated Branches:
refs/heads/master b3f88bc9c -> 205c308c5
Fix Process log configuration handling.
Previously flagged configuration of Process logging mode would
blow up and claimed defaulting of the rotation policy did not
occur.
Bugs closed: AURORA-1724
Reviewed at https://reviews.apache.org/r/49399/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/205c308c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/205c308c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/205c308c
Branch: refs/heads/master
Commit: 205c308c5463944b7c86c82a3e90fa66b7581d6d
Parents: b3f88bc
Author: John Sirois <js...@apache.org>
Authored: Fri Jul 1 13:35:11 2016 -0600
Committer: John Sirois <jo...@gmail.com>
Committed: Fri Jul 1 13:35:11 2016 -0600
----------------------------------------------------------------------
docs/operations/configuration.md | 9 +-
docs/reference/configuration.md | 33 ++-
.../executor/bin/thermos_executor_main.py | 21 +-
src/main/python/apache/thermos/core/runner.py | 40 ++--
.../python/apache/thermos/testing/runner.py | 16 +-
src/test/python/apache/thermos/core/BUILD | 2 +
.../thermos/core/test_runner_log_config.py | 230 +++++++++++++++++++
7 files changed, 293 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/docs/operations/configuration.md
----------------------------------------------------------------------
diff --git a/docs/operations/configuration.md b/docs/operations/configuration.md
index e332f86..0615c54 100644
--- a/docs/operations/configuration.md
+++ b/docs/operations/configuration.md
@@ -103,10 +103,10 @@ Maximum number of backups to retain before deleting the oldest backup(s).
## Process Logs
### Log destination
-By default, Thermos will write process stdout/stderr to log files in the sandbox. Process object configuration
-allows specifying alternate log file destinations like streamed stdout/stderr or suppression of all log output.
-Default behavior can be configured for the entire cluster with the following flag (through the `-thermos_executor_flags`
-argument to the Aurora scheduler):
+By default, Thermos will write process stdout/stderr to log files in the sandbox. Process object
+configuration allows specifying alternate log file destinations like streamed stdout/stderr or
+suppression of all log output. Default behavior can be configured for the entire cluster with the
+following flag (through the `-thermos_executor_flags` argument to the Aurora scheduler):
--runner-logger-destination=both
@@ -130,7 +130,6 @@ reach 100 MiB in size and keep a maximum of 10 backups. If a user has provided a
their process, it will override these default settings.
-
## Thermos Executor Wrapper
If you need to do computation before starting the thermos executor (for example, setting a different
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/docs/reference/configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index c4b1d38..64c076d 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -121,30 +121,27 @@ schedule.
#### logger
-The default behavior of Thermos is to store stderr/stdout logs in files which grow unbounded.
-In the event that you have large log volume, you may want to configure Thermos to automatically rotate logs
-after they grow to a certain size, which can prevent your job from using more than its allocated
-disk space.
-
-A Logger union consists of a destination enum, a mode enum and a rotation policy.
-It's to set where the process logs should be sent using `destination`. Default
-option is `file`. Its also possible to specify `console` to get logs output
-to stdout/stderr, `none` to suppress any logs output or `both` to send logs to files and
-console output. In case of using `none` or `console` rotation attributes are ignored.
-Rotation policies only apply to loggers whose mode is `rotate`. The acceptable values
-for the LoggerMode enum are `standard` and `rotate`. The rotation policy applies to both
-stderr and stdout.
-
-By default, all processes use the `standard` LoggerMode.
+The default behavior of Thermos is to store stderr/stdout logs in files which grow unbounded.
+In the event that you have large log volume, you may want to configure Thermos to automatically
+rotate logs after they grow to a certain size, which can prevent your job from using more than its
+allocated disk space.
+
+Logger objects specify a `destination` for Process logs which is, by default, `file` - a pair of
+`stdout` and `stderr` files. Its also possible to specify `console` to get logs output to
+the Process stdout and stderr streams, `none` to suppress any logs output or `both` to send logs to
+files and console streams.
+
+The default Logger `mode` is `standard` which lets the stdout and stderr streams grow without bound.
**Attribute Name** | **Type** | **Description**
------------------- | :---------------: | ---------------------------------
**destination** | LoggerDestination | Destination of logs. (Default: `file`)
**mode** | LoggerMode | Mode of the logger. (Default: `standard`)
- **rotate** | RotatePolicy | An optional rotation policy.
+ **rotate** | RotatePolicy | An optional rotation policy. (Default: `Empty`)
-A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate`. It is ignored
-otherwise.
+A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate` and it is ignored
+otherwise. If `rotate` is `Empty` or `RotatePolicy()` when the `mode` is set to `rotate` the
+defaults below are used.
**Attribute Name** | **Type** | **Description**
------------------- | :----------: | ---------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 203fc47..0ef3856 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -55,9 +55,6 @@ LogOptions.set_simple(True)
LogOptions.set_disk_log_level('DEBUG')
LogOptions.set_log_dir(CWD)
-_LOGGER_DESTINATIONS = ', '.join(LoggerDestination.VALUES)
-_LOGGER_MODES = ', '.join(LoggerMode.VALUES)
-
app.add_option(
'--announcer-ensemble',
@@ -88,8 +85,7 @@ app.add_option(
type=str,
default=None,
help='Set hostname to be announced. By default it is'
- 'the --hostname argument passed into the Mesos agent.'
-)
+ 'the --hostname argument passed into the Mesos agent.')
app.add_option(
'--announcer-zookeeper-auth-config',
@@ -118,25 +114,22 @@ app.add_option(
dest='nosetuid_health_checks',
action="store_true",
help='If set, the executor will not run shell health checks as job\'s role\'s user',
- default=False
-)
+ default=False)
app.add_option(
'--runner-logger-destination',
dest='runner_logger_destination',
- type=str,
- default='file',
- help='The logger destination [%s] to use for all processes run by thermos.'
- % _LOGGER_DESTINATIONS)
+ choices=LoggerDestination.VALUES,
+ help='The logger destination %r to use for all processes run by thermos.'
+ % (LoggerDestination.VALUES,))
app.add_option(
'--runner-logger-mode',
dest='runner_logger_mode',
- type=str,
- default=None,
- help='The logger mode [%s] to use for all processes run by thermos.' % _LOGGER_MODES)
+ choices=LoggerMode.VALUES,
+ help='The logger mode %r to use for all processes run by thermos.' % (LoggerMode.VALUES,))
app.add_option(
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
index 3ebf86e..fe971ed 100644
--- a/src/main/python/apache/thermos/core/runner.py
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -66,11 +66,11 @@ from apache.thermos.config.loader import (
ThermosTaskValidator,
ThermosTaskWrapper
)
-from apache.thermos.config.schema import ThermosContext
+from apache.thermos.config.schema import Logger, RotatePolicy, ThermosContext
from .helper import TaskRunnerHelper
from .muxer import ProcessMuxer
-from .process import LoggerDestination, LoggerMode, Process
+from .process import LoggerMode, Process
from gen.apache.thermos.ttypes import (
ProcessState,
@@ -722,6 +722,9 @@ class TaskRunner(object):
rotate_log_backups=rotate_log_backups,
preserve_env=self._preserve_env)
+ _DEFAULT_LOGGER = Logger()
+ _DEFAULT_ROTATION = RotatePolicy()
+
def _build_process_logger_args(self, process):
"""
Build the appropriate logging configuration based on flags + process
@@ -730,27 +733,36 @@ class TaskRunner(object):
If no configuration (neither flags nor process config), default to
"standard" mode.
"""
- destination, mode, size, backups = None, None, None, None
+
+ destination, mode, size, backups = (self._DEFAULT_LOGGER.destination().get(),
+ self._DEFAULT_LOGGER.mode().get(),
+ None,
+ None)
+
logger = process.logger()
if logger is Empty:
if self._process_logger_destination:
destination = self._process_logger_destination
- else:
- destination = LoggerDestination.FILE
-
if self._process_logger_mode:
- mode = self._process_logger_mode,
- size = Amount(self._rotate_log_size_mb, Data.MB)
- backups = self._rotate_log_backups
- else:
- mode = LoggerMode.STANDARD
+ mode = self._process_logger_mode
else:
destination = logger.destination().get()
mode = logger.mode().get()
- if mode == LoggerMode.ROTATE:
+
+ if mode == LoggerMode.ROTATE:
+ size = Amount(self._DEFAULT_ROTATION.log_size().get(), Data.BYTES)
+ backups = self._DEFAULT_ROTATION.backups().get()
+ if logger is Empty:
+ if self._rotate_log_size_mb:
+ size = Amount(self._rotate_log_size_mb, Data.MB)
+ if self._rotate_log_backups:
+ backups = self._rotate_log_backups
+ else:
rotate = logger.rotate()
- size = Amount(rotate.log_size().get(), Data.BYTES)
- backups = rotate.backups().get()
+ if rotate is not Empty:
+ size = Amount(rotate.log_size().get(), Data.BYTES)
+ backups = rotate.backups().get()
+
return destination, mode, size, backups
def deadlocked(self, plan=None):
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/thermos/testing/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/runner.py b/src/main/python/apache/thermos/testing/runner.py
index 8b6ba73..0560891 100644
--- a/src/main/python/apache/thermos/testing/runner.py
+++ b/src/main/python/apache/thermos/testing/runner.py
@@ -70,10 +70,8 @@ class AngryHandler(TaskRunnerUniversalHandler):
sys.exit(1)
sandbox = os.path.join('%(sandbox)s', '%(task_id)s')
-args = {}
+args = %(extra_task_runner_args)r
args['task_id'] = '%(task_id)s'
-if %(portmap)s:
- args['portmap'] = %(portmap)s
args['universal_handler'] = AngryHandler
runner = TaskRunner(task, '%(root)s', sandbox, **args)
@@ -83,7 +81,7 @@ with open('%(state_filename)s', 'w') as fp:
fp.write(thrift_serialize(runner.state))
"""
- def __init__(self, task, portmap={}, success_rate=100, random_seed=31337):
+ def __init__(self, task, success_rate=100, random_seed=31337, **extra_task_runner_args):
"""
task = Thermos task
portmap = port map
@@ -99,7 +97,7 @@ with open('%(state_filename)s', 'w') as fp:
self.tempdir = tempfile.mkdtemp()
self.task_id = '%s-runner-base' % int(time.time() * 1000000)
self.sandbox = os.path.join(self.tempdir, 'sandbox')
- self.portmap = portmap
+ self.extra_task_runner_args = extra_task_runner_args
self.cleaned = False
self.pathspec = TaskPath(root=self.tempdir, task_id=self.task_id)
self.script_filename = None
@@ -130,9 +128,9 @@ with open('%(state_filename)s', 'w') as fp:
'root': self.tempdir,
'task_id': self.task_id,
'state_filename': self.state_filename,
- 'portmap': repr(self.portmap),
'success_rate': self.success_rate,
'random_seed': self.random_seed + self._run_count,
+ 'extra_task_runner_args': self.extra_task_runner_args,
})
with environment_as(PYTHONPATH=os.pathsep.join(sys.path)):
@@ -193,12 +191,16 @@ with open('%(state_filename)s', 'w') as fp:
class RunnerTestBase(object):
@classmethod
+ def extra_task_runner_args(cls):
+ return dict(portmap=getattr(cls, 'portmap', {}))
+
+ @classmethod
def task(cls):
raise NotImplementedError
@classmethod
def setup_class(cls):
- cls.runner = Runner(cls.task(), portmap=getattr(cls, 'portmap', {}))
+ cls.runner = Runner(cls.task(), **cls.extra_task_runner_args())
cls.runner.run()
cls.state = cls.runner.state
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/test/python/apache/thermos/core/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/BUILD b/src/test/python/apache/thermos/core/BUILD
index acfb79b..957a116 100644
--- a/src/test/python/apache/thermos/core/BUILD
+++ b/src/test/python/apache/thermos/core/BUILD
@@ -19,7 +19,9 @@ python_tests(
'3rdparty/python:mock',
'3rdparty/python:psutil',
'3rdparty/python:twitter.common.contextutil',
+ '3rdparty/python:twitter.common.quantity',
'3rdparty/python:twitter.common.process',
+ 'src/main/python/apache/thermos/config',
'src/main/python/apache/thermos/core',
'src/main/python/apache/thermos/monitoring',
'src/main/python/apache/thermos/testing',
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/test/python/apache/thermos/core/test_runner_log_config.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_runner_log_config.py b/src/test/python/apache/thermos/core/test_runner_log_config.py
new file mode 100644
index 0000000..4381296
--- /dev/null
+++ b/src/test/python/apache/thermos/core/test_runner_log_config.py
@@ -0,0 +1,230 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import struct
+from collections import namedtuple
+
+from twitter.common.quantity import Amount, Data
+
+from apache.thermos.config.schema_base import Logger, Process, RotatePolicy, Task
+from apache.thermos.testing.runner import RunnerTestBase
+
+
+class LogConfig(namedtuple('LogConfig', ['destination', 'mode', 'size_mb', 'backups'])):
+ @classmethod
+ def create(cls, destination=None, mode=None, size_mb=None, backups=None):
+ return cls(destination=destination, mode=mode, size_mb=size_mb, backups=backups)
+
+
+class RunnerLogConfigTestBase(RunnerTestBase):
+ _OPT_CONFIG_PROCESS_NAME = 'opt-log-config-process'
+ _CUSTOM_CONFIG_PROCESS_NAME = 'custom-log-config-process'
+
+ @classmethod
+ def opt_config(cls):
+ return LogConfig.create()
+
+ @classmethod
+ def custom_config(cls):
+ return None
+
+ @classmethod
+ def log_size(cls):
+ return Amount(1, Data.BYTES)
+
+ STDOUT = 1
+ STDERR = 2
+
+ @classmethod
+ def log_fd(cls):
+ return cls.STDOUT
+
+ @classmethod
+ def extra_task_runner_args(cls):
+ opt_config = cls.opt_config()
+ return dict(process_logger_destination=opt_config.destination,
+ process_logger_mode=opt_config.mode,
+ rotate_log_size_mb=opt_config.size_mb,
+ rotate_log_backups=opt_config.backups)
+
+ @classmethod
+ def task(cls):
+ cmdline = 'head -c %d /dev/zero >&%d' % (cls.log_size().as_(Data.BYTES), cls.log_fd())
+
+ opt_config_process = Process(name=cls._OPT_CONFIG_PROCESS_NAME, cmdline=cmdline)
+
+ custom_config_process = Process(name=cls._CUSTOM_CONFIG_PROCESS_NAME, cmdline=cmdline)
+ custom_config = cls.custom_config()
+ if custom_config:
+ logger = Logger()
+ if custom_config.destination:
+ logger = logger(destination=custom_config.destination)
+ if custom_config.mode:
+ logger = logger(mode=custom_config.mode)
+ if custom_config.size_mb or custom_config.backups:
+ rotate = RotatePolicy()
+ if custom_config.size_mb:
+ rotate = rotate(log_size=custom_config.size_mb)
+ if custom_config.backups:
+ rotate = rotate(backups=custom_config.backups)
+ logger = logger(rotate=rotate)
+ custom_config_process = custom_config_process(logger=logger)
+
+ return Task(name='log-config-task', processes=[opt_config_process, custom_config_process])
+
+ class Assert(object):
+ def __init__(self, log_file_dir):
+ self._log_file_dir = log_file_dir
+
+ def log_file_names(self, *names):
+ actual_names = os.listdir(self._log_file_dir)
+ assert set(actual_names) == set(names)
+ assert all(os.path.isfile(os.path.join(self._log_file_dir, name)) for name in names)
+
+ def log_file(self, name):
+ expected_output_file = os.path.join(self._log_file_dir, name)
+ assert os.path.exists(expected_output_file)
+ with open(expected_output_file, 'rb') as fp:
+ actual = fp.read()
+ size = len(actual)
+ assert actual == struct.pack('B', 0) * size
+ return size
+
+ def empty_log_file(self, name):
+ size = self.log_file(name=name)
+ assert 0 == size
+
+ def _log_dir_name(self, process_name):
+ return os.path.join(self.state.header.log_dir, process_name, '0')
+
+ @property
+ def opt_assert(self):
+ return self.Assert(self._log_dir_name(self._OPT_CONFIG_PROCESS_NAME))
+
+ @property
+ def custom_assert(self):
+ return self.Assert(self._log_dir_name(self._CUSTOM_CONFIG_PROCESS_NAME))
+
+
+class StandardTestBase(RunnerLogConfigTestBase):
+ @classmethod
+ def log_size(cls):
+ return Amount(200, Data.MB)
+
+ def test_log_config(self):
+ log, empty = ('stdout', 'stderr') if self.log_fd() == self.STDOUT else ('stderr', 'stdout')
+ for assertions in self.opt_assert, self.custom_assert:
+ assertions.log_file_names(log, empty)
+ assertions.empty_log_file(name=empty)
+
+ # No rotation should occur in standard mode.
+ size = assertions.log_file(name=log)
+ assert size == self.log_size().as_(Data.BYTES)
+
+
+class TestStandardStdout(StandardTestBase):
+ @classmethod
+ def log_fd(cls):
+ return cls.STDOUT
+
+
+class TestStandardStderr(StandardTestBase):
+ @classmethod
+ def log_fd(cls):
+ return cls.STDERR
+
+
+class RotateTestBase(RunnerLogConfigTestBase):
+ @classmethod
+ def opt_config(cls):
+ return LogConfig.create(mode='rotate', size_mb=1, backups=1)
+
+
+class TestRotateUnderStdout(RotateTestBase):
+ def test_log_config(self):
+ self.opt_assert.log_file_names('stdout', 'stderr')
+ self.opt_assert.empty_log_file(name='stderr')
+ self.opt_assert.log_file(name='stdout')
+
+
+class TestRotateUnderStderr(RotateTestBase):
+ @classmethod
+ def log_fd(cls):
+ return cls.STDERR
+
+ def test_log_config(self):
+ self.opt_assert.log_file_names('stdout', 'stderr')
+ self.opt_assert.empty_log_file(name='stdout')
+ self.opt_assert.log_file(name='stderr')
+
+
+class TestRotateOverStdout(RotateTestBase):
+ @classmethod
+ def log_size(cls):
+ return Amount(2, Data.MB)
+
+ def test_log_config(self):
+ self.opt_assert.log_file_names('stdout', 'stdout.1', 'stderr')
+ self.opt_assert.empty_log_file(name='stderr')
+ self.opt_assert.log_file(name='stdout')
+ self.opt_assert.log_file(name='stdout.1')
+
+
+class TestRotateOverStderr(RotateTestBase):
+ @classmethod
+ def log_size(cls):
+ return Amount(3, Data.MB)
+
+ @classmethod
+ def log_fd(cls):
+ return cls.STDERR
+
+ def test_log_config(self):
+ self.opt_assert.log_file_names('stdout', 'stderr', 'stderr.1')
+ self.opt_assert.empty_log_file(name='stdout')
+ self.opt_assert.log_file(name='stderr')
+ self.opt_assert.log_file(name='stderr.1')
+
+
+class TestRotateDefaulted(RunnerLogConfigTestBase):
+ @classmethod
+ def opt_config(cls):
+ return LogConfig.create(mode='rotate')
+
+ @classmethod
+ def custom_config(cls):
+ return LogConfig.create(mode='rotate')
+
+ @classmethod
+ def log_size(cls):
+ # Default rotation policy is 100MiB with 5 backups so this guarantees a full rotation.
+ return Amount(700, Data.MB)
+
+ def test_log_config(self):
+ for assertions in self.opt_assert, self.custom_assert:
+ assertions.log_file_names('stderr',
+ 'stdout',
+ 'stdout.1',
+ 'stdout.2',
+ 'stdout.3',
+ 'stdout.4',
+ 'stdout.5')
+ assertions.empty_log_file(name='stderr')
+ assertions.log_file(name='stdout')
+ assertions.log_file(name='stdout.1')
+ assertions.log_file(name='stdout.2')
+ assertions.log_file(name='stdout.3')
+ assertions.log_file(name='stdout.4')
+ assertions.log_file(name='stdout.5')