You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:14 UTC
[21/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
new file mode 100644
index 0000000..b10cf16
--- /dev/null
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -0,0 +1,905 @@
+""" Thermos runner.
+
+This module contains the TaskRunner, the core component of Thermos responsible for actually running
+tasks. It also contains several Handlers which define the behaviour on state transitions within the
+TaskRunner.
+
+There are three "active" states in a running Thermos task:
+ ACTIVE
+ CLEANING
+ FINALIZING
+
+A task in ACTIVE state is running regular processes. The moment this task succeeds or goes over its
+failure limit, it then goes into CLEANING state, where it begins the staged termination of leftover
+processes (with SIGTERMs). Once all processes have terminated, the task goes into FINALIZING state,
+where the processes marked with the 'final' bit run. Once the task has gone into CLEANING state, it
+has a deadline for going into terminal state. If it doesn't make it in time (at any point, whether
+in CLEANING or FINALIZING state), it is forced into terminal state through SIGKILLs of all live
+processes (coordinators, shells and the full process trees rooted at the shells.)
+
+TaskRunner.kill is implemented by forcing the task into CLEANING state and setting its finalization
+deadline manually. So in practice, we implement Task preemption by calling kill with the
+finalization deadline = now + preemption wait, which gives the Task an opportunity to do graceful
+shutdown. If preemption_wait=0, it will result in immediate SIGKILLs and then transition to the
+terminal state.
+
+"""
+
+from contextlib import contextmanager
+import errno
+from functools import partial
+import os
+import socket
+import sys
+import time
+import traceback
+
+from twitter.common import log
+from twitter.common.dirutil import safe_mkdir
+from twitter.common.quantity import Amount, Time
+from twitter.common.recordio import ThriftRecordReader
+
+from twitter.thermos.common.ckpt import (
+ CheckpointDispatcher,
+ UniversalStateHandler,
+ ProcessStateHandler,
+ TaskStateHandler)
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.common.planner import TaskPlanner
+from twitter.thermos.config.loader import (
+ ThermosConfigLoader,
+ ThermosProcessWrapper,
+ ThermosTaskWrapper,
+ ThermosTaskValidator)
+from twitter.thermos.config.schema import ThermosContext
+
+from gen.twitter.thermos.ttypes import (
+ ProcessState,
+ ProcessStatus,
+ RunnerCkpt,
+ RunnerHeader,
+ RunnerState,
+ TaskState,
+ TaskStatus,
+)
+
+from .helper import TaskRunnerHelper
+from .muxer import ProcessMuxer
+from .process import Process
+
+from pystachio import Environment
+
+
+# TODO(wickman) Currently this is messy because of all the private access into ._runner.
+# Clean this up by giving the TaskRunnerProcessHandler the components it should own, and
+# create a legitimate API contract into the Runner.
class TaskRunnerProcessHandler(ProcessStateHandler):
  """
    Process-level checkpoint handler: applies each process state transition to the runner.

    Accesses these parts of the runner:

      | _task_processes [array set, pop]
      | _task_process_from_process_name [process name / sequence number => Process]
      | _watcher [ProcessMuxer.register, unregister]
      | _plan [add_success, add_failure, set_running, lost]
      | _recovery, state [consulted before killing leftover processes]
  """

  def __init__(self, runner):
    self._runner = runner

  def _cleanup(self, process_update):
    """Kill the remains of the process unless the runner is in recovery mode."""
    if self._runner._recovery:
      return
    TaskRunnerHelper.kill_process(self._runner.state, process_update.process)

  def _detach(self, process_name):
    """Forget the Process object for process_name and unregister it from the watcher."""
    self._runner._task_processes.pop(process_name)
    self._runner._watcher.unregister(process_name)

  def _on_abnormal(self, process_update):
    """Shared bookkeeping for FAILED and LOST transitions."""
    log.info('Process %s had an abnormal termination' % process_update.process)
    self._detach(process_update.process)

  def on_waiting(self, process_update):
    """Create the Process object for the new run and register it with the watcher."""
    log.debug('Process on_waiting %s' % process_update)
    name = process_update.process
    runner = self._runner
    runner._task_processes[name] = runner._task_process_from_process_name(
        name, process_update.seq + 1)
    runner._watcher.register(name, process_update.seq - 1)

  def on_forked(self, process_update):
    """Rebind the Process to the forked coordinator and mark it running in the plan."""
    log.debug('Process on_forked %s' % process_update)
    forked = self._runner._task_processes[process_update.process]
    forked.rebind(process_update.coordinator_pid, process_update.fork_time)
    self._runner._plan.set_running(process_update.process)

  def on_running(self, process_update):
    """Mark the process running in the current plan."""
    log.debug('Process on_running %s' % process_update)
    self._runner._plan.set_running(process_update.process)

  def on_success(self, process_update):
    """Clean up after a successful run and record the success in the plan."""
    log.debug('Process on_success %s' % process_update)
    name = process_update.process
    log.info('Process(%s) finished successfully [rc=%s]' % (name, process_update.return_code))
    self._cleanup(process_update)
    self._detach(name)
    self._runner._plan.add_success(name)

  def on_failed(self, process_update):
    """Clean up after a failed run, record the failure and report whether it will restart."""
    log.debug('Process on_failed %s' % process_update)
    name = process_update.process
    log.info('Process(%s) failed [rc=%s]' % (name, process_update.return_code))
    self._cleanup(process_update)
    self._on_abnormal(process_update)
    self._runner._plan.add_failure(name)
    if name in self._runner._plan.failed:
      log.info('Process %s reached maximum failures, marking process run failed.' % name)
    else:
      log.info('Process %s under maximum failure limit, restarting.' % name)

  def on_lost(self, process_update):
    """Clean up after a lost run and mark it lost in the plan."""
    log.debug('Process on_lost %s' % process_update)
    self._cleanup(process_update)
    self._on_abnormal(process_update)
    self._runner._plan.lost(process_update.process)

  def on_killed(self, process_update):
    """Clean up after a killed run; the plan accounts for a kill as a loss."""
    log.debug('Process on_killed %s' % process_update)
    self._cleanup(process_update)
    self._detach(process_update.process)
    log.debug('Process killed, marking it as a loss.')
    self._runner._plan.lost(process_update.process)
+
+
class TaskRunnerTaskHandler(TaskStateHandler):
  """
    Task-level checkpoint handler: applies each task state transition to the runner.

    Accesses these parts of the runner:
      _plan [set to regular_plan or finalizing_plan]
      _recovery [boolean, whether or not to side-effect]
      _pathspec [path creation]
      _task [ThermosTask]
      _finalization_start [set on CLEANING, or on FINALIZING if still unset]
      _terminate_plan
      _kill
  """

  def __init__(self, runner):
    self._runner = runner
    self._pathspec = self._runner._pathspec

  def _cleanup(self):
    """Kill leftover processes and finalize the task, unless only replaying a checkpoint."""
    if self._runner._recovery:
      return
    self._runner._kill()
    TaskRunnerHelper.finalize_task(self._pathspec)

  def on_active(self, task_update):
    """Select the regular plan; outside of recovery also initialize the task on disk."""
    log.debug('Task on_active(%s)' % task_update)
    runner = self._runner
    runner._plan = runner._regular_plan
    if not runner._recovery:
      TaskRunnerHelper.initialize_task(
          self._pathspec, ThermosTaskWrapper(runner._task).to_json())

  def on_cleaning(self, task_update):
    """Record when finalization started and terminate what the regular plan is running."""
    log.debug('Task on_cleaning(%s)' % task_update)
    runner = self._runner
    runner._finalization_start = task_update.timestamp_ms / 1000.0
    runner._terminate_plan(runner._regular_plan)

  def on_finalizing(self, task_update):
    """Kill stragglers (outside of recovery) and switch over to the finalizing plan."""
    log.debug('Task on_finalizing(%s)' % task_update)
    runner = self._runner
    if not runner._recovery:
      runner._kill()
    runner._plan = runner._finalizing_plan
    if runner._finalization_start is None:
      runner._finalization_start = task_update.timestamp_ms / 1000.0

  def on_killed(self, task_update):
    log.debug('Task on_killed(%s)' % task_update)
    self._cleanup()

  def on_success(self, task_update):
    log.debug('Task on_success(%s)' % task_update)
    self._cleanup()
    log.info('Task succeeded.')

  def on_failed(self, task_update):
    log.debug('Task on_failed(%s)' % task_update)
    self._cleanup()

  def on_lost(self, task_update):
    log.debug('Task on_lost(%s)' % task_update)
    self._cleanup()
+
+
class TaskRunnerUniversalHandler(UniversalStateHandler):
  """
    Universal handler that writes every header, process and task transition of the runner
    to its checkpoint stream.

    Accesses these parts of the runner:
      _ckpt_write
      task [validated on initialization]
  """

  def __init__(self, runner):
    self._runner = runner

  def _checkpoint(self, record):
    """Hand a RunnerCkpt record to the runner's checkpoint writer."""
    self._runner._ckpt_write(record)

  def on_process_transition(self, state, process_update):
    log.debug('_on_process_transition: %s' % process_update)
    record = RunnerCkpt(process_status=process_update)
    self._checkpoint(record)

  def on_task_transition(self, state, task_update):
    log.debug('_on_task_transition: %s' % task_update)
    record = RunnerCkpt(task_status=task_update)
    self._checkpoint(record)

  def on_initialization(self, header):
    """Validate the task and its ports against the header, then checkpoint the header."""
    log.debug('_on_initialization: %s' % header)
    task = self._runner.task
    ThermosTaskValidator.assert_valid_task(task)
    ThermosTaskValidator.assert_valid_ports(self._runner.task, header.ports)
    self._checkpoint(RunnerCkpt(runner_header=header))
+
+
class TaskRunnerStage(object):
  """
    Base class for one stage of the task runner pipeline.

    Subclasses override run() to do the stage's work and transition_to() to name the
    task state that follows the stage.
  """
  # Upper bound on the wait a stage requests between two iterations.
  MAX_ITERATION_WAIT = Amount(1, Time.SECONDS)

  def __init__(self, runner):
    self.runner = runner
    self.clock = runner._clock

  def run(self):
    """
      Do whatever work this stage requires.

      Return None when nothing is left to do [this invokes a state transition], otherwise
      return the number of seconds from now after which run() should be called again.
    """
    return None

  def transition_to(self):
    """
      Return the task state to move to once this stage is finished.
    """
    raise NotImplementedError
+
+
class TaskRunnerStage_ACTIVE(TaskRunnerStage):
  """
    Drive the regular plan (i.e. normal, non-finalizing processes.)
  """
  MAX_ITERATION_WAIT = Amount(15, Time.SECONDS)
  MIN_ITERATION_WAIT = Amount(1, Time.SECONDS)

  def __init__(self, runner):
    super(TaskRunnerStage_ACTIVE, self).__init__(runner)

  def run(self):
    """
      Schedule one pass of the regular plan.

      Returns None once the plan is complete or the runner is unhealthy, otherwise the
      number of seconds to wait before the next pass.
    """
    runner = self.runner
    launched = runner._run_plan(runner._regular_plan)

    # Work out whether the regular plan has reached a terminal outcome.
    terminal_state = None
    if runner._regular_plan.is_complete():
      log.info('Regular plan complete.')
      terminal_state = TaskState.SUCCESS if runner.is_healthy() else TaskState.FAILED
    elif not runner.is_healthy():
      log.error('Regular plan unhealthy!')
      terminal_state = TaskState.FAILED

    if terminal_state:
      # Nothing further to schedule in this stage.
      return None

    if launched > 0:
      # Something was just launched: come back soon, but no sooner than the minimum wait.
      return max(self.MIN_ITERATION_WAIT.as_(Time.SECONDS), runner._regular_plan.min_wait())

    # Nothing launched: wake up when something becomes runnable, or after the maximum wait.
    return min(self.MAX_ITERATION_WAIT.as_(Time.SECONDS), runner._regular_plan.min_wait())

  def transition_to(self):
    return TaskState.CLEANING
+
+
class TaskRunnerStage_CLEANING(TaskRunnerStage):
  """
    Clean up the regular plan (e.g. if it failed.)  On ACTIVE -> CLEANING, SIGTERMs are
    sent to all still-running processes.  We wait at most finalization_wait for them to
    complete before SIGKILLs are sent.  If everything exits cleanly before then, we move
    on to FINALIZING, which starts the finalization schedule (if any.)
  """
  def run(self):
    """Keep waiting while finalization time remains and processes are still running."""
    runner = self.runner
    log.debug('TaskRunnerStage[CLEANING]: Finalization remaining: %s' %
        runner._finalization_remaining())
    if runner._finalization_remaining() <= 0 or not runner.has_running_processes():
      return None
    return min(runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))

  def transition_to(self):
    """Go to FINALIZING unless the finalization wait has already been used up."""
    if self.runner._finalization_remaining() > 0:
      return TaskState.FINALIZING
    log.info('Exceeded finalization wait, skipping finalization.')
    return self.runner.terminal_state()
