Posted to commits@aurora.apache.org by js...@apache.org on 2016/07/01 19:35:16 UTC

aurora git commit: Fix Process log configuration handling.

Repository: aurora
Updated Branches:
  refs/heads/master b3f88bc9c -> 205c308c5


Fix Process log configuration handling.

Previously, configuring the Process logging mode via executor flags
would blow up, and the documented defaulting of the rotation policy
did not occur.

Bugs closed: AURORA-1724

Reviewed at https://reviews.apache.org/r/49399/
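
The "blow up" traces to the removed `runner.py` line `mode = self._process_logger_mode,`. In Python, a trailing comma on the right-hand side of an assignment builds a one-element tuple, so downstream code expecting a LoggerMode string received a tuple instead. Here is a minimal, self-contained illustration. The variable names are hypothetical stand-ins, not Thermos code:

```python
# Stand-in for the value of the --runner-logger-mode flag.
flag_mode = 'rotate'

# Removed form: the trailing comma makes this a 1-tuple, not a string.
buggy_mode = flag_mode,

# Fixed form from this commit: a plain assignment keeps the string.
fixed_mode = flag_mode

print(type(buggy_mode).__name__, buggy_mode)  # tuple ('rotate',)
print(type(fixed_mode).__name__, fixed_mode)  # str rotate

# Any comparison against a mode string silently stops matching.
print(buggy_mode == 'rotate')  # False
print(fixed_mode == 'rotate')  # True
```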


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/205c308c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/205c308c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/205c308c

Branch: refs/heads/master
Commit: 205c308c5463944b7c86c82a3e90fa66b7581d6d
Parents: b3f88bc
Author: John Sirois <js...@apache.org>
Authored: Fri Jul 1 13:35:11 2016 -0600
Committer: John Sirois <jo...@gmail.com>
Committed: Fri Jul 1 13:35:11 2016 -0600

----------------------------------------------------------------------
 docs/operations/configuration.md                |   9 +-
 docs/reference/configuration.md                 |  33 ++-
 .../executor/bin/thermos_executor_main.py       |  21 +-
 src/main/python/apache/thermos/core/runner.py   |  40 ++--
 .../python/apache/thermos/testing/runner.py     |  16 +-
 src/test/python/apache/thermos/core/BUILD       |   2 +
 .../thermos/core/test_runner_log_config.py      | 230 +++++++++++++++++++
 7 files changed, 293 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/docs/operations/configuration.md
----------------------------------------------------------------------
diff --git a/docs/operations/configuration.md b/docs/operations/configuration.md
index e332f86..0615c54 100644
--- a/docs/operations/configuration.md
+++ b/docs/operations/configuration.md
@@ -103,10 +103,10 @@ Maximum number of backups to retain before deleting the oldest backup(s).
 ## Process Logs
 
 ### Log destination
-By default, Thermos will write process stdout/stderr to log files in the sandbox. Process object configuration
-allows specifying alternate log file destinations like streamed stdout/stderr or suppression of all log output.
-Default behavior can be configured for the entire cluster with the following flag (through the `-thermos_executor_flags`
-argument to the Aurora scheduler):
+By default, Thermos will write process stdout/stderr to log files in the sandbox. Process object
+configuration allows specifying alternate log file destinations like streamed stdout/stderr or
+suppression of all log output. Default behavior can be configured for the entire cluster with the
+following flag (through the `-thermos_executor_flags` argument to the Aurora scheduler):
 
     --runner-logger-destination=both
 
@@ -130,7 +130,6 @@ reach 100 MiB in size and keep a maximum of 10 backups. If a user has provided a
 their process, it will override these default settings.
 
 
-
 ## Thermos Executor Wrapper
 
 If you need to do computation before starting the thermos executor (for example, setting a different

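The `runner.py` change in this commit consults the cluster-wide destination flag only when a Process has no `logger` at all. Once a Process sets any `Logger`, its `destination` wins, even if the user only set `mode`. That destination itself defaults to `file`. Below is a minimal sketch of this precedence. It assumes a plain dict stands in for a Process's `Logger` (`None` meaning no logger), and `effective_destination` is a hypothetical helper, not Thermos API:

```python
DEFAULT_DESTINATION = 'file'  # documented Logger destination default


def effective_destination(process_logger, flag_destination=None):
    """Return the log destination a process would use.

    process_logger: None when the Process sets no logger, else a dict of the
    Logger attributes the user actually set (hypothetical stand-in).
    flag_destination: value of --runner-logger-destination, or None if unset.
    """
    if process_logger is None:
        # No per-process logger: the cluster-wide flag applies, if given.
        return flag_destination or DEFAULT_DESTINATION
    # A per-process logger wins; unset fields fall back to Logger defaults.
    return process_logger.get('destination', DEFAULT_DESTINATION)


print(effective_destination(None))                                # file
print(effective_destination(None, 'both'))                        # both
print(effective_destination({'destination': 'console'}, 'both'))  # console
print(effective_destination({'mode': 'rotate'}, 'both'))          # file
```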
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/docs/reference/configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index c4b1d38..64c076d 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -121,30 +121,27 @@ schedule.
 
 #### logger
 
-The default behavior of Thermos is to store  stderr/stdout logs in files which grow unbounded.
-In the event that you have large log volume, you may want to configure Thermos to automatically rotate logs
-after they grow to a certain size, which can prevent your job from using more than its allocated
-disk space.
-
-A Logger union consists of a destination enum, a mode enum and a rotation policy.
-It's to set where the process logs should be sent using `destination`. Default
-option is `file`. Its also possible to specify `console` to get logs output
-to stdout/stderr, `none` to suppress any logs output or `both` to send logs to files and
-console output. In case of using `none` or `console` rotation attributes are ignored.
-Rotation policies only apply to loggers whose mode is `rotate`. The acceptable values
-for the LoggerMode enum are `standard` and `rotate`. The rotation policy applies to both
-stderr and stdout.
-
-By default, all processes use the `standard` LoggerMode.
+The default behavior of Thermos is to store stderr/stdout logs in files which grow unbounded.
+In the event that you have large log volume, you may want to configure Thermos to automatically
+rotate logs after they grow to a certain size, which can prevent your job from using more than its
+allocated disk space.
+
+Logger objects specify a `destination` for Process logs which is, by default, `file` - a pair of
+`stdout` and `stderr` files. It's also possible to specify `console` to get logs output to
+the Process stdout and stderr streams, `none` to suppress any logs output or `both` to send logs to
+files and console streams.
+
+The default Logger `mode` is `standard` which lets the stdout and stderr streams grow without bound.
 
   **Attribute Name**  | **Type**          | **Description**
   ------------------- | :---------------: | ---------------------------------
    **destination**    | LoggerDestination | Destination of logs. (Default: `file`)
    **mode**           | LoggerMode        | Mode of the logger. (Default: `standard`)
-   **rotate**         | RotatePolicy      | An optional rotation policy.
+   **rotate**         | RotatePolicy      | An optional rotation policy. (Default: `Empty`)
 
-A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate`. It is ignored
-otherwise.
+A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate` and it is ignored
+otherwise. If `rotate` is `Empty` or `RotatePolicy()` when the `mode` is set to `rotate` the
+defaults below are used.
 
   **Attribute Name**  | **Type**     | **Description**
   ------------------- | :----------: | ---------------------------------

http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 203fc47..0ef3856 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -55,9 +55,6 @@ LogOptions.set_simple(True)
 LogOptions.set_disk_log_level('DEBUG')
 LogOptions.set_log_dir(CWD)
 
-_LOGGER_DESTINATIONS = ', '.join(LoggerDestination.VALUES)
-_LOGGER_MODES = ', '.join(LoggerMode.VALUES)
-
 
 app.add_option(
     '--announcer-ensemble',
@@ -88,8 +85,7 @@ app.add_option(
     type=str,
     default=None,
     help='Set hostname to be announced. By default it is'
-         'the --hostname argument passed into the Mesos agent.'
-)
+         'the --hostname argument passed into the Mesos agent.')
 
 app.add_option(
     '--announcer-zookeeper-auth-config',
@@ -118,25 +114,22 @@ app.add_option(
     dest='nosetuid_health_checks',
     action="store_true",
     help='If set, the executor will not run shell health checks as job\'s role\'s user',
-    default=False
-)
+    default=False)
 
 
 app.add_option(
     '--runner-logger-destination',
     dest='runner_logger_destination',
-    type=str,
-    default='file',
-    help='The logger destination [%s] to use for all processes run by thermos.'
-      % _LOGGER_DESTINATIONS)
+    choices=LoggerDestination.VALUES,
+    help='The logger destination %r to use for all processes run by thermos.'
+      % (LoggerDestination.VALUES,))
 
 
 app.add_option(
     '--runner-logger-mode',
     dest='runner_logger_mode',
-    type=str,
-    default=None,
-    help='The logger mode [%s] to use for all processes run by thermos.' % _LOGGER_MODES)
+    choices=LoggerMode.VALUES,
+    help='The logger mode %r to use for all processes run by thermos.' % (LoggerMode.VALUES,))
 
 
 app.add_option(

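These options now use `choices=` instead of `type=str`, so optparse itself rejects unknown values. Dropping `default='file'` leaves the option `None` when unset, which lets the runner fall back to the `Logger` schema defaults. The standalone sketch below uses the stdlib `optparse` module. It assumes a bare `OptionParser` in place of the Aurora `app.add_option` wrapper, and `DESTINATIONS` is an assumed stand-in for `LoggerDestination.VALUES` built from the four values named in the docs:

```python
from optparse import OptionParser

# Assumed stand-in for LoggerDestination.VALUES (values named in the docs).
DESTINATIONS = ('file', 'console', 'both', 'none')

parser = OptionParser()
parser.add_option(
    '--runner-logger-destination',
    dest='runner_logger_destination',
    choices=DESTINATIONS,  # implies type='choice'; no default, so unset -> None
    help='The logger destination %r to use for all processes run by thermos.'
         % (DESTINATIONS,))

options, _ = parser.parse_args(['--runner-logger-destination=both'])
print(options.runner_logger_destination)  # both

options, _ = parser.parse_args([])
print(options.runner_logger_destination)  # None

try:
    parser.parse_args(['--runner-logger-destination=syslog'])
except SystemExit as e:
    # optparse prints a usage error to stderr and exits with status 2.
    print('rejected, exit status', e.code)
```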
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
index 3ebf86e..fe971ed 100644
--- a/src/main/python/apache/thermos/core/runner.py
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -66,11 +66,11 @@ from apache.thermos.config.loader import (
     ThermosTaskValidator,
     ThermosTaskWrapper
 )
-from apache.thermos.config.schema import ThermosContext
+from apache.thermos.config.schema import Logger, RotatePolicy, ThermosContext
 
 from .helper import TaskRunnerHelper
 from .muxer import ProcessMuxer
-from .process import LoggerDestination, LoggerMode, Process
+from .process import LoggerMode, Process
 
 from gen.apache.thermos.ttypes import (
     ProcessState,
@@ -722,6 +722,9 @@ class TaskRunner(object):
       rotate_log_backups=rotate_log_backups,
       preserve_env=self._preserve_env)
 
+  _DEFAULT_LOGGER = Logger()
+  _DEFAULT_ROTATION = RotatePolicy()
+
   def _build_process_logger_args(self, process):
     """
       Build the appropriate logging configuration based on flags + process
@@ -730,27 +733,36 @@ class TaskRunner(object):
       If no configuration (neither flags nor process config), default to
       "standard" mode.
     """
-    destination, mode, size, backups = None, None, None, None
+
+    destination, mode, size, backups = (self._DEFAULT_LOGGER.destination().get(),
+                                        self._DEFAULT_LOGGER.mode().get(),
+                                        None,
+                                        None)
+
     logger = process.logger()
     if logger is Empty:
       if self._process_logger_destination:
         destination = self._process_logger_destination
-      else:
-        destination = LoggerDestination.FILE
-
       if self._process_logger_mode:
-        mode = self._process_logger_mode,
-        size = Amount(self._rotate_log_size_mb, Data.MB)
-        backups = self._rotate_log_backups
-      else:
-        mode = LoggerMode.STANDARD
+        mode = self._process_logger_mode
     else:
       destination = logger.destination().get()
       mode = logger.mode().get()
-      if mode == LoggerMode.ROTATE:
+
+    if mode == LoggerMode.ROTATE:
+      size = Amount(self._DEFAULT_ROTATION.log_size().get(), Data.BYTES)
+      backups = self._DEFAULT_ROTATION.backups().get()
+      if logger is Empty:
+        if self._rotate_log_size_mb:
+          size = Amount(self._rotate_log_size_mb, Data.MB)
+        if self._rotate_log_backups:
+          backups = self._rotate_log_backups
+      else:
         rotate = logger.rotate()
-        size = Amount(rotate.log_size().get(), Data.BYTES)
-        backups = rotate.backups().get()
+        if rotate is not Empty:
+          size = Amount(rotate.log_size().get(), Data.BYTES)
+          backups = rotate.backups().get()
+
     return destination, mode, size, backups
 
   def deadlocked(self, plan=None):

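The rewritten `_build_process_logger_args` now resolves rotation in layers:

- It starts from `RotatePolicy()` defaults whenever the mode is `rotate`.
- For Processes without a `logger`, the executor's rotation settings (`_rotate_log_size_mb`, `_rotate_log_backups`) can override those defaults.
- Otherwise, it uses the Process's own `rotate` policy when one is set.

Below is a minimal model of that logic. Plain dicts stand in for the pystachio `Logger`/`RotatePolicy` structs, and `build_logger_args` is a hypothetical function. Destination handling is omitted. The 100 MiB / 5 backup defaults are taken from the `TestRotateDefaulted` comment, and `Data.MB` is assumed to be 2**20 bytes:

```python
MIB = 1024 * 1024  # assumed size of Data.MB

# Assumed RotatePolicy() defaults, per the TestRotateDefaulted comment.
DEFAULT_LOG_SIZE = 100 * MIB
DEFAULT_BACKUPS = 5


def build_logger_args(process_logger=None, flag_mode=None,
                      flag_size_mb=None, flag_backups=None):
    """Hypothetical model of TaskRunner._build_process_logger_args.

    process_logger: None if the Process sets no logger, else a dict with
    optional 'mode' and 'rotate' keys ('rotate' is a dict with optional
    'log_size' in bytes and 'backups'). Returns (mode, size_bytes, backups).
    """
    if process_logger is None:
        # Executor flags apply only when the Process has no logger.
        mode = flag_mode or 'standard'
    else:
        mode = process_logger.get('mode', 'standard')

    size, backups = None, None
    if mode == 'rotate':
        # Every rotate case starts from the RotatePolicy() defaults.
        size, backups = DEFAULT_LOG_SIZE, DEFAULT_BACKUPS
        if process_logger is None:
            # Flags override only the values they actually set.
            if flag_size_mb:
                size = flag_size_mb * MIB
            if flag_backups:
                backups = flag_backups
        else:
            rotate = process_logger.get('rotate')
            if rotate is not None:
                # Unset RotatePolicy fields keep their defaults.
                size = rotate.get('log_size', DEFAULT_LOG_SIZE)
                backups = rotate.get('backups', DEFAULT_BACKUPS)
    return mode, size, backups


print(build_logger_args())                      # ('standard', None, None)
print(build_logger_args(flag_mode='rotate'))    # ('rotate', 104857600, 5)
print(build_logger_args({'mode': 'rotate'}))    # ('rotate', 104857600, 5)
print(build_logger_args(flag_mode='rotate', flag_size_mb=1, flag_backups=1))
# ('rotate', 1048576, 1)
```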
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/main/python/apache/thermos/testing/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/runner.py b/src/main/python/apache/thermos/testing/runner.py
index 8b6ba73..0560891 100644
--- a/src/main/python/apache/thermos/testing/runner.py
+++ b/src/main/python/apache/thermos/testing/runner.py
@@ -70,10 +70,8 @@ class AngryHandler(TaskRunnerUniversalHandler):
         sys.exit(1)
 
 sandbox = os.path.join('%(sandbox)s', '%(task_id)s')
-args = {}
+args = %(extra_task_runner_args)r
 args['task_id'] = '%(task_id)s'
-if %(portmap)s:
-  args['portmap'] = %(portmap)s
 args['universal_handler'] = AngryHandler
 
 runner = TaskRunner(task, '%(root)s', sandbox, **args)
@@ -83,7 +81,7 @@ with open('%(state_filename)s', 'w') as fp:
   fp.write(thrift_serialize(runner.state))
 """
 
-  def __init__(self, task, portmap={}, success_rate=100, random_seed=31337):
+  def __init__(self, task, success_rate=100, random_seed=31337, **extra_task_runner_args):
     """
       task = Thermos task
       portmap = port map
@@ -99,7 +97,7 @@ with open('%(state_filename)s', 'w') as fp:
     self.tempdir = tempfile.mkdtemp()
     self.task_id = '%s-runner-base' % int(time.time() * 1000000)
     self.sandbox = os.path.join(self.tempdir, 'sandbox')
-    self.portmap = portmap
+    self.extra_task_runner_args = extra_task_runner_args
     self.cleaned = False
     self.pathspec = TaskPath(root=self.tempdir, task_id=self.task_id)
     self.script_filename = None
@@ -130,9 +128,9 @@ with open('%(state_filename)s', 'w') as fp:
         'root': self.tempdir,
         'task_id': self.task_id,
         'state_filename': self.state_filename,
-        'portmap': repr(self.portmap),
         'success_rate': self.success_rate,
         'random_seed': self.random_seed + self._run_count,
+        'extra_task_runner_args': self.extra_task_runner_args,
       })
 
     with environment_as(PYTHONPATH=os.pathsep.join(sys.path)):
@@ -193,12 +191,16 @@ with open('%(state_filename)s', 'w') as fp:
 
 class RunnerTestBase(object):
   @classmethod
+  def extra_task_runner_args(cls):
+    return dict(portmap=getattr(cls, 'portmap', {}))
+
+  @classmethod
   def task(cls):
     raise NotImplementedError
 
   @classmethod
   def setup_class(cls):
-    cls.runner = Runner(cls.task(), portmap=getattr(cls, 'portmap', {}))
+    cls.runner = Runner(cls.task(), **cls.extra_task_runner_args())
     cls.runner.run()
     cls.state = cls.runner.state
 

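The test `Runner` now forwards arbitrary keyword arguments to `TaskRunner` by splicing the `repr()` of a dict into the generated script (`args = %(extra_task_runner_args)r`). This round-trips only for literal values such as strings, numbers, `None` and nested dicts. Objects without a literal repr, such as the `AngryHandler` class defined inside the script, cannot travel this way, so the template still assigns `universal_handler` directly. Below is a minimal sketch of the technique. The template and the `render_and_run` helper are hypothetical, and `exec` stands in for however the real `Runner` executes its script:

```python
# Hypothetical, cut-down version of the script template in testing/runner.py.
TEMPLATE = """
args = %(extra_task_runner_args)r
args['task_id'] = '%(task_id)s'
"""


def render_and_run(task_id, **extra_task_runner_args):
    source = TEMPLATE % {
        'task_id': task_id,
        # %r formats the dict with repr(), producing a valid Python literal.
        'extra_task_runner_args': extra_task_runner_args,
    }
    namespace = {}
    # Stand-in for executing the generated script; keeps the sketch self-contained.
    exec(source, namespace)
    return source, namespace['args']


source, args = render_and_run('t-1', portmap={}, process_logger_mode='rotate')
print(args)  # {'portmap': {}, 'process_logger_mode': 'rotate', 'task_id': 't-1'}
```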
http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/test/python/apache/thermos/core/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/BUILD b/src/test/python/apache/thermos/core/BUILD
index acfb79b..957a116 100644
--- a/src/test/python/apache/thermos/core/BUILD
+++ b/src/test/python/apache/thermos/core/BUILD
@@ -19,7 +19,9 @@ python_tests(
     '3rdparty/python:mock',
     '3rdparty/python:psutil',
     '3rdparty/python:twitter.common.contextutil',
+    '3rdparty/python:twitter.common.quantity',
     '3rdparty/python:twitter.common.process',
+    'src/main/python/apache/thermos/config',
     'src/main/python/apache/thermos/core',
     'src/main/python/apache/thermos/monitoring',
     'src/main/python/apache/thermos/testing',

http://git-wip-us.apache.org/repos/asf/aurora/blob/205c308c/src/test/python/apache/thermos/core/test_runner_log_config.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_runner_log_config.py b/src/test/python/apache/thermos/core/test_runner_log_config.py
new file mode 100644
index 0000000..4381296
--- /dev/null
+++ b/src/test/python/apache/thermos/core/test_runner_log_config.py
@@ -0,0 +1,230 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import struct
+from collections import namedtuple
+
+from twitter.common.quantity import Amount, Data
+
+from apache.thermos.config.schema_base import Logger, Process, RotatePolicy, Task
+from apache.thermos.testing.runner import RunnerTestBase
+
+
+class LogConfig(namedtuple('LogConfig', ['destination', 'mode', 'size_mb', 'backups'])):
+  @classmethod
+  def create(cls, destination=None, mode=None, size_mb=None, backups=None):
+    return cls(destination=destination, mode=mode, size_mb=size_mb, backups=backups)
+
+
+class RunnerLogConfigTestBase(RunnerTestBase):
+  _OPT_CONFIG_PROCESS_NAME = 'opt-log-config-process'
+  _CUSTOM_CONFIG_PROCESS_NAME = 'custom-log-config-process'
+
+  @classmethod
+  def opt_config(cls):
+    return LogConfig.create()
+
+  @classmethod
+  def custom_config(cls):
+    return None
+
+  @classmethod
+  def log_size(cls):
+    return Amount(1, Data.BYTES)
+
+  STDOUT = 1
+  STDERR = 2
+
+  @classmethod
+  def log_fd(cls):
+    return cls.STDOUT
+
+  @classmethod
+  def extra_task_runner_args(cls):
+    opt_config = cls.opt_config()
+    return dict(process_logger_destination=opt_config.destination,
+                process_logger_mode=opt_config.mode,
+                rotate_log_size_mb=opt_config.size_mb,
+                rotate_log_backups=opt_config.backups)
+
+  @classmethod
+  def task(cls):
+    cmdline = 'head -c %d /dev/zero >&%d' % (cls.log_size().as_(Data.BYTES), cls.log_fd())
+
+    opt_config_process = Process(name=cls._OPT_CONFIG_PROCESS_NAME, cmdline=cmdline)
+
+    custom_config_process = Process(name=cls._CUSTOM_CONFIG_PROCESS_NAME, cmdline=cmdline)
+    custom_config = cls.custom_config()
+    if custom_config:
+      logger = Logger()
+      if custom_config.destination:
+        logger = logger(destination=custom_config.destination)
+      if custom_config.mode:
+        logger = logger(mode=custom_config.mode)
+      if custom_config.size_mb or custom_config.backups:
+        rotate = RotatePolicy()
+        if custom_config.size_mb:
+          rotate = rotate(log_size=custom_config.size_mb)
+        if custom_config.backups:
+          rotate = rotate(backups=custom_config.backups)
+        logger = logger(rotate=rotate)
+      custom_config_process = custom_config_process(logger=logger)
+
+    return Task(name='log-config-task', processes=[opt_config_process, custom_config_process])
+
+  class Assert(object):
+    def __init__(self, log_file_dir):
+      self._log_file_dir = log_file_dir
+
+    def log_file_names(self, *names):
+      actual_names = os.listdir(self._log_file_dir)
+      assert set(actual_names) == set(names)
+      assert all(os.path.isfile(os.path.join(self._log_file_dir, name)) for name in names)
+
+    def log_file(self, name):
+      expected_output_file = os.path.join(self._log_file_dir, name)
+      assert os.path.exists(expected_output_file)
+      with open(expected_output_file, 'rb') as fp:
+        actual = fp.read()
+        size = len(actual)
+        assert actual == struct.pack('B', 0) * size
+      return size
+
+    def empty_log_file(self, name):
+      size = self.log_file(name=name)
+      assert 0 == size
+
+  def _log_dir_name(self, process_name):
+    return os.path.join(self.state.header.log_dir, process_name, '0')
+
+  @property
+  def opt_assert(self):
+    return self.Assert(self._log_dir_name(self._OPT_CONFIG_PROCESS_NAME))
+
+  @property
+  def custom_assert(self):
+    return self.Assert(self._log_dir_name(self._CUSTOM_CONFIG_PROCESS_NAME))
+
+
+class StandardTestBase(RunnerLogConfigTestBase):
+  @classmethod
+  def log_size(cls):
+    return Amount(200, Data.MB)
+
+  def test_log_config(self):
+    log, empty = ('stdout', 'stderr') if self.log_fd() == self.STDOUT else ('stderr', 'stdout')
+    for assertions in self.opt_assert, self.custom_assert:
+      assertions.log_file_names(log, empty)
+      assertions.empty_log_file(name=empty)
+
+      # No rotation should occur in standard mode.
+      size = assertions.log_file(name=log)
+      assert size == self.log_size().as_(Data.BYTES)
+
+
+class TestStandardStdout(StandardTestBase):
+  @classmethod
+  def log_fd(cls):
+    return cls.STDOUT
+
+
+class TestStandardStderr(StandardTestBase):
+  @classmethod
+  def log_fd(cls):
+    return cls.STDERR
+
+
+class RotateTestBase(RunnerLogConfigTestBase):
+  @classmethod
+  def opt_config(cls):
+    return LogConfig.create(mode='rotate', size_mb=1, backups=1)
+
+
+class TestRotateUnderStdout(RotateTestBase):
+  def test_log_config(self):
+    self.opt_assert.log_file_names('stdout', 'stderr')
+    self.opt_assert.empty_log_file(name='stderr')
+    self.opt_assert.log_file(name='stdout')
+
+
+class TestRotateUnderStderr(RotateTestBase):
+  @classmethod
+  def log_fd(cls):
+    return cls.STDERR
+
+  def test_log_config(self):
+    self.opt_assert.log_file_names('stdout', 'stderr')
+    self.opt_assert.empty_log_file(name='stdout')
+    self.opt_assert.log_file(name='stderr')
+
+
+class TestRotateOverStdout(RotateTestBase):
+  @classmethod
+  def log_size(cls):
+    return Amount(2, Data.MB)
+
+  def test_log_config(self):
+    self.opt_assert.log_file_names('stdout', 'stdout.1', 'stderr')
+    self.opt_assert.empty_log_file(name='stderr')
+    self.opt_assert.log_file(name='stdout')
+    self.opt_assert.log_file(name='stdout.1')
+
+
+class TestRotateOverStderr(RotateTestBase):
+  @classmethod
+  def log_size(cls):
+    return Amount(3, Data.MB)
+
+  @classmethod
+  def log_fd(cls):
+    return cls.STDERR
+
+  def test_log_config(self):
+    self.opt_assert.log_file_names('stdout', 'stderr', 'stderr.1')
+    self.opt_assert.empty_log_file(name='stdout')
+    self.opt_assert.log_file(name='stderr')
+    self.opt_assert.log_file(name='stderr.1')
+
+
+class TestRotateDefaulted(RunnerLogConfigTestBase):
+  @classmethod
+  def opt_config(cls):
+    return LogConfig.create(mode='rotate')
+
+  @classmethod
+  def custom_config(cls):
+    return LogConfig.create(mode='rotate')
+
+  @classmethod
+  def log_size(cls):
+    # Default rotation policy is 100MiB with 5 backups so this guarantees a full rotation.
+    return Amount(700, Data.MB)
+
+  def test_log_config(self):
+    for assertions in self.opt_assert, self.custom_assert:
+      assertions.log_file_names('stderr',
+                                'stdout',
+                                'stdout.1',
+                                'stdout.2',
+                                'stdout.3',
+                                'stdout.4',
+                                'stdout.5')
+      assertions.empty_log_file(name='stderr')
+      assertions.log_file(name='stdout')
+      assertions.log_file(name='stdout.1')
+      assertions.log_file(name='stdout.2')
+      assertions.log_file(name='stdout.3')
+      assertions.log_file(name='stdout.4')
+      assertions.log_file(name='stdout.5')
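
The rotation tests write a known number of zero bytes (`head -c N /dev/zero`) and then assert the exact set of log files. The expected names follow from the byte count, the rotation size and the backup cap. Below is a hypothetical helper that reproduces the expectations used in these tests. It is a model, not the logger's actual algorithm, and it assumes one rotation per full `log_size` chunk beyond the first, with backups beyond `backups` deleted:

```python
MIB = 1024 * 1024


def expected_log_files(stream, written_bytes, log_size=None, backups=None):
    """Hypothetical helper: log file names expected for one stream.

    With log_size=None (standard mode) no rotation happens. Otherwise the
    stream is assumed to rotate once per full log_size chunk beyond the
    first, keeping at most `backups` numbered files.
    """
    names = [stream]
    if log_size is None or written_bytes <= log_size:
        return names
    # Integer ceiling division: number of chunks, minus the active file.
    rotations = (written_bytes + log_size - 1) // log_size - 1
    kept = min(rotations, backups)
    names.extend('%s.%d' % (stream, i) for i in range(1, kept + 1))
    return names


print(expected_log_files('stdout', 2 * MIB, MIB, 1))  # ['stdout', 'stdout.1']
print(expected_log_files('stdout', 200 * MIB))        # ['stdout']
```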