+
+
class TaskRunnerStage_FINALIZING(TaskRunnerStage):
  """
    Drive the finalizing plan, i.e. the plan of processes with the 'final' bit marked
    (e.g. log savers, checkpointers and the like.)  Anything in this plan will be
    SIGKILLed if we go over the finalization_wait.
  """

  def run(self):
    """Schedule one pass of the finalizing plan; None means the stage is finished."""
    runner = self.runner
    finalizing_plan = runner._finalizing_plan
    runner._run_plan(finalizing_plan)
    log.debug('TaskRunnerStage[FINALIZING]: Finalization remaining: %s' %
        runner._finalization_remaining())
    if runner.deadlocked(finalizing_plan):
      log.warning('Finalizing plan deadlocked.')
      return None
    if runner._finalization_remaining() <= 0 or finalizing_plan.is_complete():
      return None
    return min(runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))

  def transition_to(self):
    """Finalization always ends in the runner's terminal state."""
    out_of_time = self.runner._finalization_remaining() <= 0
    if out_of_time:
      log.info('Exceeded finalization wait, terminating finalization.')
    return self.runner.terminal_state()
+
+
+class TaskRunner(object):
+ """
+ Run a ThermosTask.
+
+ This class encapsulates the core logic to run and control the state of a Thermos task.
+ Typically, it will be instantiated directly to control a new task, but a TaskRunner can also be
+ synthesised from an existing task's checkpoint root
+ """
+ class Error(Exception): pass
+ class InvalidTask(Error): pass
+ class InternalError(Error): pass
+ class PermissionError(Error): pass
+ class StateError(Error): pass
+
+ # Maximum amount of time we spend waiting for new updates from the checkpoint streams
+ # before doing housecleaning (checking for LOST tasks, dead PIDs.)
+ MAX_ITERATION_TIME = Amount(10, Time.SECONDS)
+
+ # Minimum amount of time we wait between polls for updates on coordinator checkpoints.
+ COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.SECONDS)
+
+ # Amount of time we're willing to wait after forking before we expect the runner to have
+ # exec'ed the child process.
+ LOST_TIMEOUT = Amount(60, Time.SECONDS)
+
+ # Active task stages
+ STAGES = {
+ TaskState.ACTIVE: TaskRunnerStage_ACTIVE,
+ TaskState.CLEANING: TaskRunnerStage_CLEANING,
+ TaskState.FINALIZING: TaskRunnerStage_FINALIZING
+ }
+
+ @classmethod
+ def get(cls, task_id, checkpoint_root):
+ """
+ Get a TaskRunner bound to the task_id in checkpoint_root.
+ """
+ path = TaskPath(root=checkpoint_root, task_id=task_id, state='active')
+ task_json = path.getpath('task_path')
+ task_checkpoint = path.getpath('runner_checkpoint')
+ if not os.path.exists(task_json):
+ return None
+ task = ThermosConfigLoader.load_json(task_json)
+ if task is None:
+ return None
+ if len(task.tasks()) == 0:
+ return None
+ try:
+ checkpoint = CheckpointDispatcher.from_file(task_checkpoint)
+ if checkpoint is None or checkpoint.header is None:
+ return None
+ return cls(task.tasks()[0].task(), checkpoint_root, checkpoint.header.sandbox,
+ log_dir=checkpoint.header.log_dir, task_id=task_id,
+ portmap=checkpoint.header.ports)
+ except Exception as e:
+ log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s' % e, exc_info=True)
+ return None
+
+ def __init__(self, task, checkpoint_root, sandbox, log_dir=None,
+ task_id=None, portmap=None, user=None, chroot=False, clock=time,
+ universal_handler=None, planner_class=TaskPlanner):
+ """
+ required:
+ task (config.Task) = the task to run
+ checkpoint_root (path) = the checkpoint root
+ sandbox (path) = the sandbox in which the path will be run
+ [if None, cwd will be assumed, but garbage collection will be
+ disabled for this task.]
+
+ optional:
+ log_dir (string) = directory to house stdout/stderr logs. If not specified, logs will be
+ written into the sandbox directory under .logs/
+ task_id (string) = bind to this task id. if not specified, will synthesize an id based
+ upon task.name()
+ portmap (dict) = a map (string => integer) from name to port, e.g. { 'http': 80 }
+ user (string) = the user to run the task as. if not current user, requires setuid
+ privileges.
+ chroot (boolean) = whether or not to chroot into the sandbox prior to exec.
+ clock (time interface) = the clock to use throughout
+ universal_handler = checkpoint record handler (only used for testing)
+ planner_class (TaskPlanner class) = TaskPlanner class to use for constructing the task
+ planning policy.
+ """
+ if not issubclass(planner_class, TaskPlanner):
+ raise TypeError('planner_class must be a TaskPlanner.')
+ self._clock = clock
+ launch_time = self._clock.time()
+ launch_time_ms = '%06d' % int((launch_time - int(launch_time)) * 10**6)
+ if not task_id:
+ self._task_id = '%s-%s.%s' % (task.name(),
+ time.strftime('%Y%m%d-%H%M%S', time.localtime(launch_time)),
+ launch_time_ms)
+ else:
+ self._task_id = task_id
+ current_user = TaskRunnerHelper.get_actual_user()
+ self._user = user or current_user
+ # TODO(wickman) This should be delegated to the ProcessPlatform / Helper
+ if self._user != current_user:
+ if os.geteuid() != 0:
+ raise ValueError('task specifies user as %s, but %s does not have setuid permission!' % (
+ self._user, current_user))
+ self._portmap = portmap or {}
+ self._launch_time = launch_time
+ self._log_dir = log_dir or os.path.join(sandbox, '.logs')
+ self._pathspec = TaskPath(root=checkpoint_root, task_id=self._task_id, log_dir=self._log_dir)
+ try:
+ ThermosTaskValidator.assert_valid_task(task)
+ ThermosTaskValidator.assert_valid_ports(task, self._portmap)
+ except ThermosTaskValidator.InvalidTaskError as e:
+ raise self.InvalidTask('Invalid task: %s' % e)
+ context = ThermosContext(
+ task_id=self._task_id,
+ ports=self._portmap,
+ user=self._user)
+ self._task, uninterp = (task % Environment(thermos=context)).interpolate()
+ if len(uninterp) > 0:
+ raise self.InvalidTask('Failed to interpolate task, missing: %s' %
+ ', '.join(str(ref) for ref in uninterp))
+ try:
+ ThermosTaskValidator.assert_same_task(self._pathspec, self._task)
+ except ThermosTaskValidator.InvalidTaskError as e:
+ raise self.InvalidTask('Invalid task: %s' % e)
+ self._plan = None # plan currently being executed (updated by Handlers)
+ self._regular_plan = planner_class(self._task, clock=clock,
+ process_filter=lambda proc: proc.final().get() == False)
+ self._finalizing_plan = planner_class(self._task, clock=clock,
+ process_filter=lambda proc: proc.final().get() == True)
+ self._chroot = chroot
+ self._sandbox = sandbox
+ self._terminal_state = None
+ self._ckpt = None
+ self._process_map = dict((p.name().get(), p) for p in self._task.processes())
+ self._task_processes = {}
+ self._stages = dict((state, stage(self)) for state, stage in self.STAGES.items())
+ self._finalization_start = None
+ self._preemption_deadline = None
+ self._watcher = ProcessMuxer(self._pathspec)
+ self._state = RunnerState(processes = {})
+
+ # create runner state
+ universal_handler = universal_handler or TaskRunnerUniversalHandler
+ self._dispatcher = CheckpointDispatcher()
+ self._dispatcher.register_handler(universal_handler(self))
+ self._dispatcher.register_handler(TaskRunnerProcessHandler(self))
+ self._dispatcher.register_handler(TaskRunnerTaskHandler(self))
+
+ # recover checkpointed runner state and update plan
+ self._recovery = True
+ self._replay_runner_ckpt()
+
+ @property
+ def task(self):
+ return self._task
+
+ @property
+ def task_id(self):
+ return self._task_id
+
+ @property
+ def state(self):
+ return self._state
+
+ @property
+ def processes(self):
+ return self._task_processes
+
+ def task_state(self):
+ return self._state.statuses[-1].state if self._state.statuses else TaskState.ACTIVE
+
+ def close_ckpt(self):
+ """Force close the checkpoint stream. This is necessary for runners terminated through
+ exception propagation."""
+ log.debug('Closing the checkpoint stream.')
+ self._ckpt.close()
+
+ @contextmanager
+ def control(self, force=False):
+ """
+ Bind to the checkpoint associated with this task, position to the end of the log if
+ it exists, or create it if it doesn't. Fails if we cannot get "leadership" i.e. a
+ file lock on the checkpoint stream.
+ """
+ if self.is_terminal():
+ raise TaskRunner.StateError('Cannot take control of a task in terminal state.')
+ if self._sandbox:
+ safe_mkdir(self._sandbox)
+ ckpt_file = self._pathspec.getpath('runner_checkpoint')
+ try:
+ self._ckpt = TaskRunnerHelper.open_checkpoint(ckpt_file, force=force, state=self._state)
+ except TaskRunnerHelper.PermissionError:
+ raise TaskRunner.PermissionError('Unable to open checkpoint %s' % ckpt_file)
+ log.debug('Flipping recovery mode off.')
+ self._recovery = False
+ self._set_task_status(self.task_state())
+ self._resume_task()
+ try:
+ yield
+ except Exception as e:
+ log.error('Caught exception in self.control(): %s' % e)
+ log.error(' %s' % traceback.format_exc())
+ self._ckpt.close()
+
+ def _resume_task(self):
+ assert self._ckpt is not None
+ unapplied_updates = self._replay_process_ckpts()
+ if self.is_terminal():
+ raise self.StateError('Cannot resume terminal task.')
+ self._initialize_ckpt_header()
+ self._replay(unapplied_updates)
+
+ def _ckpt_write(self, record):
+ """
+ Write to the checkpoint stream if we're not in recovery mode.
+ """
+ if not self._recovery:
+ self._ckpt.write(record)
+
+ def _replay(self, checkpoints):
+ """
+ Replay a sequence of RunnerCkpts.
+ """
+ for checkpoint in checkpoints:
+ self._dispatcher.dispatch(self._state, checkpoint)
+
+ def _replay_runner_ckpt(self):
+ """
+ Replay the checkpoint stream associated with this task.
+ """
+ ckpt_file = self._pathspec.getpath('runner_checkpoint')
+ if os.path.exists(ckpt_file):
+ fp = open(ckpt_file, "r")
+ ckpt_recover = ThriftRecordReader(fp, RunnerCkpt)
+ for record in ckpt_recover:
+ log.debug('Replaying runner checkpoint record: %s' % record)
+ self._dispatcher.dispatch(self._state, record, recovery=True)
+ ckpt_recover.close()
+
+ def _replay_process_ckpts(self):
+ """
+ Replay the unmutating process checkpoints. Return the unapplied process updates that
+ would mutate the runner checkpoint stream.
+ """
+ process_updates = self._watcher.select()
+ unapplied_process_updates = []
+ for process_update in process_updates:
+ if self._dispatcher.would_update(self._state, process_update):
+ unapplied_process_updates.append(process_update)
+ else:
+ self._dispatcher.dispatch(self._state, process_update, recovery=True)
+ return unapplied_process_updates
+
+ def _initialize_ckpt_header(self):
+ """
+ Initializes the RunnerHeader for this checkpoint stream if it has not already
+ been constructed.
+ """
+ if self._state.header is None:
+ header = RunnerHeader(
+ task_id=self._task_id,
+ launch_time_ms=int(self._launch_time*1000),
+ sandbox=self._sandbox,
+ log_dir=self._log_dir,
+ hostname=socket.gethostname(),
+ user=self._user,
+ ports=self._portmap)
+ runner_ckpt = RunnerCkpt(runner_header=header)
+ self._dispatcher.dispatch(self._state, runner_ckpt)
+
+ def _set_task_status(self, state):
+ update = TaskStatus(state=state, timestamp_ms=int(self._clock.time() * 1000),
+ runner_pid=os.getpid(), runner_uid=os.getuid())
+ runner_ckpt = RunnerCkpt(task_status=update)
+ self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
+
+ def _finalization_remaining(self):
+ # If a preemption deadline has been set, use that.
+ if self._preemption_deadline:
+ return max(0, self._preemption_deadline - self._clock.time())
+
+ # Otherwise, use the finalization wait provided in the configuration.
+ finalization_allocation = self.task.finalization_wait().get()
+ if self._finalization_start is None:
+ return sys.float_info.max
+ else:
+ waited = max(0, self._clock.time() - self._finalization_start)
+ return max(0, finalization_allocation - waited)
+
+ def _set_process_status(self, process_name, process_state, **kw):
+ if 'sequence_number' in kw:
+ sequence_number = kw.pop('sequence_number')
+ log.debug('_set_process_status(%s <= %s, seq=%s[force])' % (process_name,
+ ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
+ else:
+ current_run = self._current_process_run(process_name)
+ if not current_run:
+ assert process_state == ProcessState.WAITING
+ sequence_number = 0
+ else:
+ sequence_number = current_run.seq + 1
+ log.debug('_set_process_status(%s <= %s, seq=%s[auto])' % (process_name,
+ ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
+ runner_ckpt = RunnerCkpt(process_status=ProcessStatus(
+ process=process_name, state=process_state, seq=sequence_number, **kw))
+ self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
+
+ def _task_process_from_process_name(self, process_name, sequence_number):
+ """
+ Construct a Process() object from a process_name, populated with its
+ correct run number and fully interpolated commandline.
+ """
+ run_number = len(self.state.processes[process_name]) - 1
+ pathspec = self._pathspec.given(process=process_name, run=run_number)
+ process = self._process_map.get(process_name)
+ if process is None:
+ raise self.InternalError('FATAL: Could not find process: %s' % process_name)
+ def close_ckpt_and_fork():
+ pid = os.fork()
+ if pid == 0 and self._ckpt is not None:
+ self._ckpt.close()
+ return pid
+ return Process(
+ process.name().get(),
+ process.cmdline().get(),
+ sequence_number,
+ pathspec,
+ self._sandbox,
+ self._user,
+ chroot=self._chroot,
+ fork=close_ckpt_and_fork)
+
+ def deadlocked(self, plan=None):
+ """Check whether a plan is deadlocked, i.e. there are no running/runnable processes, and the
+ plan is not complete."""
+ plan = plan or self._regular_plan
+ now = self._clock.time()
+ running = list(plan.running)
+ runnable = list(plan.runnable_at(now))
+ waiting = list(plan.waiting_at(now))
+ log.debug('running:%d runnable:%d waiting:%d complete:%s' % (
+ len(running), len(runnable), len(waiting), plan.is_complete()))
+ return len(running + runnable + waiting) == 0 and not plan.is_complete()
+
+ def is_healthy(self):
+ """Check whether the TaskRunner is healthy. A healthy TaskRunner is not deadlocked and has not
+ reached its max_failures count."""
+ max_failures = self._task.max_failures().get()
+ deadlocked = self.deadlocked()
+ under_failure_limit = max_failures == 0 or len(self._regular_plan.failed) < max_failures
+ log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s' % (
+ max_failures, len(self._regular_plan.failed), under_failure_limit, deadlocked,
+ not deadlocked and under_failure_limit))
+ return not deadlocked and under_failure_limit
+
+ def _current_process_run(self, process_name):
+ if process_name not in self._state.processes or len(self._state.processes[process_name]) == 0:
+ return None
+ return self._state.processes[process_name][-1]
+
+ def is_process_lost(self, process_name):
+ """Determine whether or not we should mark a task as LOST and do so if necessary."""
+ current_run = self._current_process_run(process_name)
+ if not current_run:
+ raise self.InternalError('No current_run for process %s!' % process_name)
+
+ def forked_but_never_came_up():
+ return current_run.state == ProcessState.FORKED and (
+ self._clock.time() - current_run.fork_time > TaskRunner.LOST_TIMEOUT.as_(Time.SECONDS))
+
+ def running_but_coordinator_died():
+ if current_run.state != ProcessState.RUNNING:
+ return False
+ coordinator_pid, _, _ = TaskRunnerHelper.scan_process(self.state, process_name)
+ if coordinator_pid is not None:
+ return False
+ elif self._watcher.has_data(process_name):
+ return False
+ return True
+
+ if forked_but_never_came_up() or running_but_coordinator_died():
+ log.info('Detected a LOST task: %s' % current_run)
+ log.debug(' forked_but_never_came_up: %s' % forked_but_never_came_up())
+ log.debug(' running_but_coordinator_died: %s' % running_but_coordinator_died())
+ return True
+
+ return False
+
+ def _run_plan(self, plan):
+ log.debug('Schedule pass:')
+
+ running = list(plan.running)
+ log.debug('running: %s' % ' '.join(plan.running))
+ log.debug('finished: %s' % ' '.join(plan.finished))
+
+ launched = []
+ for process_name in plan.running:
+ if self.is_process_lost(process_name):
+ self._set_process_status(process_name, ProcessState.LOST)
+
+ now = self._clock.time()
+ runnable = list(plan.runnable_at(now))
+ waiting = list(plan.waiting_at(now))
+ log.debug('runnable: %s' % ' '.join(runnable))
+ log.debug('waiting: %s' % ' '.join(
+ '%s[T-%.1fs]' % (process, plan.get_wait(process)) for process in waiting))
+
+ def pick_processes(process_list):
+ if self._task.max_concurrency().get() == 0:
+ return process_list
+ num_to_pick = max(self._task.max_concurrency().get() - len(running), 0)
+ return process_list[:num_to_pick]
+
+ for process_name in pick_processes(runnable):
+ tp = self._task_processes.get(process_name)
+ if tp:
+ current_run = self._current_process_run(process_name)
+ assert current_run.state == ProcessState.WAITING
+ else:
+ self._set_process_status(process_name, ProcessState.WAITING)
+ tp = self._task_processes[process_name]
+ log.info('Forking Process(%s)' % process_name)
+ tp.start()
+ launched.append(tp)
+
+ return len(launched) > 0
+
+ def _terminate_plan(self, plan):
+ for process in plan.running:
+ last_run = self._current_process_run(process)
+ if last_run and last_run.state in (ProcessState.FORKED, ProcessState.RUNNING):
+ TaskRunnerHelper.terminate_process(self.state, process)
+
+ def has_running_processes(self):
+ """
+ Returns True if any processes associated with this task have active pids.
+ """
+ process_tree = TaskRunnerHelper.scantree(self.state)
+ return any(any(process_set) for process_set in process_tree.values())
+
+ def has_active_processes(self):
+ """
+ Returns True if any processes are in non-terminal states.
+ """
+ return any(not TaskRunnerHelper.is_process_terminal(run.state) for run in
+ filter(None, (self._current_process_run(process) for process in self.state.processes)))
+
+ def collect_updates(self, timeout=None):
+ """
+ Collects and applies updates from process checkpoint streams. Returns the number
+ of applied process checkpoints.
+ """
+ if self.has_active_processes():
+ sleep_interval = self.COORDINATOR_INTERVAL_SLEEP.as_(Time.SECONDS)
+ total_time = 0.0
+ while True:
+ process_updates = self._watcher.select()
+ for process_update in process_updates:
+ self._dispatcher.dispatch(self._state, process_update, self._recovery)
+ if process_updates:
+ return len(process_updates)
+ if timeout and total_time >= timeout:
+ break
+ total_time += sleep_interval
+ self._clock.sleep(sleep_interval)
+ return 0
+
+ def is_terminal(self):
+ return TaskRunnerHelper.is_task_terminal(self.task_state())
+
+ def terminal_state(self):
+ if self._terminal_state:
+ log.debug('Forced terminal state: %s' %
+ TaskState._VALUES_TO_NAMES.get(self._terminal_state, 'UNKNOWN'))
+ return self._terminal_state
+ else:
+ return TaskState.SUCCESS if self.is_healthy() else TaskState.FAILED
+
+ def run(self, force=False):
+ """
+ Entrypoint to runner. Assume control of checkpoint stream, and execute TaskRunnerStages
+ until runner is terminal.
+ """
+ if self.is_terminal():
+ return
+ with self.control(force):
+ self._run()
+
+ def _run(self):
+ iteration_time = self.MAX_ITERATION_TIME.as_(Time.SECONDS)
+ while not self.is_terminal():
+ start = self._clock.time()
+ # step 1: execute stage corresponding to the state we're currently in
+ runner = self._stages[self.task_state()]
+ iteration_wait = runner.run()
+ if iteration_wait is None:
+ log.debug('Run loop: No more work to be done in state %s' %
+ TaskState._VALUES_TO_NAMES.get(self.task_state(), 'UNKNOWN'))
+ self._set_task_status(runner.transition_to())
+ continue
+ log.debug('Run loop: Work to be done within %.1fs' % iteration_wait)
+ # step 2: check child process checkpoint streams for updates
+ if not self.collect_updates(iteration_wait):
+ # If we don't collect any updates, at least 'touch' the checkpoint stream
+ # so as to prevent garbage collection.
+ elapsed = self._clock.time() - start
+ if elapsed < iteration_wait:
+ log.debug('Update collection only took %.1fs, idling %.1fs' % (
+ elapsed, iteration_wait - elapsed))
+ self._clock.sleep(iteration_wait - elapsed)
+ log.debug('Run loop: No updates collected, touching checkpoint.')
+ os.utime(self._pathspec.getpath('runner_checkpoint'), None)
+ # step 3: reap any zombie child processes
+ TaskRunnerHelper.reap_children()
+
+ def kill(self, force=False, terminal_status=TaskState.KILLED,
+ preemption_wait=Amount(1, Time.MINUTES)):
+ """
+ Kill all processes associated with this task and set task/process states as terminal_status
+ (defaults to KILLED)
+ """
+ log.debug('Runner issued kill: force:%s, preemption_wait:%s' % (
+ force, preemption_wait))
+ assert terminal_status in (TaskState.KILLED, TaskState.LOST)
+ self._preemption_deadline = self._clock.time() + preemption_wait.as_(Time.SECONDS)
+ with self.control(force):
+ if self.is_terminal():
+ log.warning('Task is not in ACTIVE state, cannot issue kill.')
+ return
+ self._terminal_state = terminal_status
+ if self.task_state() == TaskState.ACTIVE:
+ self._set_task_status(TaskState.CLEANING)
+ self._run()
+
+ def lose(self, force=False):
+ """
+ Mark a task as LOST and kill any straggling processes.
+ """
+ self.kill(force, preemption_wait=Amount(0, Time.SECONDS), terminal_status=TaskState.LOST)
+
+ def _kill(self):
+ processes = TaskRunnerHelper.scantree(self._state)
+ for process, pid_tuple in processes.items():
+ current_run = self._current_process_run(process)
+ coordinator_pid, pid, tree = pid_tuple
+ if TaskRunnerHelper.is_process_terminal(current_run.state):
+ if coordinator_pid or pid or tree:
+ log.warning('Terminal process (%s) still has running pids:' % process)
+ log.warning(' coordinator_pid: %s' % coordinator_pid)
+ log.warning(' pid: %s' % pid)
+ log.warning(' tree: %s' % tree)
+ TaskRunnerHelper.kill_process(self.state, process)
+ else:
+ if coordinator_pid or pid or tree:
+ log.info('Transitioning %s to KILLED' % process)
+ self._set_process_status(process, ProcessState.KILLED,
+ stop_time=self._clock.time(), return_code=-1)
+ else:
+ log.info('Transitioning %s to LOST' % process)
+ if current_run.state != ProcessState.WAITING:
+ self._set_process_status(process, ProcessState.LOST)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/BUILD b/src/main/python/apache/thermos/monitoring/BUILD
new file mode 100644
index 0000000..a977878
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/BUILD
@@ -0,0 +1,97 @@
+import os
+
def _aurora_version():
    """Return the lower-cased contents of <buildroot>/.auroraversion, closing the file."""
    with open(os.path.join(get_buildroot(), '.auroraversion')) as version_file:
        return version_file.read().strip().lower()


# Detects Thermos tasks from their paths on disk.
python_library(
    name = 'detector',
    sources = ['detector.py'],
    dependencies = [
        pants('src/main/python/twitter/thermos/common:path')
    ]
)

# Garbage collection of finished tasks.
python_library(
    name = 'garbage',
    sources = ['garbage.py'],
    dependencies = [
        pants(':detector'),
        pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
        pants('aurora/twitterdeps/src/python/twitter/common/lang'),
        pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
        pants('src/main/python/twitter/thermos/common:ckpt'),
        pants('src/main/python/twitter/thermos/common:path'),
    ]
)

# Reconstructs task state from the runner checkpoint stream.
python_library(
    name = 'monitor',
    sources = ['monitor.py'],
    dependencies = [
        pants('aurora/twitterdeps/src/python/twitter/common/log'),
        pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
        pants('src/main/python/twitter/thermos/common:ckpt'),
        pants('src/main/thrift/com/twitter/thermos:py-thrift'),
    ]
)

# Disk usage sampling threads.
python_library(
    name = 'disk',
    sources = ['disk.py'],
    dependencies = [
        pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
        pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
        pants('aurora/twitterdeps/src/python/twitter/common/lang'),
        pants('aurora/twitterdeps/src/python/twitter/common/log'),
        pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
        python_requirement('watchdog'),
    ]
)

# The ProcessSample value type.
python_library(
    name = 'process',
    sources = ['process.py'],
)

# psutil-backed process sampling.
python_library(
    name = 'process_collector_psutil',
    sources = ['process_collector_psutil.py'],
    dependencies = [
        pants(':process'),
        pants('aurora/twitterdeps/src/python/twitter/common/log'),
        pants('src/main/python/twitter/thermos:psutil'),
    ]
)

# Per-task resource monitoring.
python_library(
    name = 'resource',
    sources = ['resource.py'],
    dependencies = [
        pants(':disk'),
        pants(':monitor'),
        pants(':process'),
        pants(':process_collector_psutil'),
        pants('aurora/twitterdeps/src/python/twitter/common/collections'),
        pants('aurora/twitterdeps/src/python/twitter/common/concurrent'),
        pants('aurora/twitterdeps/src/python/twitter/common/lang'),
        pants('aurora/twitterdeps/src/python/twitter/common/log'),
        pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
    ]
)

# Umbrella target that is published as a distribution.
python_library(
    name = 'monitoring',
    dependencies = [
        pants(':detector'),
        pants(':disk'),
        pants(':garbage'),
        pants(':monitor'),
        pants(':process'),
        pants(':resource'),

        # covering dependency for common
        pants('src/main/python/twitter/thermos/common'),
    ],
    provides = setup_py(
        name = 'twitter.thermos.monitoring',
        version = _aurora_version(),
        description = 'Thermos monitoring library.',
    )
)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/__init__.py b/src/main/python/apache/thermos/monitoring/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/detector.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/detector.py b/src/main/python/apache/thermos/monitoring/detector.py
new file mode 100644
index 0000000..ed48c93
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/detector.py
@@ -0,0 +1,91 @@
+"""Detect Thermos tasks on disk
+
+This module contains the TaskDetector, used to detect Thermos tasks within a given checkpoint root.
+
+"""
+
+import glob
+import os
+import re
+
+from twitter.thermos.common.path import TaskPath
+
+
class TaskDetector(object):
    """
    Helper class in front of TaskPath to detect active/finished/running tasks. Performs no
    introspection on the state of a task; merely detects based on file paths on disk.

    Every lookup globs for candidate paths and then re-matches each candidate against a regular
    expression built from the same TaskPath template; candidates that do not match are skipped.
    """
    class MatchingError(Exception): pass

    def __init__(self, root):
        """root: the checkpoint root directory substituted into every TaskPath lookup."""
        self._root_dir = root
        self._pathspec = TaskPath()

    def get_task_ids(self, state=None):
        """
        Yield (task_state, task_id) tuples for each 'task_path' found under the root.

        If state is given, only tasks whose path is in that state are yielded.
        """
        paths = glob.glob(
            self._pathspec.given(root=self._root_dir, task_id="*", state=state or '*')
                .getpath('task_path'))
        path_re = re.compile(
            self._pathspec.given(root=re.escape(self._root_dir), task_id=r"(\S+)", state=r'(\S+)')
                .getpath('task_path'))
        for path in paths:
            match = path_re.match(path)
            if match is None:
                continue
            task_state, task_id = match.groups()
            if state is None or task_state == state:
                yield (task_state, task_id)

    def get_process_runs(self, task_id, log_dir):
        """Yield (process name, run number as int) for each 'process_logdir' of the task."""
        paths = glob.glob(
            self._pathspec.given(root=self._root_dir,
                                 task_id=task_id,
                                 log_dir=log_dir,
                                 process='*',
                                 run='*')
                .getpath('process_logdir'))
        # log_dir is escaped like root and task_id so regex metacharacters in it match literally.
        path_re = re.compile(
            self._pathspec.given(root=re.escape(self._root_dir),
                                 task_id=re.escape(task_id),
                                 log_dir=re.escape(log_dir),
                                 process=r'(\S+)',
                                 run=r'(\d+)')
                .getpath('process_logdir'))
        for path in paths:
            match = path_re.match(path)
            if match is None:
                continue
            process, run = match.groups()
            yield process, int(run)

    def get_process_logs(self, task_id, log_dir):
        """Yield the stdout/stderr log paths that exist on disk for every run of the task."""
        for process, run in self.get_process_runs(task_id, log_dir):
            for logtype in ('stdout', 'stderr'):
                path = (self._pathspec.with_filename(logtype)
                            .given(root=self._root_dir,
                                   task_id=task_id,
                                   log_dir=log_dir,
                                   process=process,
                                   run=run)
                            .getpath('process_logdir'))
                if os.path.exists(path):
                    yield path

    def get_checkpoint(self, task_id):
        """Return the 'runner_checkpoint' path for the task (whether or not it exists)."""
        return self._pathspec.given(root=self._root_dir, task_id=task_id).getpath('runner_checkpoint')

    def get_process_checkpoints(self, task_id):
        """Yield the path of each 'process_checkpoint' found on disk for the task."""
        matching_paths = glob.glob(
            self._pathspec.given(root=self._root_dir, task_id=task_id, process='*')
                .getpath('process_checkpoint'))
        path_re = re.compile(
            self._pathspec.given(root=re.escape(self._root_dir),
                                 task_id=re.escape(task_id),
                                 process=r'(\S+)')
                .getpath('process_checkpoint'))
        for path in matching_paths:
            if path_re.match(path) is not None:
                yield path
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/disk.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/disk.py b/src/main/python/apache/thermos/monitoring/disk.py
new file mode 100644
index 0000000..9116746
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/disk.py
@@ -0,0 +1,181 @@
+"""Sample disk usage under a particular path
+
+This module provides threads which can be used to gather information on the disk utilisation
+under a particular path.
+
+Currently, there are two threads available:
+ - DiskCollectorThread, which periodically uses a basic brute-force approach (os.stat()ing every
+ file within the path)
+ - InotifyDiskCollectorThread, which updates disk utilisation dynamically by using inotify to
+ monitor disk changes within the path
+
+"""
+
+import os
+import threading
+import time
+from Queue import Empty, Queue
+
+from twitter.common import log
+from twitter.common.dirutil import du, safe_bsize
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.lang import Lockable
+from twitter.common.quantity import Amount, Time
+
+from watchdog.observers import Observer as WatchdogObserver
+from watchdog.events import (
+ FileSystemEventHandler,
+ FileCreatedEvent,
+ FileDeletedEvent,
+ FileModifiedEvent,
+ FileMovedEvent,
+)
+
+
class DiskCollectorThread(ExceptionalThread):
    """ One-shot daemon thread that measures disk usage under a path with du() """

    def __init__(self, path):
        self.path = path
        self.value = None  # result of du(); stays None until run() has finished
        self.event = threading.Event()  # set once the measurement is available
        super(DiskCollectorThread, self).__init__()
        self.daemon = True

    def run(self):
        target = self.path
        log.debug("DiskCollectorThread: starting collection of %s" % target)
        self.value = du(target)
        log.debug("DiskCollectorThread: finished collection of %s" % target)
        self.event.set()

    def finished(self):
        """ True once run() has stored a value and set the event """
        return self.event.is_set()
+
+
class DiskCollector(Lockable):
    """ Samples disk usage under a root by running at most one DiskCollectorThread at a time """

    def __init__(self, root):
        self._root = root
        self._thread = None  # the in-flight DiskCollectorThread, if any
        self._value = 0  # most recently harvested measurement
        super(DiskCollector, self).__init__()

    @Lockable.sync
    def sample(self):
        """ Start a collection thread unless one is already pending """
        if self._thread is not None:
            return
        self._thread = DiskCollectorThread(self._root)
        self._thread.start()

    @property
    @Lockable.sync
    def value(self):
        """ Latest disk usage; harvests the pending thread's result if it has finished """
        worker = self._thread
        if worker is not None and worker.finished():
            self._value = worker.value
            self._thread = None
        return self._value

    @property
    @Lockable.sync
    def completed_event(self):
        """ Return the threading.Event of the in-progress collection, which is set when that
        collection completes. With no collection in progress a fresh, never-set Event is
        returned, so waiting on it blocks indefinitely. Use with caution! (i.e.: set a timeout) """
        if self._thread is None:
            return threading.Event()
        return self._thread.event
+
+
class InotifyDiskCollectorThread(ExceptionalThread, FileSystemEventHandler):
    """ Thread to calculate aggregate disk usage under a given path

    Note that while this thread uses inotify (through the watchdog module) to monitor disk events in
    "real time", the actual processing of events is only performed periodically (configured via
    COLLECTION_INTERVAL)

    """
    INTERESTING_EVENTS = (FileCreatedEvent, FileDeletedEvent, FileModifiedEvent, FileMovedEvent)
    COLLECTION_INTERVAL = Amount(5, Time.SECONDS)

    def __init__(self, path):
        self._path = path
        self._files = {}  # file path => size (bytes)
        self._queue = Queue()  # events received from the observer since the last swap
        self._to_process = Queue()  # batch currently being drained by _process_events
        self._observer = WatchdogObserver()
        super(InotifyDiskCollectorThread, self).__init__()
        self.daemon = True

    def dispatch(self, event):
        """ Dispatch all interesting events to the internal queue """
        if isinstance(event, self.INTERESTING_EVENTS):
            self._queue.put(event)

    def _initialize(self):
        """ Collect an initial snapshot of the disk usage in the path """
        log.debug("Starting watchdog observer to collect events...")
        self._observer.schedule(self, path=self._path, recursive=True)
        self._observer.start()
        log.debug("Collecting initial disk usage sample...")
        for root, _, files in os.walk(self._path):
            for filename in files:
                f = os.path.join(root, filename)
                self._files[f] = safe_bsize(f)

    def _process_events(self):
        """ Deduplicate and process watchdog events, updating the internal file store appropriately

        Only the last operation recorded for a given path within the batch is applied.
        """
        def remove_file(path):
            self._files.pop(path, None)

        def stat_file(path):
            self._files[path] = safe_bsize(path)

        # path => operation to apply to that path. The path is stored as the key and handed to
        # the operation explicitly, rather than captured from the loop variable in a lambda:
        # such lambdas would all see the final event once the loop has finished.
        file_ops = {}
        while not self._to_process.empty():
            event = self._to_process.get()
            if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
                file_ops[event.src_path] = stat_file
            elif isinstance(event, FileDeletedEvent):
                file_ops[event.src_path] = remove_file
            elif isinstance(event, FileMovedEvent):
                file_ops[event.src_path] = remove_file
                file_ops[event.dest_path] = stat_file

        for path, op in file_ops.items():
            op(path)

    def run(self):
        """ Loop indefinitely, periodically processing watchdog/inotify events. """
        self._initialize()
        log.debug("Initialization complete. Moving to handling events.")
        interval = self.COLLECTION_INTERVAL.as_(Time.SECONDS)
        while True:
            wake_at = time.time() + interval
            if not self._queue.empty():
                # Swap in a fresh queue so new events can arrive while this batch is processed.
                self._to_process, self._queue = self._queue, Queue()
                self._process_events()
            time.sleep(max(0, wake_at - time.time()))

    @property
    def value(self):
        """ Sum of the sizes (bytes) of all files currently tracked """
        # values() rather than itervalues(): on Python 2 this sums a list copy, so the collector
        # thread resizing the dict mid-sum cannot raise "dictionary changed size".
        return sum(self._files.values())
+
+
class InotifyDiskCollector(object):
    """ Owns one InotifyDiskCollectorThread and exposes the same sample/value/completed_event
    members as DiskCollector """

    def __init__(self, root):
        self._root = root
        self._thread = InotifyDiskCollectorThread(self._root)

    def sample(self):
        """ Start the collection thread if the root exists and the thread is not alive. """
        if not os.path.exists(self._root):
            log.error('Cannot start monitoring path until it exists')
            return
        if not self._thread.is_alive():
            self._thread.start()

    @property
    def value(self):
        """ Current aggregate size as reported by the collection thread """
        return self._thread.value

    @property
    def completed_event(self):
        """ A fresh threading.Event on every access; nothing in this class ever sets it """
        return threading.Event()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/garbage.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/garbage.py b/src/main/python/apache/thermos/monitoring/garbage.py
new file mode 100644
index 0000000..4ac0d3a
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/garbage.py
@@ -0,0 +1,183 @@
+from abc import abstractmethod
+from collections import namedtuple
+import os
+import sys
+import time
+
+from twitter.common.dirutil import safe_delete, safe_rmtree, safe_bsize
+from twitter.common.lang import Interface
+from twitter.common.quantity import Amount, Data, Time
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.common.path import TaskPath
+
+from .detector import TaskDetector
+
+
class TaskGarbageCollector(object):
    """
    Enumerates and erases the on-disk artifacts (metadata, logs, sandbox data) of tasks found
    under a checkpoint root.
    """

    def __init__(self, root):
        self._root = root
        self._detector = TaskDetector(root=self._root)
        self._states = {}  # task_id => state loaded from the runner checkpoint (cached)

    @staticmethod
    def _annotate(paths, with_size):
        """Yield each path, paired with its safe_bsize() when with_size is true."""
        for path in paths:
            if with_size:
                yield path, safe_bsize(path)
            else:
                yield path

    def state(self, task_id):
        """Return the task's state as loaded by CheckpointDispatcher.from_file, cached per id."""
        if task_id not in self._states:
            self._states[task_id] = CheckpointDispatcher.from_file(
                self._detector.get_checkpoint(task_id))
        return self._states[task_id]

    def get_age(self, task_id):
        """Return the modification time (os.path.getmtime) of the task's runner checkpoint.

        Despite the name this is a timestamp, not a duration; callers subtract it from now.
        """
        return os.path.getmtime(self._detector.get_checkpoint(task_id))

    def get_finished_tasks(self):
        """Return the ids of all tasks the detector reports in the 'finished' state."""
        return [task_id for _, task_id in self._detector.get_task_ids(state='finished')]

    def get_metadata(self, task_id, with_size=True):
        """Yield the finished-task spec path, runner checkpoint and process checkpoints."""
        runner_ckpt = self._detector.get_checkpoint(task_id)
        process_ckpts = list(self._detector.get_process_checkpoints(task_id))
        json_spec = TaskPath(root=self._root, task_id=task_id, state='finished').getpath('task_path')
        for item in self._annotate([json_spec, runner_ckpt] + process_ckpts, with_size):
            yield item

    def get_logs(self, task_id, with_size=True):
        """Yield the task's process log files; nothing if its state or header is unavailable."""
        state = self.state(task_id)
        if state and state.header:
            logs = self._detector.get_process_logs(task_id, state.header.log_dir)
            for item in self._annotate(logs, with_size):
                yield item

    def get_data(self, task_id, with_size=True):
        """Yield every file beneath the task's sandbox; nothing if no sandbox is recorded."""
        state = self.state(task_id)
        if state and state.header and state.header.sandbox:
            for root, _, files in os.walk(state.header.sandbox):
                filenames = (os.path.join(root, name) for name in files)
                for item in self._annotate(filenames, with_size):
                    yield item

    def erase_task(self, task_id):
        """Erase the task's data, then its logs, then its metadata."""
        self.erase_data(task_id)
        self.erase_logs(task_id)
        self.erase_metadata(task_id)

    def erase_metadata(self, task_id):
        for fn in self.get_metadata(task_id, with_size=False):
            safe_delete(fn)
        safe_rmtree(TaskPath(root=self._root, task_id=task_id).getpath('checkpoint_path'))

    def erase_logs(self, task_id):
        for fn in self.get_logs(task_id, with_size=False):
            safe_delete(fn)
        state = self.state(task_id)
        if state and state.header:
            safe_rmtree(TaskPath(root=self._root, task_id=task_id, log_dir=state.header.log_dir)
                        .getpath('process_logbase'))

    def erase_data(self, task_id):
        # TODO(wickman)
        # This could be potentially dangerous if somebody naively runs their sandboxes in e.g.
        # $HOME or / or similar. Perhaps put a guard somewhere?
        for fn in self.get_data(task_id, with_size=False):
            # safe_delete, as in erase_metadata/erase_logs, instead of a bare os.unlink.
            # NOTE(review): assumes safe_delete tolerates an already-missing file -- confirm.
            safe_delete(fn)
        state = self.state(task_id)
        if state and state.header and state.header.sandbox:
            safe_rmtree(state.header.sandbox)
+
+
class TaskGarbageCollectionPolicy(Interface):
    """Base for garbage collection policies; holds the TaskGarbageCollector they consult."""

    def __init__(self, collector):
        if isinstance(collector, TaskGarbageCollector):
            self._collector = collector
            return
        actual = collector.__class__.__name__
        raise ValueError("Expected collector to be a TaskGarbageCollector, got %s" % actual)

    @property
    def collector(self):
        """The TaskGarbageCollector supplied at construction."""
        return self._collector

    @abstractmethod
    def run(self):
        """Returns a list of task_ids that should be garbage collected given the specified policy."""
+
+
class DefaultCollector(TaskGarbageCollectionPolicy):
    # One finished task together with the measurements the policy decides on.
    TaskTuple = namedtuple('TaskTuple', 'task_id age metadata_size log_size data_size')

    def __init__(self, collector, **kw):
        """
        Default garbage collection policy.

        Arguments that may be specified:
          max_age:          Amount(Time) (max age of a retained task)  [default: infinity]
          max_space:        Amount(Data) (max space to keep)           [default: infinity]
          max_tasks:        int (max number of tasks to keep)          [default: infinity]
          include_metadata: boolean (Whether or not to include metadata in the
                            space calculations.)                       [default: True]
          include_logs:     boolean (Whether or not to include logs in the
                            space calculations.)                       [default: True]
          verbose:          boolean (whether or not to log)            [default: False]
          logger:           callable (function to call with log messages)
                                                                       [default: sys.stdout.write]
        """
        self._max_age = kw.get('max_age', Amount(10**10, Time.DAYS))
        self._max_space = kw.get('max_space', Amount(10**10, Data.TB))
        self._max_tasks = kw.get('max_tasks', 10**10)
        self._include_metadata = kw.get('include_metadata', True)
        self._include_logs = kw.get('include_logs', True)
        self._verbose = kw.get('verbose', False)
        self._logger = kw.get('logger', sys.stdout.write)
        TaskGarbageCollectionPolicy.__init__(self, collector)

    def log(self, msg):
        """Pass msg to the configured logger, but only when verbose is enabled."""
        if self._verbose:
            self._logger(msg)

    def _total_gc_size(self, task):
        """Space charged to a task: its data, plus metadata and logs when those are included."""
        nothing = Amount(0, Data.BYTES)
        return sum([task.data_size,
                    task.metadata_size if self._include_metadata else nothing,
                    task.log_size if self._include_logs else nothing],
                   Amount(0, Data.BYTES))

    def run(self):
        """
        Decide which finished tasks to garbage collect.

        Three filters are applied in order: tasks older than max_age; then, keeping the youngest
        tasks first, any task that pushes retained space over max_space; then the oldest
        remaining tasks until at most max_tasks are retained.

        Returns a set of TaskTuple (task_id, age, metadata_size, log_size, data_size).
        """
        tasks = []
        now = time.time()

        for task_id in self.collector.get_finished_tasks():
            age = Amount(int(now - self.collector.get_age(task_id)), Time.SECONDS)
            self.log('Analyzing task %s (age: %s)... ' % (task_id, age))
            metadata_size = Amount(
                sum(sz for _, sz in self.collector.get_metadata(task_id)), Data.BYTES)
            self.log(' metadata %.1fKB ' % metadata_size.as_(Data.KB))
            log_size = Amount(sum(sz for _, sz in self.collector.get_logs(task_id)), Data.BYTES)
            self.log(' logs %.1fKB ' % log_size.as_(Data.KB))
            data_size = Amount(sum(sz for _, sz in self.collector.get_data(task_id)), Data.BYTES)
            self.log(' data %.1fMB ' % data_size.as_(Data.MB))
            tasks.append(self.TaskTuple(task_id, age, metadata_size, log_size, data_size))

        gc_tasks = set()
        gc_tasks.update(task for task in tasks if task.age > self._max_age)
        self.log('After age filter: %s tasks' % len(gc_tasks))

        # Youngest first: the space budget is spent on the newest tasks, so the tasks that tip
        # it over -- and get collected -- are the older ones. Walking oldest-first here would
        # retain the oldest tasks and discard the newest.
        total_used = Amount(0, Data.BYTES)
        for task in sorted(tasks, key=lambda tsk: tsk.age):
            if task not in gc_tasks:
                total_used += self._total_gc_size(task)
                if total_used > self._max_space:
                    gc_tasks.add(task)
        self.log('After size filter: %s tasks' % len(gc_tasks))

        # Oldest first: drop the oldest survivors until no more than max_tasks remain.
        for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True):
            if task not in gc_tasks and len(tasks) - len(gc_tasks) > self._max_tasks:
                gc_tasks.add(task)
        self.log('After total task filter: %s tasks' % len(gc_tasks))

        self.log('Deciding to garbage collect the following tasks:')
        if gc_tasks:
            for task in gc_tasks:
                self.log(' %s' % repr(task))
        else:
            self.log(' None.')

        return gc_tasks
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/monitor.py b/src/main/python/apache/thermos/monitoring/monitor.py
new file mode 100644
index 0000000..5af67fe
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/monitor.py
@@ -0,0 +1,125 @@
+"""Monitor the state of Thermos tasks on a system
+
+This module contains the TaskMonitor, used to reconstruct the state of active or finished Thermos
+tasks based on their checkpoint streams. It exposes two key pieces of information about a Task, both
+as their corresponding Thrift structs:
+ - a RunnerState, representing the latest state of the Task
+ - a list of ProcessStates, representing the processes currently running within the Task
+
+"""
+
+import os
+import copy
+import errno
+import threading
+
+from twitter.common import log
+from twitter.common.recordio import ThriftRecordReader
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+
+from gen.twitter.thermos.ttypes import (
+ ProcessState,
+ RunnerCkpt,
+ RunnerState,
+ TaskState,
+)
+
+
class TaskMonitor(object):
    """
    Class responsible for reconstructing and monitoring the state of an individual Thermos task via
    its runner checkpoint. Also exports information on active processes in the task.
    """

    def __init__(self, pathspec, task_id):
        self._task_id = task_id
        self._dispatcher = CheckpointDispatcher()
        self._runnerstate = RunnerState(processes={})
        self._runner_ckpt = pathspec.given(task_id=task_id).getpath('runner_checkpoint')
        self._active_file, self._finished_file = (
            pathspec.given(task_id=task_id, state=state).getpath('task_path')
            for state in ('active', 'finished'))
        self._ckpt_head = 0  # byte offset up to which the checkpoint has been consumed
        # Create the lock before first use of the state it guards.
        self._lock = threading.Lock()
        self._apply_states()

    def _apply_states(self):
        """
        os.stat() the corresponding checkpoint stream of this task and determine if there are new ckpt
        records. Attempt to read those records and update the high watermark for that stream.
        Returns True if new states were applied, False otherwise.

        A missing checkpoint (ENOENT from either the stat or the open) is logged and reported as
        False; any other OSError/IOError is re-raised.
        """
        try:
            ckpt_offset = os.stat(self._runner_ckpt).st_size

            updated = False
            if self._ckpt_head < ckpt_offset:
                # The checkpoint is a binary record stream, so read it in binary mode.
                with open(self._runner_ckpt, 'rb') as fp:
                    fp.seek(self._ckpt_head)
                    rr = ThriftRecordReader(fp, RunnerCkpt)
                    while True:
                        runner_update = rr.try_read()
                        if not runner_update:
                            break
                        try:
                            self._dispatcher.dispatch(self._runnerstate, runner_update)
                        except CheckpointDispatcher.InvalidSequenceNumber as e:
                            log.error('Checkpoint stream is corrupt: %s' % e)
                            break
                    new_ckpt_head = fp.tell()
                    updated = self._ckpt_head != new_ckpt_head
                    self._ckpt_head = new_ckpt_head
            return updated
        except (IOError, OSError) as e:
            # open() raises IOError on Python 2, which is not an OSError there; catch both so a
            # checkpoint that vanishes between the stat and the open is also handled.
            if e.errno == errno.ENOENT:
                # The log doesn't yet exist, will retry later.
                log.warning('Could not read from discovered task %s.' % self._task_id)
                return False
            else:
                raise

    def refresh(self):
        """
        Check to see if there are new updates and apply them. Return true if
        updates were applied, false otherwise.
        """
        with self._lock:
            return self._apply_states()

    def get_state(self):
        """
        Get the latest state of this Task, as a deep copy the caller may freely modify.
        """
        with self._lock:
            self._apply_states()
            return copy.deepcopy(self._runnerstate)

    def task_state(self):
        """Return the state of the most recent status, or TaskState.ACTIVE if there is none."""
        state = self.get_state()
        return state.statuses[-1].state if state.statuses else TaskState.ACTIVE

    @property
    def active(self):
        """True if the task's 'active' task_path exists on disk."""
        return os.path.exists(self._active_file)

    @property
    def finished(self):
        """True if the task's 'finished' task_path exists on disk."""
        return os.path.exists(self._finished_file)

    def get_active_processes(self):
        """
        Get active processes. Returned is a list of tuples of the form:
          (ProcessStatus object of running object, its run number)
        """
        active_processes = []
        with self._lock:
            self._apply_states()
            # Walk the shared state while still holding the lock so a concurrent refresh
            # cannot mutate it mid-iteration.
            for process, runs in self._runnerstate.processes.items():
                if len(runs) == 0:
                    continue
                last_run = runs[-1]
                if last_run.state == ProcessState.RUNNING:
                    active_processes.append((last_run, len(runs) - 1))
        return active_processes
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/process.py b/src/main/python/apache/thermos/monitoring/process.py
new file mode 100644
index 0000000..294682f
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/process.py
@@ -0,0 +1,49 @@
+"""Represent resource consumption statistics for processes
+
+This module exposes one class: the ProcessSample, used to represent resource consumption. A single
+ProcessSample might correspond to one individual process, or to an aggregate of multiple processes.
+
+"""
+
+from collections import namedtuple
+
+
class ProcessSample(namedtuple('ProcessSample', 'rate user system rss vms nice status threads')):
    """ Sample of statistics about a process's resource consumption (either a single process or an
    aggregate of processes) """

    # No per-instance __dict__: keeps the many samples created as small as a plain tuple.
    __slots__ = ()

    @staticmethod
    def empty():
        """ The additive identity: zero counters, with nice and status unset (None) """
        return ProcessSample(rate=0, user=0, system=0, rss=0, vms=0, nice=None, status=None, threads=0)

    def __add__(self, other):
        """ Aggregate two samples: numeric fields are summed; for nice and status the right-hand
        operand's value is used unless it is None, in which case the left-hand value is kept """
        return ProcessSample(
            rate = self.rate + other.rate,
            user = self.user + other.user,
            system = self.system + other.system,
            rss = self.rss + other.rss,
            vms = self.vms + other.vms,
            nice = other.nice if other.nice is not None else self.nice,
            status = other.status if other.status is not None else self.status,
            threads = self.threads + other.threads)

    def to_dict(self):
        """ Dict form of the sample; 'cpu' and 'ram' duplicate rate and rss, and status is
        passed through str() """
        return dict(
            cpu = self.rate,
            ram = self.rss,
            user = self.user,
            system = self.system,
            rss = self.rss,
            vms = self.vms,
            nice = self.nice,
            status = str(self.status),
            threads = self.threads
        )
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
new file mode 100644
index 0000000..7b68173
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
@@ -0,0 +1,92 @@
+""" Sample resource consumption statistics for processes using psutil """
+
+from operator import attrgetter
+from time import time
+
+from twitter.common import log
+
+from .process import ProcessSample
+
+from psutil import Process
+from psutil import NoSuchProcess, AccessDenied, Error as PsutilError
+
+
def process_to_sample(process):
    """ Build a ProcessSample from a psutil.Process, or an empty sample if psutil raises
    AccessDenied or NoSuchProcess while it is being queried """
    try:
        # the nonblocking get_cpu_percent call is stateful on a particular Process object, and hence
        # >2 consecutive calls are required before it will return a non-zero value
        cpu_fraction = process.get_cpu_percent(0.0) / 100.0
        user, system = process.get_cpu_times()
        rss, vms = process.get_memory_info()
        return ProcessSample(
            rate=cpu_fraction,
            user=user,
            system=system,
            rss=rss,
            vms=vms,
            nice=process.nice,
            status=process.status,
            threads=process.get_num_threads())
    except (AccessDenied, NoSuchProcess) as e:
        log.warning('Error during process sampling [pid=%s]: %s' % (process.pid, e))
        return ProcessSample.empty()
+
+
class ProcessTreeCollector(object):
    """ Collect resource consumption statistics for a process and its children """

    def __init__(self, pid):
        """ pid: id of the root process of the tree to sample """
        self._pid = pid
        self._process = None  # psutil.Process
        self._sampled_tree = {}  # pid => ProcessSample
        self._sample = ProcessSample.empty()
        self._stamp = None  # time() of the last successful sample
        self._rate = 0.0
        self._procs = 1

    def sample(self):
        """ Collate and aggregate ProcessSamples for process and children
            Returns None: result is stored in self.value
        """
        try:
            if self._process is None:
                self._process = Process(self._pid)
            parent = self._process
            parent_sample = process_to_sample(parent)
            new_samples = dict(
                (proc.pid, process_to_sample(proc))
                for proc in parent.get_children(recursive=True)
            )
            new_samples[self._pid] = parent_sample

        except PsutilError as e:
            log.warning('Error during process sampling: %s' % e)
            self._sample = ProcessSample.empty()
            self._rate = 0.0
            # Forget the previous generation too, so procs agrees with the empty sample and
            # the next rate is not computed against stale per-process counters.
            self._sampled_tree = {}

        else:
            last_stamp = self._stamp
            self._stamp = time()
            # for most stats, calculate simple sum to aggregate
            self._sample = sum(new_samples.values(), ProcessSample.empty())
            # cpu consumption is more complicated
            # We require at least 2 generations of a process before we can calculate rate, so for all
            # current processes that were not running in the previous sample, compare to an empty sample
            if self._sampled_tree and last_stamp:
                elapsed = self._stamp - last_stamp
                # Two samples with the same timestamp give no interval to divide by; keep the
                # previous rate in that case.
                if elapsed > 0:
                    new = new_samples.values()
                    old = [self._sampled_tree.get(pid, ProcessSample.empty())
                           for pid in new_samples.keys()]
                    new_user_sys = sum(map(attrgetter('user'), new)) + sum(map(attrgetter('system'), new))
                    old_user_sys = sum(map(attrgetter('user'), old)) + sum(map(attrgetter('system'), old))
                    self._rate = (new_user_sys - old_user_sys) / elapsed
                    log.debug("Calculated rate for pid=%s and children: %s" % (
                        self._process.pid, self._rate))
            self._sampled_tree = new_samples

    @property
    def value(self):
        """ Aggregated ProcessSample representing resource consumption of the tree """
        # Since we don't trust the CPU consumption returned by psutil, replace it with our own in the
        # exported ProcessSample
        return self._sample._replace(rate=self._rate)

    @property
    def procs(self):
        """ Number of processes captured by the most recent successful sample """
        return len(self._sampled_tree)
+
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
new file mode 100644
index 0000000..468f1b1
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -0,0 +1,222 @@
+"""Monitor the resource consumption of Thermos tasks
+
+This module contains classes used to monitor the resource consumption (e.g. CPU, RAM, disk) of
+Thermos tasks. Resource monitoring of a Thermos task typically treats the task as an aggregate of
+all the processes within it. Importantly, this excludes the process(es) of Thermos itself (i.e. the
+TaskRunner and any other wrappers involved in launching a task).
+
+The ResourceMonitorBase defines the interface for other components (for example, the Thermos
+TaskObserver) to interact with and retrieve information about a Task's resource consumption. The
+canonical/reference implementation of a ResourceMonitor is the TaskResourceMonitor, a thread which
+actively monitors resources for a particular task by periodically polling process information and
+disk consumption and retaining a limited (FIFO) in-memory history of this data.
+
+"""
+
+from abc import abstractmethod
+from bisect import bisect_left
+from collections import namedtuple
+from operator import attrgetter
+import platform
+import threading
+import time
+
+from twitter.common import log
+from twitter.common.collections import RingBuffer
+from twitter.common.concurrent import EventMuxer
+from twitter.common.lang import Interface
+from twitter.common.quantity import Amount, Time
+
+from .disk import DiskCollector
+from .monitor import TaskMonitor
+from .process import ProcessSample
+from .process_collector_psutil import ProcessTreeCollector
+
+
+class ResourceMonitorBase(Interface):
+  """ Interface that ResourceMonitor implementations must provide """
+
+  class Error(Exception):
+    """ Exception type scoped to ResourceMonitors """
+    pass
+
+  class ResourceResult(namedtuple('ResourceResult',
+                                  ['num_procs', 'process_sample', 'disk_usage'])):
+    """ One resource observation: (num_procs, process_sample, disk_usage) """
+    pass
+
+  @abstractmethod
+  def sample(self):
+    """ Sample the task's resource consumption as of now
+
+    Returns a tuple of (timestamp, ResourceResult)
+    """
+
+  @abstractmethod
+  def sample_at(self, time):
+    """ Sample the task's resource consumption as close as possible to the given time
+
+    Returns a tuple of (timestamp, ResourceResult)
+    """
+
+  @abstractmethod
+  def sample_by_process(self, process_name):
+    """ Sample the resource consumption of one named process within the task as of now
+
+    Returns a ProcessSample
+    """
+
+
+class ResourceHistory(object):
+  """Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with the
+  mapping: timestamp => (number_of_procs, ProcessSample, disk_usage_in_bytes)
+
+  Samples are stored as (timestamp, value) tuples; add() refuses timestamps older than the newest
+  stored one, so the stored timestamps are non-decreasing.
+  """
+
+  def __init__(self, maxlen, initialize=True):
+    """
+      maxlen: maximum number of samples to retain; must be at least 1
+      initialize: if True, seed the history with an empty sample stamped with the current time
+
+    Raises ValueError if maxlen is less than 1.
+    """
+    if not maxlen >= 1:
+      raise ValueError("maxlen must be greater than 0")
+    self._maxlen = maxlen
+    self._values = RingBuffer(maxlen, None)
+    if initialize:
+      self.add(time.time(), ResourceMonitorBase.ResourceResult(0, ProcessSample.empty(), 0))
+
+  def add(self, timestamp, value):
+    """Store a new resource sample corresponding to the given timestamp
+
+    Raises ValueError if timestamp is older than the most recently stored sample.
+    """
+    if self._values and not timestamp >= self._values[-1][0]:
+      raise ValueError("Refusing to add timestamp in the past!")
+    self._values.append((timestamp, value))
+
+  def get(self, timestamp):
+    """Get the (timestamp, value) sample nearest to the given timestamp
+
+    If the timestamp is equally distant from two samples, the later one is returned. Timestamps
+    before the oldest sample return the oldest; timestamps after the newest return the newest.
+
+    Raises IndexError if the history is empty.
+    """
+    size = len(self)
+    if size == 0:
+      raise IndexError("Cannot get a sample from an empty ResourceHistory")
+
+    # Binary search on the timestamps alone for the first sample at or after `timestamp`. Only
+    # timestamps are compared, so the stored values never need to be orderable.
+    low, high = 0, size
+    while low < high:
+      middle = (low + high) // 2
+      if self._values[middle][0] < timestamp:
+        low = middle + 1
+      else:
+        high = middle
+
+    if low == size:
+      # every stored sample is older than the requested time
+      return self._values[size - 1]
+
+    following = self._values[low]
+    if low > 0:
+      preceding = self._values[low - 1]
+      # prefer the earlier sample only when it is strictly closer
+      if timestamp - preceding[0] < following[0] - timestamp:
+        return preceding
+    return following
+
+  def __iter__(self):
+    return iter(self._values)
+
+  def __len__(self):
+    return len(self._values)
+
+  def __repr__(self):
+    return 'ResourceHistory(%s)' % ', '.join([str(r) for r in self._values])
+
+
+class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
+  """ Lightweight thread to aggregate resource consumption for a task's constituent processes.
+  Actual resource calculation is delegated to collectors; this class periodically polls the
+  collectors and aggregates into a representation for the entire task. Also maintains a limited
+  history of previous sample results.
+  """
+
+  MAX_HISTORY = 10000  # largest ResourceHistory length that __init__ will accept
+
+  def __init__(self, task_monitor, sandbox,
+               process_collector=ProcessTreeCollector, disk_collector=DiskCollector,
+               process_collection_interval=Amount(20, Time.SECONDS),
+               disk_collection_interval=Amount(1, Time.MINUTES),
+               history_time=Amount(1, Time.HOURS)):
+    """
+      task_monitor: TaskMonitor object specifying the task whose resources should be monitored
+      sandbox: Directory for which to monitor disk utilisation
+      process_collector: factory called with a pid to build the collector for one process
+      disk_collector: factory called with the sandbox to build the disk collector
+      process_collection_interval: Amount of time between process collections
+      disk_collection_interval: Amount of time between disk collections
+      history_time: Amount of time the history should span; divided by the smaller of the two
+        collection intervals to obtain the history length
+
+    Raises ValueError if a collection interval is not positive, or if the resulting history
+    length exceeds MAX_HISTORY.
+    """
+    self._task_monitor = task_monitor  # exposes PIDs, sandbox
+    self._task_id = task_monitor._task_id
+    log.debug('Initialising resource collection for task %s' % self._task_id)
+    self._process_collectors = dict()  # ProcessStatus => ProcessTreeCollector
+    # TODO(jon): sandbox is also available through task_monitor, but typically the first checkpoint
+    # isn't written (and hence the header is not available) by the time we initialise here
+    self._sandbox = sandbox
+    self._process_collector_factory = process_collector
+    self._disk_collector = disk_collector(self._sandbox)
+    self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
+    self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
+    min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
+    if min_collection_interval <= 0:
+      # fail with a descriptive error instead of a ZeroDivisionError below
+      raise ValueError("Collection intervals must be greater than 0")
+    history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
+    if history_length > self.MAX_HISTORY:
+      raise ValueError("Requested history length too large")
+    log.debug("Initialising ResourceHistory of length %s" % history_length)
+    self._history = ResourceHistory(history_length)
+    self._kill_signal = threading.Event()
+    threading.Thread.__init__(self)
+    self.daemon = True
+
+  def sample(self):
+    """Return the (timestamp, ResourceResult) history entry for the current time. Logs a warning
+    if this thread is not running."""
+    if not self.is_alive():
+      log.warning("TaskResourceMonitor not running - sample may be inaccurate")
+    return self.sample_at(time.time())
+
+  def sample_at(self, timestamp):
+    """Return the (timestamp, ResourceResult) history entry looked up for the given timestamp"""
+    return self._history.get(timestamp)
+
+  def sample_by_process(self, process_name):
+    """Take a fresh sample for the active process called process_name and return its value
+
+    Raises ValueError if no active process has that name.
+    """
+    try:
+      process = [process for process in self._get_active_processes()
+                 if process.process == process_name].pop()
+    except IndexError:
+      raise ValueError('No active process found with name "%s" in this task' % process_name)
+    else:
+      # Since this might be called out of band (before the main loop is aware of the process).
+      # Hold a local reference so that a concurrent removal in run() between the lookup and the
+      # sample cannot turn into a KeyError.
+      collector = self._process_collectors.get(process)
+      if collector is None:
+        collector = self._process_collector_factory(process.pid)
+        self._process_collectors[process] = collector
+
+      collector.sample()
+      return collector.value
+
+  def _get_active_processes(self):
+    """Get a list of ProcessStatus objects representing currently-running processes in the task"""
+    return [process for process, _ in self._task_monitor.get_active_processes()]
+
+  def kill(self):
+    """Signal that the thread should cease collecting resources and terminate"""
+    self._kill_signal.set()
+
+  def run(self):
+    """Thread entrypoint. Loop until killed, polling the process collectors every
+    self._process_collection_interval seconds and the disk collector every
+    self._disk_collection_interval seconds, and collating a sample into the history on each
+    pass."""
+
+    log.debug('Commencing resource monitoring for task "%s"' % self._task_id)
+    next_process_collection = 0
+    next_disk_collection = 0
+
+    while not self._kill_signal.is_set():
+
+      now = time.time()
+
+      if now > next_process_collection:
+        next_process_collection = now + self._process_collection_interval
+        actives = set(self._get_active_processes())
+        current = set(list(self._process_collectors))
+        for process in current - actives:
+          log.debug('Process "%s" (pid %s) no longer active, removing from monitored processes' %
+                    (process.process, process.pid))
+          self._process_collectors.pop(process)
+        for process in actives - current:
+          log.debug('Adding process "%s" (pid %s) to resource monitoring' %
+                    (process.process, process.pid))
+          self._process_collectors[process] = self._process_collector_factory(process.pid)
+        # Iterate over a snapshot: sample_by_process() may add a collector from another thread
+        # while this loop runs, which would invalidate a live dictionary iterator.
+        for process, collector in list(self._process_collectors.items()):
+          log.debug('Collecting sample for process "%s" (pid %s) and children' %
+                    (process.process, process.pid))
+          collector.sample()
+
+      if now > next_disk_collection:
+        next_disk_collection = now + self._disk_collection_interval
+        log.debug('Collecting disk sample for %s' % self._sandbox)
+        self._disk_collector.sample()
+
+      try:
+        # one snapshot so both aggregates are computed over the same set of collectors
+        collectors = list(self._process_collectors.values())
+        aggregated_procs = sum(map(attrgetter('procs'), collectors))
+        aggregated_sample = sum(map(attrgetter('value'), collectors), ProcessSample.empty())
+        self._history.add(now, self.ResourceResult(aggregated_procs, aggregated_sample,
+                                                   self._disk_collector.value))
+        log.debug("Recorded resource sample at %s" % now)
+      except ValueError as err:
+        log.warning("Error recording resource sample: %s" % err)
+
+      # Sleep until any of the following conditions are met:
+      # - it's time for the next disk collection
+      # - it's time for the next process collection
+      # - the result from the last disk collection is available via the DiskCollector
+      # - the TaskResourceMonitor has been killed via self._kill_signal
+      now = time.time()
+      next_collection = min(next_process_collection - now, next_disk_collection - now)
+      EventMuxer(self._kill_signal, self._disk_collector.completed_event
+                 ).wait(timeout=max(0, next_collection))
+
+    log.debug('Stopping resource monitoring for task "%s"' % self._task_id)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/BUILD b/src/main/python/apache/thermos/observer/BUILD
new file mode 100644
index 0000000..00ea9cb
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/BUILD
@@ -0,0 +1,51 @@
+import os
+
+# NOTE(review): the dependency paths below still point at src/main/python/twitter/... although
+# this BUILD file lives under apache/ -- presumably deliberate for this partial rename; confirm.
+
+# Library wrapping observed_task.py
+python_library(
+ name = 'observed_task',
+ sources = ['observed_task.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('src/main/python/twitter/thermos:pystachio'),
+ pants('src/main/python/twitter/thermos/common:ckpt'),
+ pants('src/main/python/twitter/thermos/config'),
+ ]
+)
+
+# Library wrapping task_observer.py; depends on the observed_task target declared in this file
+python_library(
+ name = 'task_observer',
+ sources = ['task_observer.py'],
+ dependencies = [
+ pants(':observed_task'),
+ pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+ pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+ pants('src/main/python/twitter/thermos/common:path'),
+ pants('src/main/python/twitter/thermos/monitoring:detector'),
+ pants('src/main/python/twitter/thermos/monitoring:monitor'),
+ pants('src/main/python/twitter/thermos/monitoring:process'),
+ pants('src/main/python/twitter/thermos/monitoring:resource'),
+ pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+ ]
+)
+
+# Source-less aggregate target that also declares the 'twitter.thermos.observer' setup_py artifact
+python_library(
+ name = 'observer',
+ dependencies = [
+ pants(':task_observer'),
+ pants('src/main/python/twitter/thermos/observer/http:http_observer'),
+
+ # covering libraries
+ pants('src/main/python/twitter/thermos/common'),
+ pants('src/main/python/twitter/thermos/config'),
+ pants('src/main/python/twitter/thermos/monitoring'),
+ ],
+ provides = setup_py(
+ name = 'twitter.thermos.observer',
+ # version is the stripped, lower-cased content of .auroraversion under the build root
+ version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+ description = 'The Thermos observer web interface.',
+ ).with_binaries(
+ thermos_observer = pants('src/main/python/twitter/thermos/observer/bin:thermos_observer'),
+ )
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/__init__.py b/src/main/python/apache/thermos/observer/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/BUILD b/src/main/python/apache/thermos/observer/bin/BUILD
new file mode 100644
index 0000000..26eb148
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/bin/BUILD
@@ -0,0 +1,14 @@
+# Binary target built from thermos_observer.py; its entry point is that module's proxy_main
+python_binary(
+ name = 'thermos_observer',
+ source = 'thermos_observer.py',
+ entry_point = 'twitter.thermos.observer.bin.thermos_observer:proxy_main',
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+ pants('aurora/twitterdeps/src/python/twitter/common/http'),
+ pants('src/main/python/twitter/thermos:cherrypy'),
+ pants('src/main/python/twitter/thermos/common:path'),
+ pants('src/main/python/twitter/thermos/observer/http:http_observer'),
+ pants('src/main/python/twitter/thermos/observer:task_observer'),
+ ],
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/__init__.py b/src/main/python/apache/thermos/observer/bin/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/bin/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/thermos_observer.py b/src/main/python/apache/thermos/observer/bin/thermos_observer.py
new file mode 100644
index 0000000..9e7f7a0
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/bin/thermos_observer.py
@@ -0,0 +1,55 @@
+from __future__ import print_function
+
+import socket
+import sys
+import time
+
+from twitter.common import app
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.http import HttpServer
+from twitter.common.http.diagnostics import DiagnosticsEndpoints
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.observer.task_observer import TaskObserver
+from twitter.thermos.observer.http.http_observer import BottleObserver
+
+
+# Checkpoint root handed to the TaskObserver in main().
+app.add_option("--root",
+               dest="root",
+               metavar="DIR",
+               default=TaskPath.DEFAULT_CHECKPOINT_ROOT,
+               help="root checkpoint directory for thermos task runners")
+
+
+# Declared as an int so that a port supplied on the command line has the same type as the
+# integer default instead of arriving as a string.
+# NOTE(review): assumes app.add_option forwards keyword arguments to optparse -- confirm.
+app.add_option("--port",
+               dest="port",
+               type="int",
+               metavar="INT",
+               default=1338,
+               help="port number to listen on.")
+
+
+def proxy_main():
+  """Binary entry point: defines the nested main() and then calls app.main().
+
+  NOTE(review): app.main() presumably locates the nested function by its name `main` -- the name
+  is kept for that reason; confirm before renaming.
+  """
+
+  def main(args, opts):
+    """Reject stray positional arguments, start the TaskObserver and serve HTTP until the server
+    thread exits."""
+    if args:
+      print("ERROR: unrecognized arguments: %s\n" % (" ".join(args)), file=sys.stderr)
+      app.help()
+      sys.exit(1)
+
+    server = HttpServer()
+    server.mount_routes(DiagnosticsEndpoints())
+
+    observer = TaskObserver(opts.root)
+    observer.start()
+
+    server.mount_routes(BottleObserver(observer))
+
+    def serve():
+      server.run('0.0.0.0', opts.port, 'cherrypy')
+
+    server_thread = ExceptionalThread(target=serve)
+    server_thread.daemon = True
+    server_thread.start()
+    # block here for as long as the server thread is running
+    server_thread.join()
+
+  app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/BUILD b/src/main/python/apache/thermos/observer/http/BUILD
new file mode 100644
index 0000000..9f3d587
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/BUILD
@@ -0,0 +1,47 @@
+# Library wrapping json.py
+python_library(
+ name = 'json',
+ sources = ['json.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/http'),
+ ]
+)
+
+# Library wrapping static_assets.py; bundles everything matched by rglobs('assets/*') as resources
+python_library(
+ name = 'static_assets',
+ sources = ['static_assets.py'],
+ resources = rglobs('assets/*'),
+ dependencies = [
+ pants('src/main/python/twitter/thermos:bottle'),
+ ]
+)
+
+# Library wrapping templating.py; bundles the templates/*.tpl files as resources
+python_library(
+ name = 'templating',
+ sources = ['templating.py'],
+ resources = globs('templates/*.tpl'),
+)
+
+# Library wrapping file_browser.py; depends on the templating target declared in this file
+python_library(
+ name = 'file_browser',
+ sources = ['file_browser.py'],
+ dependencies = [
+ pants(':templating'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/http'),
+ pants('src/main/python/twitter/thermos:bottle'),
+ pants('src/main/python/twitter/thermos:mako'),
+ ]
+)
+
+# Library for the package __init__.py and http_observer.py; pulls in the four targets above
+python_library(
+ name = 'http_observer',
+ sources = ['__init__.py', 'http_observer.py'],
+ dependencies = [
+ pants(':file_browser'),
+ pants(':json'),
+ pants(':static_assets'),
+ pants(':templating'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/http'),
+ ]
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/__init__.py b/src/main/python/apache/thermos/observer/http/__init__.py
new file mode 100644
index 0000000..e69de29