Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:14 UTC

[21/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
new file mode 100644
index 0000000..b10cf16
--- /dev/null
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -0,0 +1,905 @@
+""" Thermos runner.
+
+This module contains the TaskRunner, the core component of Thermos responsible for actually running
+tasks. It also contains several Handlers which define the behaviour on state transitions within the
+TaskRunner.
+
+There are three "active" states in a running Thermos task:
+  ACTIVE
+  CLEANING
+  FINALIZING
+
+A task in ACTIVE state is running regular processes.  The moment this task succeeds or goes over its
+failure limit, it goes into CLEANING state, where it begins the staged termination of leftover
+processes (with SIGTERMs).  Once all processes have terminated, the task goes into FINALIZING state,
+where the processes marked with the 'final' bit run.  Once the task has gone into CLEANING state, it
+has a deadline for reaching a terminal state.  If it doesn't make it in time (at any point, whether
+in CLEANING or FINALIZING state), it is forced into a terminal state through SIGKILLs of all live
+processes (coordinators, shells, and the full process trees rooted at the shells).
+
+TaskRunner.kill is implemented by forcing the task into CLEANING state and setting its finalization
+deadline manually.  So in practice, we implement Task preemption by calling kill with the
+finalization deadline = now + preemption wait, which gives the Task an opportunity to do graceful
+shutdown.  If preemption_wait=0, it will result in immediate SIGKILLs and then transition to the
+terminal state.
+
+"""
+
+from contextlib import contextmanager
+import errno
+from functools import partial
+import os
+import socket
+import sys
+import time
+import traceback
+
+from twitter.common import log
+from twitter.common.dirutil import safe_mkdir
+from twitter.common.quantity import Amount, Time
+from twitter.common.recordio import ThriftRecordReader
+
+from twitter.thermos.common.ckpt import (
+  CheckpointDispatcher,
+  UniversalStateHandler,
+  ProcessStateHandler,
+  TaskStateHandler)
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.common.planner import TaskPlanner
+from twitter.thermos.config.loader import (
+  ThermosConfigLoader,
+  ThermosProcessWrapper,
+  ThermosTaskWrapper,
+  ThermosTaskValidator)
+from twitter.thermos.config.schema import ThermosContext
+
+from gen.twitter.thermos.ttypes import (
+  ProcessState,
+  ProcessStatus,
+  RunnerCkpt,
+  RunnerHeader,
+  RunnerState,
+  TaskState,
+  TaskStatus,
+)
+
+from .helper import TaskRunnerHelper
+from .muxer import ProcessMuxer
+from .process import Process
+
+from pystachio import Environment
+
+
+# TODO(wickman) Currently this is messy because of all the private access into ._runner.
+# Clean this up by giving the TaskRunnerProcessHandler the components it should own, and
+# create a legitimate API contract into the Runner.
+class TaskRunnerProcessHandler(ProcessStateHandler):
+  """
+    Accesses these parts of the runner:
+
+      | _task_processes [array set, pop]
+      | _task_process_from_process_name [process name / sequence number => Process]
+      | _watcher [ProcessMuxer.register, unregister]
+      | _plan [add_success, add_failure, set_running]
+  """
+
+  def __init__(self, runner):
+    self._runner = runner
+
+  def on_waiting(self, process_update):
+    log.debug('Process on_waiting %s' % process_update)
+    self._runner._task_processes[process_update.process] = (
+      self._runner._task_process_from_process_name(
+        process_update.process, process_update.seq + 1))
+    self._runner._watcher.register(process_update.process, process_update.seq - 1)
+
+  def on_forked(self, process_update):
+    log.debug('Process on_forked %s' % process_update)
+    task_process = self._runner._task_processes[process_update.process]
+    task_process.rebind(process_update.coordinator_pid, process_update.fork_time)
+    self._runner._plan.set_running(process_update.process)
+
+  def on_running(self, process_update):
+    log.debug('Process on_running %s' % process_update)
+    self._runner._plan.set_running(process_update.process)
+
+  def _cleanup(self, process_update):
+    if not self._runner._recovery:
+      TaskRunnerHelper.kill_process(self._runner.state, process_update.process)
+
+  def on_success(self, process_update):
+    log.debug('Process on_success %s' % process_update)
+    log.info('Process(%s) finished successfully [rc=%s]' % (
+      process_update.process, process_update.return_code))
+    self._cleanup(process_update)
+    self._runner._task_processes.pop(process_update.process)
+    self._runner._watcher.unregister(process_update.process)
+    self._runner._plan.add_success(process_update.process)
+
+  def _on_abnormal(self, process_update):
+    log.info('Process %s had an abnormal termination' % process_update.process)
+    self._runner._task_processes.pop(process_update.process)
+    self._runner._watcher.unregister(process_update.process)
+
+  def on_failed(self, process_update):
+    log.debug('Process on_failed %s' % process_update)
+    log.info('Process(%s) failed [rc=%s]' % (process_update.process, process_update.return_code))
+    self._cleanup(process_update)
+    self._on_abnormal(process_update)
+    self._runner._plan.add_failure(process_update.process)
+    if process_update.process in self._runner._plan.failed:
+      log.info('Process %s reached maximum failures, marking process run failed.' %
+          process_update.process)
+    else:
+      log.info('Process %s under maximum failure limit, restarting.' % process_update.process)
+
+  def on_lost(self, process_update):
+    log.debug('Process on_lost %s' % process_update)
+    self._cleanup(process_update)
+    self._on_abnormal(process_update)
+    self._runner._plan.lost(process_update.process)
+
+  def on_killed(self, process_update):
+    log.debug('Process on_killed %s' % process_update)
+    self._cleanup(process_update)
+    self._runner._task_processes.pop(process_update.process)
+    self._runner._watcher.unregister(process_update.process)
+    log.debug('Process killed, marking it as a loss.')
+    self._runner._plan.lost(process_update.process)
+
+
+class TaskRunnerTaskHandler(TaskStateHandler):
+  """
+    Accesses these parts of the runner:
+      _plan [set to regular_plan or finalizing_plan]
+      _recovery [boolean, whether or not to side-effect]
+      _pathspec [path creation]
+      _task [ThermosTask]
+      _set_finalization_start
+      _kill
+  """
+
+  def __init__(self, runner):
+    self._runner = runner
+    self._pathspec = self._runner._pathspec
+
+  def on_active(self, task_update):
+    log.debug('Task on_active(%s)' % task_update)
+    self._runner._plan = self._runner._regular_plan
+    if self._runner._recovery:
+      return
+    TaskRunnerHelper.initialize_task(self._pathspec,
+        ThermosTaskWrapper(self._runner._task).to_json())
+
+  def on_cleaning(self, task_update):
+    log.debug('Task on_cleaning(%s)' % task_update)
+    self._runner._finalization_start = task_update.timestamp_ms / 1000.0
+    self._runner._terminate_plan(self._runner._regular_plan)
+
+  def on_finalizing(self, task_update):
+    log.debug('Task on_finalizing(%s)' % task_update)
+    if not self._runner._recovery:
+      self._runner._kill()
+    self._runner._plan = self._runner._finalizing_plan
+    if self._runner._finalization_start is None:
+      self._runner._finalization_start = task_update.timestamp_ms / 1000.0
+
+  def on_killed(self, task_update):
+    log.debug('Task on_killed(%s)' % task_update)
+    self._cleanup()
+
+  def on_success(self, task_update):
+    log.debug('Task on_success(%s)' % task_update)
+    self._cleanup()
+    log.info('Task succeeded.')
+
+  def on_failed(self, task_update):
+    log.debug('Task on_failed(%s)' % task_update)
+    self._cleanup()
+
+  def on_lost(self, task_update):
+    log.debug('Task on_lost(%s)' % task_update)
+    self._cleanup()
+
+  def _cleanup(self):
+    if not self._runner._recovery:
+      self._runner._kill()
+      TaskRunnerHelper.finalize_task(self._pathspec)
+
+
+class TaskRunnerUniversalHandler(UniversalStateHandler):
+  """
+    Universal handler to checkpoint every process and task transition of the runner.
+
+    Accesses these parts of the runner:
+      _ckpt_write
+  """
+
+  def __init__(self, runner):
+    self._runner = runner
+
+  def _checkpoint(self, record):
+    self._runner._ckpt_write(record)
+
+  def on_process_transition(self, state, process_update):
+    log.debug('_on_process_transition: %s' % process_update)
+    self._checkpoint(RunnerCkpt(process_status=process_update))
+
+  def on_task_transition(self, state, task_update):
+    log.debug('_on_task_transition: %s' % task_update)
+    self._checkpoint(RunnerCkpt(task_status=task_update))
+
+  def on_initialization(self, header):
+    log.debug('_on_initialization: %s' % header)
+    ThermosTaskValidator.assert_valid_task(self._runner.task)
+    ThermosTaskValidator.assert_valid_ports(self._runner.task, header.ports)
+    self._checkpoint(RunnerCkpt(runner_header=header))
+
+
+class TaskRunnerStage(object):
+  """
+    A stage of the task runner pipeline.
+  """
+  MAX_ITERATION_WAIT = Amount(1, Time.SECONDS)
+
+  def __init__(self, runner):
+    self.runner = runner
+    self.clock = runner._clock
+
+  def run(self):
+    """
+      Perform any work necessary at this stage of the task.
+
+      If there is no more work to be done, return None. [This will invoke a state transition.]
+
+      If there is still work to be done, return the number of seconds from now in which you'd like
+      to be called to re-run the plan.
+    """
+    return None
+
+  def transition_to(self):
+    """
+      The stage to which we should transition.
+    """
+    raise NotImplementedError
+
+
+class TaskRunnerStage_ACTIVE(TaskRunnerStage):
+  """
+    Run the regular plan (i.e. normal, non-finalizing processes.)
+  """
+  MAX_ITERATION_WAIT = Amount(15, Time.SECONDS)
+  MIN_ITERATION_WAIT = Amount(1, Time.SECONDS)
+
+  def __init__(self, runner):
+    super(TaskRunnerStage_ACTIVE, self).__init__(runner)
+
+  def run(self):
+    launched = self.runner._run_plan(self.runner._regular_plan)
+
+    # Have we terminated?
+    terminal_state = None
+    if self.runner._regular_plan.is_complete():
+      log.info('Regular plan complete.')
+      terminal_state = TaskState.SUCCESS if self.runner.is_healthy() else TaskState.FAILED
+    elif not self.runner.is_healthy():
+      log.error('Regular plan unhealthy!')
+      terminal_state = TaskState.FAILED
+
+    if terminal_state:
+      # No more work to do
+      return None
+    elif launched > 0:
+      # We want to run ASAP after updates have been collected
+      return max(self.MIN_ITERATION_WAIT.as_(Time.SECONDS), self.runner._regular_plan.min_wait())
+    else:
+      # We want to run as soon as something is available to run or after a prescribed timeout.
+      return min(self.MAX_ITERATION_WAIT.as_(Time.SECONDS), self.runner._regular_plan.min_wait())
+
+  def transition_to(self):
+    return TaskState.CLEANING
+
+
+class TaskRunnerStage_CLEANING(TaskRunnerStage):
+  """
+    Start the cleanup of the regular plan (e.g. if it failed.)  On ACTIVE -> CLEANING,
+    we send SIGTERMs to all still-running processes.  We wait at most finalization_wait
+    for all processes to complete before SIGKILLs are sent.  If everything exits cleanly
+    prior to that point in time, we transition to FINALIZING, which kicks into gear
+    the finalization schedule (if any.)
+  """
+  def run(self):
+    log.debug('TaskRunnerStage[CLEANING]: Finalization remaining: %s' %
+        self.runner._finalization_remaining())
+    if self.runner._finalization_remaining() > 0 and self.runner.has_running_processes():
+      return min(self.runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))
+
+  def transition_to(self):
+    if self.runner._finalization_remaining() <= 0:
+      log.info('Exceeded finalization wait, skipping finalization.')
+      return self.runner.terminal_state()
+    return TaskState.FINALIZING
+
+
+class TaskRunnerStage_FINALIZING(TaskRunnerStage):
+  """
+    Run the finalizing plan, specifically the plan of tasks with the 'final'
+    bit marked (e.g. log savers, checkpointers and the like.)  Anything in this
+    plan will be SIGKILLed if we go over the finalization_wait.
+  """
+
+  def run(self):
+    self.runner._run_plan(self.runner._finalizing_plan)
+    log.debug('TaskRunnerStage[FINALIZING]: Finalization remaining: %s' %
+        self.runner._finalization_remaining())
+    if self.runner.deadlocked(self.runner._finalizing_plan):
+      log.warning('Finalizing plan deadlocked.')
+      return None
+    if self.runner._finalization_remaining() > 0 and not self.runner._finalizing_plan.is_complete():
+      return min(self.runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))
+
+  def transition_to(self):
+    if self.runner._finalization_remaining() <= 0:
+      log.info('Exceeded finalization wait, terminating finalization.')
+    return self.runner.terminal_state()
+
+
+class TaskRunner(object):
+  """
+    Run a ThermosTask.
+
+    This class encapsulates the core logic to run and control the state of a Thermos task.
+    Typically, it will be instantiated directly to control a new task, but a TaskRunner can also be
+    synthesised from an existing task's checkpoint root
+  """
+  class Error(Exception): pass
+  class InvalidTask(Error): pass
+  class InternalError(Error): pass
+  class PermissionError(Error): pass
+  class StateError(Error): pass
+
+  # Maximum amount of time we spend waiting for new updates from the checkpoint streams
+  # before doing housecleaning (checking for LOST tasks, dead PIDs.)
+  MAX_ITERATION_TIME = Amount(10, Time.SECONDS)
+
+  # Minimum amount of time we wait between polls for updates on coordinator checkpoints.
+  COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.SECONDS)
+
+  # Amount of time we're willing to wait after forking before we expect the runner to have
+  # exec'ed the child process.
+  LOST_TIMEOUT = Amount(60, Time.SECONDS)
+
+  # Active task stages
+  STAGES = {
+    TaskState.ACTIVE: TaskRunnerStage_ACTIVE,
+    TaskState.CLEANING: TaskRunnerStage_CLEANING,
+    TaskState.FINALIZING: TaskRunnerStage_FINALIZING
+  }
+
+  @classmethod
+  def get(cls, task_id, checkpoint_root):
+    """
+      Get a TaskRunner bound to the task_id in checkpoint_root.
+    """
+    path = TaskPath(root=checkpoint_root, task_id=task_id, state='active')
+    task_json = path.getpath('task_path')
+    task_checkpoint = path.getpath('runner_checkpoint')
+    if not os.path.exists(task_json):
+      return None
+    task = ThermosConfigLoader.load_json(task_json)
+    if task is None:
+      return None
+    if len(task.tasks()) == 0:
+      return None
+    try:
+      checkpoint = CheckpointDispatcher.from_file(task_checkpoint)
+      if checkpoint is None or checkpoint.header is None:
+        return None
+      return cls(task.tasks()[0].task(), checkpoint_root, checkpoint.header.sandbox,
+                 log_dir=checkpoint.header.log_dir, task_id=task_id,
+                 portmap=checkpoint.header.ports)
+    except Exception as e:
+      log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s' % e, exc_info=True)
+      return None
+
+  def __init__(self, task, checkpoint_root, sandbox, log_dir=None,
+               task_id=None, portmap=None, user=None, chroot=False, clock=time,
+               universal_handler=None, planner_class=TaskPlanner):
+    """
+      required:
+        task (config.Task) = the task to run
+        checkpoint_root (path) = the checkpoint root
+        sandbox (path) = the sandbox in which the task will be run
+                         [if None, cwd will be assumed, but garbage collection will be
+                          disabled for this task.]
+
+      optional:
+        log_dir (string)  = directory to house stdout/stderr logs. If not specified, logs will be
+                            written into the sandbox directory under .logs/
+        task_id (string)  = bind to this task id.  if not specified, will synthesize an id based
+                            upon task.name()
+        portmap (dict)    = a map (string => integer) from name to port, e.g. { 'http': 80 }
+        user (string)     = the user to run the task as.  if not current user, requires setuid
+                            privileges.
+        chroot (boolean)  = whether or not to chroot into the sandbox prior to exec.
+        clock (time interface) = the clock to use throughout
+        universal_handler = checkpoint record handler (only used for testing)
+        planner_class (TaskPlanner class) = TaskPlanner class to use for constructing the task
+                            planning policy.
+    """
+    if not issubclass(planner_class, TaskPlanner):
+      raise TypeError('planner_class must be a TaskPlanner.')
+    self._clock = clock
+    launch_time = self._clock.time()
+    launch_time_ms = '%06d' % int((launch_time - int(launch_time)) * 10**6)
+    if not task_id:
+      self._task_id = '%s-%s.%s' % (task.name(),
+                                    time.strftime('%Y%m%d-%H%M%S', time.localtime(launch_time)),
+                                    launch_time_ms)
+    else:
+      self._task_id = task_id
+    current_user = TaskRunnerHelper.get_actual_user()
+    self._user = user or current_user
+    # TODO(wickman) This should be delegated to the ProcessPlatform / Helper
+    if self._user != current_user:
+      if os.geteuid() != 0:
+        raise ValueError('task specifies user as %s, but %s does not have setuid permission!' % (
+          self._user, current_user))
+    self._portmap = portmap or {}
+    self._launch_time = launch_time
+    self._log_dir = log_dir or os.path.join(sandbox, '.logs')
+    self._pathspec = TaskPath(root=checkpoint_root, task_id=self._task_id, log_dir=self._log_dir)
+    try:
+      ThermosTaskValidator.assert_valid_task(task)
+      ThermosTaskValidator.assert_valid_ports(task, self._portmap)
+    except ThermosTaskValidator.InvalidTaskError as e:
+      raise self.InvalidTask('Invalid task: %s' % e)
+    context = ThermosContext(
+        task_id=self._task_id,
+        ports=self._portmap,
+        user=self._user)
+    self._task, uninterp = (task % Environment(thermos=context)).interpolate()
+    if len(uninterp) > 0:
+      raise self.InvalidTask('Failed to interpolate task, missing: %s' %
+          ', '.join(str(ref) for ref in uninterp))
+    try:
+      ThermosTaskValidator.assert_same_task(self._pathspec, self._task)
+    except ThermosTaskValidator.InvalidTaskError as e:
+      raise self.InvalidTask('Invalid task: %s' % e)
+    self._plan = None # plan currently being executed (updated by Handlers)
+    self._regular_plan = planner_class(self._task, clock=clock,
+        process_filter=lambda proc: proc.final().get() == False)
+    self._finalizing_plan = planner_class(self._task, clock=clock,
+        process_filter=lambda proc: proc.final().get() == True)
+    self._chroot = chroot
+    self._sandbox = sandbox
+    self._terminal_state = None
+    self._ckpt = None
+    self._process_map = dict((p.name().get(), p) for p in self._task.processes())
+    self._task_processes = {}
+    self._stages = dict((state, stage(self)) for state, stage in self.STAGES.items())
+    self._finalization_start = None
+    self._preemption_deadline = None
+    self._watcher = ProcessMuxer(self._pathspec)
+    self._state   = RunnerState(processes = {})
+
+    # create runner state
+    universal_handler = universal_handler or TaskRunnerUniversalHandler
+    self._dispatcher = CheckpointDispatcher()
+    self._dispatcher.register_handler(universal_handler(self))
+    self._dispatcher.register_handler(TaskRunnerProcessHandler(self))
+    self._dispatcher.register_handler(TaskRunnerTaskHandler(self))
+
+    # recover checkpointed runner state and update plan
+    self._recovery = True
+    self._replay_runner_ckpt()
+
+  @property
+  def task(self):
+    return self._task
+
+  @property
+  def task_id(self):
+    return self._task_id
+
+  @property
+  def state(self):
+    return self._state
+
+  @property
+  def processes(self):
+    return self._task_processes
+
+  def task_state(self):
+    return self._state.statuses[-1].state if self._state.statuses else TaskState.ACTIVE
+
+  def close_ckpt(self):
+    """Force close the checkpoint stream.  This is necessary for runners terminated through
+       exception propagation."""
+    log.debug('Closing the checkpoint stream.')
+    self._ckpt.close()
+
+  @contextmanager
+  def control(self, force=False):
+    """
+      Bind to the checkpoint associated with this task, position to the end of the log if
+      it exists, or create it if it doesn't.  Fails if we cannot get "leadership" i.e. a
+      file lock on the checkpoint stream.
+    """
+    if self.is_terminal():
+      raise TaskRunner.StateError('Cannot take control of a task in terminal state.')
+    if self._sandbox:
+      safe_mkdir(self._sandbox)
+    ckpt_file = self._pathspec.getpath('runner_checkpoint')
+    try:
+      self._ckpt = TaskRunnerHelper.open_checkpoint(ckpt_file, force=force, state=self._state)
+    except TaskRunnerHelper.PermissionError:
+      raise TaskRunner.PermissionError('Unable to open checkpoint %s' % ckpt_file)
+    log.debug('Flipping recovery mode off.')
+    self._recovery = False
+    self._set_task_status(self.task_state())
+    self._resume_task()
+    try:
+      yield
+    except Exception as e:
+      log.error('Caught exception in self.control(): %s' % e)
+      log.error('  %s' % traceback.format_exc())
+    self._ckpt.close()
+
+  def _resume_task(self):
+    assert self._ckpt is not None
+    unapplied_updates = self._replay_process_ckpts()
+    if self.is_terminal():
+      raise self.StateError('Cannot resume terminal task.')
+    self._initialize_ckpt_header()
+    self._replay(unapplied_updates)
+
+  def _ckpt_write(self, record):
+    """
+      Write to the checkpoint stream if we're not in recovery mode.
+    """
+    if not self._recovery:
+      self._ckpt.write(record)
+
+  def _replay(self, checkpoints):
+    """
+      Replay a sequence of RunnerCkpts.
+    """
+    for checkpoint in checkpoints:
+      self._dispatcher.dispatch(self._state, checkpoint)
+
+  def _replay_runner_ckpt(self):
+    """
+      Replay the checkpoint stream associated with this task.
+    """
+    ckpt_file = self._pathspec.getpath('runner_checkpoint')
+    if os.path.exists(ckpt_file):
+      fp = open(ckpt_file, "r")
+      ckpt_recover = ThriftRecordReader(fp, RunnerCkpt)
+      for record in ckpt_recover:
+        log.debug('Replaying runner checkpoint record: %s' % record)
+        self._dispatcher.dispatch(self._state, record, recovery=True)
+      ckpt_recover.close()
+
+  def _replay_process_ckpts(self):
+    """
+      Replay the process checkpoints that do not mutate the runner checkpoint stream.  Return the
+      unapplied process updates that would mutate it.
+    """
+    process_updates = self._watcher.select()
+    unapplied_process_updates = []
+    for process_update in process_updates:
+      if self._dispatcher.would_update(self._state, process_update):
+        unapplied_process_updates.append(process_update)
+      else:
+        self._dispatcher.dispatch(self._state, process_update, recovery=True)
+    return unapplied_process_updates
+
+  def _initialize_ckpt_header(self):
+    """
+      Initializes the RunnerHeader for this checkpoint stream if it has not already
+      been constructed.
+    """
+    if self._state.header is None:
+      header = RunnerHeader(
+        task_id=self._task_id,
+        launch_time_ms=int(self._launch_time*1000),
+        sandbox=self._sandbox,
+        log_dir=self._log_dir,
+        hostname=socket.gethostname(),
+        user=self._user,
+        ports=self._portmap)
+      runner_ckpt = RunnerCkpt(runner_header=header)
+      self._dispatcher.dispatch(self._state, runner_ckpt)
+
+  def _set_task_status(self, state):
+    update = TaskStatus(state=state, timestamp_ms=int(self._clock.time() * 1000),
+                        runner_pid=os.getpid(), runner_uid=os.getuid())
+    runner_ckpt = RunnerCkpt(task_status=update)
+    self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
+
+  def _finalization_remaining(self):
+    # If a preemption deadline has been set, use that.
+    if self._preemption_deadline:
+      return max(0, self._preemption_deadline - self._clock.time())
+
+    # Otherwise, use the finalization wait provided in the configuration.
+    finalization_allocation = self.task.finalization_wait().get()
+    if self._finalization_start is None:
+      return sys.float_info.max
+    else:
+     waited = max(0, self._clock.time() - self._finalization_start)
+     return max(0, finalization_allocation - waited)
+
+  def _set_process_status(self, process_name, process_state, **kw):
+    if 'sequence_number' in kw:
+      sequence_number = kw.pop('sequence_number')
+      log.debug('_set_process_status(%s <= %s, seq=%s[force])' % (process_name,
+        ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
+    else:
+      current_run = self._current_process_run(process_name)
+      if not current_run:
+        assert process_state == ProcessState.WAITING
+        sequence_number = 0
+      else:
+        sequence_number = current_run.seq + 1
+      log.debug('_set_process_status(%s <= %s, seq=%s[auto])' % (process_name,
+        ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
+    runner_ckpt = RunnerCkpt(process_status=ProcessStatus(
+      process=process_name, state=process_state, seq=sequence_number, **kw))
+    self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
+
+  def _task_process_from_process_name(self, process_name, sequence_number):
+    """
+      Construct a Process() object from a process_name, populated with its
+      correct run number and fully interpolated commandline.
+    """
+    run_number = len(self.state.processes[process_name]) - 1
+    pathspec = self._pathspec.given(process=process_name, run=run_number)
+    process = self._process_map.get(process_name)
+    if process is None:
+      raise self.InternalError('FATAL: Could not find process: %s' % process_name)
+    def close_ckpt_and_fork():
+      pid = os.fork()
+      if pid == 0 and self._ckpt is not None:
+        self._ckpt.close()
+      return pid
+    return Process(
+      process.name().get(),
+      process.cmdline().get(),
+      sequence_number,
+      pathspec,
+      self._sandbox,
+      self._user,
+      chroot=self._chroot,
+      fork=close_ckpt_and_fork)
+
+  def deadlocked(self, plan=None):
+    """Check whether a plan is deadlocked, i.e. there are no running/runnable processes, and the
+    plan is not complete."""
+    plan = plan or self._regular_plan
+    now = self._clock.time()
+    running = list(plan.running)
+    runnable = list(plan.runnable_at(now))
+    waiting = list(plan.waiting_at(now))
+    log.debug('running:%d runnable:%d waiting:%d complete:%s' % (
+      len(running), len(runnable), len(waiting), plan.is_complete()))
+    return len(running + runnable + waiting) == 0 and not plan.is_complete()
+
+  def is_healthy(self):
+    """Check whether the TaskRunner is healthy. A healthy TaskRunner is not deadlocked and has not
+    reached its max_failures count."""
+    max_failures = self._task.max_failures().get()
+    deadlocked = self.deadlocked()
+    under_failure_limit = max_failures == 0 or len(self._regular_plan.failed) < max_failures
+    log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s' % (
+      max_failures, len(self._regular_plan.failed), under_failure_limit, deadlocked,
+      not deadlocked and under_failure_limit))
+    return not deadlocked and under_failure_limit
+
+  def _current_process_run(self, process_name):
+    if process_name not in self._state.processes or len(self._state.processes[process_name]) == 0:
+      return None
+    return self._state.processes[process_name][-1]
+
+  def is_process_lost(self, process_name):
+    """Determine whether or not we should mark a task as LOST and do so if necessary."""
+    current_run = self._current_process_run(process_name)
+    if not current_run:
+      raise self.InternalError('No current_run for process %s!' % process_name)
+
+    def forked_but_never_came_up():
+      return current_run.state == ProcessState.FORKED and (
+        self._clock.time() - current_run.fork_time > TaskRunner.LOST_TIMEOUT.as_(Time.SECONDS))
+
+    def running_but_coordinator_died():
+      if current_run.state != ProcessState.RUNNING:
+        return False
+      coordinator_pid, _, _ = TaskRunnerHelper.scan_process(self.state, process_name)
+      if coordinator_pid is not None:
+        return False
+      elif self._watcher.has_data(process_name):
+        return False
+      return True
+
+    if forked_but_never_came_up() or running_but_coordinator_died():
+      log.info('Detected a LOST task: %s' % current_run)
+      log.debug('  forked_but_never_came_up: %s' % forked_but_never_came_up())
+      log.debug('  running_but_coordinator_died: %s' % running_but_coordinator_died())
+      return True
+
+    return False
+
+  def _run_plan(self, plan):
+    log.debug('Schedule pass:')
+
+    running = list(plan.running)
+    log.debug('running: %s' % ' '.join(plan.running))
+    log.debug('finished: %s' % ' '.join(plan.finished))
+
+    launched = []
+    for process_name in plan.running:
+      if self.is_process_lost(process_name):
+        self._set_process_status(process_name, ProcessState.LOST)
+
+    now = self._clock.time()
+    runnable = list(plan.runnable_at(now))
+    waiting = list(plan.waiting_at(now))
+    log.debug('runnable: %s' % ' '.join(runnable))
+    log.debug('waiting: %s' % ' '.join(
+        '%s[T-%.1fs]' % (process, plan.get_wait(process)) for process in waiting))
+
+    def pick_processes(process_list):
+      if self._task.max_concurrency().get() == 0:
+        return process_list
+      num_to_pick = max(self._task.max_concurrency().get() - len(running), 0)
+      return process_list[:num_to_pick]
+
+    for process_name in pick_processes(runnable):
+      tp = self._task_processes.get(process_name)
+      if tp:
+        current_run = self._current_process_run(process_name)
+        assert current_run.state == ProcessState.WAITING
+      else:
+        self._set_process_status(process_name, ProcessState.WAITING)
+        tp = self._task_processes[process_name]
+      log.info('Forking Process(%s)' % process_name)
+      tp.start()
+      launched.append(tp)
+
+    return len(launched) > 0
+
+  def _terminate_plan(self, plan):
+    for process in plan.running:
+      last_run = self._current_process_run(process)
+      if last_run and last_run.state in (ProcessState.FORKED, ProcessState.RUNNING):
+        TaskRunnerHelper.terminate_process(self.state, process)
+
+  def has_running_processes(self):
+    """
+      Returns True if any processes associated with this task have active pids.
+    """
+    process_tree = TaskRunnerHelper.scantree(self.state)
+    return any(any(process_set) for process_set in process_tree.values())
+
+  def has_active_processes(self):
+    """
+      Returns True if any processes are in non-terminal states.
+    """
+    return any(not TaskRunnerHelper.is_process_terminal(run.state) for run in
+        filter(None, (self._current_process_run(process) for process in self.state.processes)))
+
+  def collect_updates(self, timeout=None):
+    """
+      Collects and applies updates from process checkpoint streams.  Returns the number
+      of applied process checkpoints.
+    """
+    if self.has_active_processes():
+      sleep_interval = self.COORDINATOR_INTERVAL_SLEEP.as_(Time.SECONDS)
+      total_time = 0.0
+      while True:
+        process_updates = self._watcher.select()
+        for process_update in process_updates:
+          self._dispatcher.dispatch(self._state, process_update, self._recovery)
+        if process_updates:
+          return len(process_updates)
+        if timeout and total_time >= timeout:
+          break
+        total_time += sleep_interval
+        self._clock.sleep(sleep_interval)
+    return 0
+
+  def is_terminal(self):
+    return TaskRunnerHelper.is_task_terminal(self.task_state())
+
+  def terminal_state(self):
+    if self._terminal_state:
+      log.debug('Forced terminal state: %s' %
+          TaskState._VALUES_TO_NAMES.get(self._terminal_state, 'UNKNOWN'))
+      return self._terminal_state
+    else:
+      return TaskState.SUCCESS if self.is_healthy() else TaskState.FAILED
+
+  def run(self, force=False):
+    """
+      Entrypoint to runner. Assume control of checkpoint stream, and execute TaskRunnerStages
+      until runner is terminal.
+    """
+    if self.is_terminal():
+      return
+    with self.control(force):
+      self._run()
+
+  def _run(self):
+    iteration_time = self.MAX_ITERATION_TIME.as_(Time.SECONDS)
+    while not self.is_terminal():
+      start = self._clock.time()
+      # step 1: execute stage corresponding to the state we're currently in
+      runner = self._stages[self.task_state()]
+      iteration_wait = runner.run()
+      if iteration_wait is None:
+        log.debug('Run loop: No more work to be done in state %s' %
+            TaskState._VALUES_TO_NAMES.get(self.task_state(), 'UNKNOWN'))
+        self._set_task_status(runner.transition_to())
+        continue
+      log.debug('Run loop: Work to be done within %.1fs' % iteration_wait)
+      # step 2: check child process checkpoint streams for updates
+      if not self.collect_updates(iteration_wait):
+        # If we don't collect any updates, at least 'touch' the checkpoint stream
+        # so as to prevent garbage collection.
+        elapsed = self._clock.time() - start
+        if elapsed < iteration_wait:
+          log.debug('Update collection only took %.1fs, idling %.1fs' % (
+              elapsed, iteration_wait - elapsed))
+          self._clock.sleep(iteration_wait - elapsed)
+        log.debug('Run loop: No updates collected, touching checkpoint.')
+        os.utime(self._pathspec.getpath('runner_checkpoint'), None)
+      # step 3: reap any zombie child processes
+      TaskRunnerHelper.reap_children()
+
+  def kill(self, force=False, terminal_status=TaskState.KILLED,
+           preemption_wait=Amount(1, Time.MINUTES)):
+    """
+      Kill all processes associated with this task and set the task/process states to
+      terminal_status (defaults to KILLED).
+    """
+    log.debug('Runner issued kill: force:%s, preemption_wait:%s' % (
+      force, preemption_wait))
+    assert terminal_status in (TaskState.KILLED, TaskState.LOST)
+    self._preemption_deadline = self._clock.time() + preemption_wait.as_(Time.SECONDS)
+    with self.control(force):
+      if self.is_terminal():
+        log.warning('Task is not in ACTIVE state, cannot issue kill.')
+        return
+      self._terminal_state = terminal_status
+      if self.task_state() == TaskState.ACTIVE:
+        self._set_task_status(TaskState.CLEANING)
+      self._run()
+
+  def lose(self, force=False):
+    """
+      Mark a task as LOST and kill any straggling processes.
+    """
+    self.kill(force, preemption_wait=Amount(0, Time.SECONDS), terminal_status=TaskState.LOST)
+
+  def _kill(self):
+    processes = TaskRunnerHelper.scantree(self._state)
+    for process, pid_tuple in processes.items():
+      current_run = self._current_process_run(process)
+      coordinator_pid, pid, tree = pid_tuple
+      if TaskRunnerHelper.is_process_terminal(current_run.state):
+        if coordinator_pid or pid or tree:
+          log.warning('Terminal process (%s) still has running pids:' % process)
+          log.warning('  coordinator_pid: %s' % coordinator_pid)
+          log.warning('              pid: %s' % pid)
+          log.warning('             tree: %s' % tree)
+        TaskRunnerHelper.kill_process(self.state, process)
+      else:
+        if coordinator_pid or pid or tree:
+          log.info('Transitioning %s to KILLED' % process)
+          self._set_process_status(process, ProcessState.KILLED,
+            stop_time=self._clock.time(), return_code=-1)
+        else:
+          log.info('Transitioning %s to LOST' % process)
+          if current_run.state != ProcessState.WAITING:
+            self._set_process_status(process, ProcessState.LOST)

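For readers skimming the file above, here is a minimal sketch of the lifecycle described in the
module docstring.  The paths and task file below are illustrative, and the import paths simply
mirror the ones the file itself still uses (this commit only moves files; the package rename to
org.apache comes later):

  from twitter.common.quantity import Amount, Time
  from twitter.thermos.config.loader import ThermosConfigLoader
  from twitter.thermos.core.runner import TaskRunner

  # Load a task definition and drive it ACTIVE -> CLEANING -> FINALIZING until terminal.
  task = ThermosConfigLoader.load_json('hello_world.json').tasks()[0].task()
  runner = TaskRunner(task, '/var/run/thermos', sandbox='/var/lib/thermos/sandbox')
  runner.run()

  # Preemption is just kill() with a grace period: the task is forced into CLEANING and given
  # preemption_wait to shut down gracefully before SIGKILLs are sent.
  existing = TaskRunner.get(runner.task_id, '/var/run/thermos')
  if existing is not None and not existing.is_terminal():
    existing.kill(force=True, preemption_wait=Amount(30, Time.SECONDS))
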
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/BUILD b/src/main/python/apache/thermos/monitoring/BUILD
new file mode 100644
index 0000000..a977878
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/BUILD
@@ -0,0 +1,97 @@
+import os
+
+python_library(
+  name = 'detector',
+  sources = ['detector.py'],
+  dependencies = [
+    pants('src/main/python/twitter/thermos/common:path')
+  ]
+)
+
+python_library(
+  name = 'garbage',
+  sources = ['garbage.py'],
+  dependencies = [
+    pants(':detector'),
+    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    pants('src/main/python/twitter/thermos/common:ckpt'),
+    pants('src/main/python/twitter/thermos/common:path'),
+  ]
+)
+
+python_library(
+  name = 'monitor',
+  sources = ['monitor.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
+    pants('src/main/python/twitter/thermos/common:ckpt'),
+    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'disk',
+  sources = ['disk.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    python_requirement('watchdog'),
+  ]
+)
+
+python_library(
+  name = 'process',
+  sources = ['process.py'],
+)
+
+python_library(
+  name = 'process_collector_psutil',
+  sources = ['process_collector_psutil.py'],
+  dependencies = [
+    pants(':process'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/thermos:psutil'),
+  ]
+)
+
+python_library(
+  name = 'resource',
+  sources = ['resource.py'],
+  dependencies = [
+    pants(':disk'),
+    pants(':monitor'),
+    pants(':process'),
+    pants(':process_collector_psutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/collections'),
+    pants('aurora/twitterdeps/src/python/twitter/common/concurrent'),
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+  ]
+)
+
+python_library(
+  name = 'monitoring',
+  dependencies = [
+    pants(':detector'),
+    pants(':disk'),
+    pants(':garbage'),
+    pants(':monitor'),
+    pants(':process'),
+    pants(':resource'),
+
+    # covering dependency for common
+    pants('src/main/python/twitter/thermos/common'),
+  ],
+  provides = setup_py(
+    name = 'twitter.thermos.monitoring',
+    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+    description = 'Thermos monitoring library.',
+  )
+)

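The 'monitoring' target at the bottom is a covering target over the individual libraries above; a
consumer elsewhere in the tree would pull in a specific piece roughly like this (the target name,
sources, and path below are only an example, following the pants syntax used in this BUILD file):

  python_library(
    name = 'resource_consumer_example',
    sources = ['resource_consumer_example.py'],
    dependencies = [
      pants('src/main/python/twitter/thermos/monitoring:resource'),
    ]
  )
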
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/__init__.py b/src/main/python/apache/thermos/monitoring/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/detector.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/detector.py b/src/main/python/apache/thermos/monitoring/detector.py
new file mode 100644
index 0000000..ed48c93
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/detector.py
@@ -0,0 +1,91 @@
+"""Detect Thermos tasks on disk
+
+This module contains the TaskDetector, used to detect Thermos tasks within a given checkpoint root.
+
+"""
+
+import glob
+import os
+import re
+
+from twitter.thermos.common.path import TaskPath
+
+
+class TaskDetector(object):
+  """
+    Helper class in front of TaskPath to detect active/finished/running tasks. Performs no
+    introspection on the state of a task; merely detects based on file paths on disk.
+  """
+  class MatchingError(Exception): pass
+
+  def __init__(self, root):
+    self._root_dir = root
+    self._pathspec = TaskPath()
+
+  def get_task_ids(self, state=None):
+    paths = glob.glob(self._pathspec.given(root=self._root_dir,
+                                           task_id="*",
+                                           state=state or '*')
+                                    .getpath('task_path'))
+    path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
+                                              task_id="(\S+)",
+                                              state='(\S+)')
+                                       .getpath('task_path'))
+    for path in paths:
+      try:
+        task_state, task_id = path_re.match(path).groups()
+      except:
+        continue
+      if state is None or task_state == state:
+        yield (task_state, task_id)
+
+  def get_process_runs(self, task_id, log_dir):
+    paths = glob.glob(self._pathspec.given(root=self._root_dir,
+                                           task_id=task_id,
+                                           log_dir=log_dir,
+                                           process='*',
+                                           run='*')
+                                    .getpath('process_logdir'))
+    path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
+                                              task_id=re.escape(task_id),
+                                              log_dir=log_dir,
+                                              process='(\S+)',
+                                              run='(\d+)')
+                                       .getpath('process_logdir'))
+    for path in paths:
+      try:
+        process, run = path_re.match(path).groups()
+      except:
+        continue
+      yield process, int(run)
+
+  def get_process_logs(self, task_id, log_dir):
+    for process, run in self.get_process_runs(task_id, log_dir):
+      for logtype in ('stdout', 'stderr'):
+        path = (self._pathspec.with_filename(logtype).given(root=self._root_dir,
+                                                           task_id=task_id,
+                                                           log_dir=log_dir,
+                                                           process=process,
+                                                           run=run)
+                                                     .getpath('process_logdir'))
+        if os.path.exists(path):
+          yield path
+
+  def get_checkpoint(self, task_id):
+    return self._pathspec.given(root=self._root_dir, task_id=task_id).getpath('runner_checkpoint')
+
+  def get_process_checkpoints(self, task_id):
+    matching_paths = glob.glob(self._pathspec.given(root=self._root_dir,
+                                                    task_id=task_id,
+                                                    process='*')
+                                             .getpath('process_checkpoint'))
+    path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
+                                              task_id=re.escape(task_id),
+                                              process='(\S+)')
+                                       .getpath('process_checkpoint'))
+    for path in matching_paths:
+      try:
+        process, = path_re.match(path).groups()
+      except:
+        continue
+      yield path

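A short sketch of how the detector above is typically driven, e.g. to enumerate finished tasks and
locate their runner checkpoints (the checkpoint root is an example path; the import mirrors the
file's own twitter.* package names):

  from twitter.thermos.monitoring.detector import TaskDetector

  detector = TaskDetector(root='/var/run/thermos')
  for task_state, task_id in detector.get_task_ids(state='finished'):
    print task_id, '->', detector.get_checkpoint(task_id)
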
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/disk.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/disk.py b/src/main/python/apache/thermos/monitoring/disk.py
new file mode 100644
index 0000000..9116746
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/disk.py
@@ -0,0 +1,181 @@
+"""Sample disk usage under a particular path
+
+This module provides threads which can be used to gather information on the disk utilisation
+under a particular path.
+
+Currently, there are two threads available:
+  - DiskCollectorThread, which periodically uses a basic brute-force approach (os.stat()ing every
+    file within the path)
+  - InotifyDiskCollectorThread, which updates disk utilisation dynamically by using inotify to
+    monitor disk changes within the path
+
+"""
+
+import os
+import threading
+import time
+from Queue import Empty, Queue
+
+from twitter.common import log
+from twitter.common.dirutil import du, safe_bsize
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.lang import Lockable
+from twitter.common.quantity import Amount, Time
+
+from watchdog.observers import Observer as WatchdogObserver
+from watchdog.events import (
+  FileSystemEventHandler,
+  FileCreatedEvent,
+  FileDeletedEvent,
+  FileModifiedEvent,
+  FileMovedEvent,
+)
+
+
+class DiskCollectorThread(ExceptionalThread):
+  """ Thread to calculate aggregate disk usage under a given path using a simple algorithm """
+  def __init__(self, path):
+    self.path = path
+    self.value = None
+    self.event = threading.Event()
+    super(DiskCollectorThread, self).__init__()
+    self.daemon = True
+
+  def run(self):
+    log.debug("DiskCollectorThread: starting collection of %s" % self.path)
+    self.value = du(self.path)
+    log.debug("DiskCollectorThread: finished collection of %s" % self.path)
+    self.event.set()
+
+  def finished(self):
+    return self.event.is_set()
+
+
+class DiskCollector(Lockable):
+  """ Spawn a background thread to sample disk usage """
+  def __init__(self, root):
+    self._root = root
+    self._thread = None
+    self._value = 0
+    super(DiskCollector, self).__init__()
+
+  @Lockable.sync
+  def sample(self):
+    """ Trigger collection of sample, if not already begun """
+    if self._thread is None:
+      self._thread = DiskCollectorThread(self._root)
+      self._thread.start()
+
+  @property
+  @Lockable.sync
+  def value(self):
+    """ Retrieve value of disk usage """
+    if self._thread is not None and self._thread.finished():
+      self._value = self._thread.value
+      self._thread = None
+    return self._value
+
+  @property
+  @Lockable.sync
+  def completed_event(self):
+    """ Return a threading.Event that will block until an in-progress disk collection is complete,
+    or block indefinitely otherwise. Use with caution! (i.e.: set a timeout) """
+    if self._thread is not None:
+      return self._thread.event
+    else:
+      return threading.Event()
+
+
+class InotifyDiskCollectorThread(ExceptionalThread, FileSystemEventHandler):
+  """ Thread to calculate aggregate disk usage under a given path
+
+    Note that while this thread uses inotify (through the watchdog module) to monitor disk events in
+    "real time", the actual processing of events is only performed periodically (configured via
+    COLLECTION_INTERVAL)
+
+  """
+  INTERESTING_EVENTS = (FileCreatedEvent, FileDeletedEvent, FileModifiedEvent, FileMovedEvent)
+  COLLECTION_INTERVAL = Amount(5, Time.SECONDS)
+
+  def __init__(self, path):
+    self._path = path
+    self._files = {}   # file path => size (bytes)
+    self._queue = Queue()
+    self._observer = WatchdogObserver()
+    super(InotifyDiskCollectorThread, self).__init__()
+    self.daemon = True
+
+  def dispatch(self, event):
+    """ Dispatch all interesting events to the internal queue """
+    if isinstance(event, self.INTERESTING_EVENTS):
+      self._queue.put(event)
+
+  def _initialize(self):
+    """ Collect an initial snapshot of the disk usage in the path """
+    log.debug("Starting watchdog observer to collect events...")
+    self._observer.schedule(self, path=self._path, recursive=True)
+    self._observer.start()
+    log.debug("Collecting initial disk usage sample...")
+    for root, _, files in os.walk(self._path):
+      for filename in files:
+        f = os.path.join(root, filename)
+        self._files[f] = safe_bsize(f)
+
+  def _process_events(self):
+    """ Deduplicate and process watchdog events, updating the internal file store appropriately """
+    file_ops = {}
+
+    def remove_file(path):
+      self._files.pop(path, None)
+    def stat_file(path):
+      self._files[path] = safe_bsize(path)
+
+    while not self._to_process.empty():
+      event = self._to_process.get()
+      if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
+        file_ops[event.src_path] = lambda path=event.src_path: stat_file(path)
+      elif isinstance(event, FileDeletedEvent):
+        file_ops[event.src_path] = lambda path=event.src_path: remove_file(path)
+      elif isinstance(event, FileMovedEvent):
+        file_ops[event.src_path] = lambda path=event.src_path: remove_file(path)
+        file_ops[event.dest_path] = lambda path=event.dest_path: stat_file(path)
+
+    for op in file_ops.values():
+      op()
+
+  def run(self):
+    """ Loop indefinitely, periodically processing watchdog/inotify events. """
+    self._initialize()
+    log.debug("Initialization complete. Moving to handling events.")
+    while True:
+      next = time.time() + self.COLLECTION_INTERVAL.as_(Time.SECONDS)
+      if not self._queue.empty():
+        self._to_process, self._queue = self._queue, Queue()
+        self._process_events()
+      time.sleep(max(0, next - time.time()))
+
+  @property
+  def value(self):
+    return sum(self._files.itervalues())
+
+
+class InotifyDiskCollector(object):
+  """ Spawn a background thread to sample disk usage """
+  def __init__(self, root):
+    self._root = root
+    self._thread = InotifyDiskCollectorThread(self._root)
+
+  def sample(self):
+    """ Trigger disk collection loop. """
+    if not os.path.exists(self._root):
+      log.error('Cannot start monitoring path until it exists')
+    elif not self._thread.is_alive():
+      self._thread.start()
+
+  @property
+  def value(self):
+    return self._thread.value
+
+  @property
+  def completed_event(self):
+    return threading.Event()

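Both collectors above share the same sample()/value interface; a minimal sketch of polling the
brute-force DiskCollector (the sandbox path is an example; the import mirrors the file's own
twitter.* package names):

  from twitter.thermos.monitoring.disk import DiskCollector

  collector = DiskCollector('/var/lib/thermos/sandbox/my-task')
  collector.sample()                            # kicks off a background du() of the path
  collector.completed_event.wait(timeout=10.0)  # per the docstring, always wait with a timeout
  print 'disk usage (bytes):', collector.value
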
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/garbage.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/garbage.py b/src/main/python/apache/thermos/monitoring/garbage.py
new file mode 100644
index 0000000..4ac0d3a
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/garbage.py
@@ -0,0 +1,183 @@
+from abc import abstractmethod
+from collections import namedtuple
+import os
+import sys
+import time
+
+from twitter.common.dirutil import safe_delete, safe_rmtree, safe_bsize
+from twitter.common.lang import Interface
+from twitter.common.quantity import Amount, Data, Time
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.common.path import TaskPath
+
+from .detector import TaskDetector
+
+
+class TaskGarbageCollector(object):
+  def __init__(self, root):
+    self._root = root
+    self._detector = TaskDetector(root=self._root)
+    self._states = {}
+
+  def state(self, task_id):
+    if task_id not in self._states:
+      self._states[task_id] = CheckpointDispatcher.from_file(self._detector.get_checkpoint(task_id))
+    return self._states[task_id]
+
+  def get_age(self, task_id):
+    return os.path.getmtime(self._detector.get_checkpoint(task_id))
+
+  def get_finished_tasks(self):
+    return [task_id for _, task_id in self._detector.get_task_ids(state='finished')]
+
+  def get_metadata(self, task_id, with_size=True):
+    runner_ckpt = self._detector.get_checkpoint(task_id)
+    process_ckpts = [ckpt for ckpt in self._detector.get_process_checkpoints(task_id)]
+    json_spec = TaskPath(root=self._root, task_id=task_id, state='finished').getpath('task_path')
+    for path in [json_spec, runner_ckpt] + process_ckpts:
+      if with_size:
+        yield path, safe_bsize(path)
+      else:
+        yield path
+
+  def get_logs(self, task_id, with_size=True):
+    state = self.state(task_id)
+    if state and state.header:
+      for path in self._detector.get_process_logs(task_id, state.header.log_dir):
+        if with_size:
+          yield path, safe_bsize(path)
+        else:
+          yield path
+
+  def get_data(self, task_id, with_size=True):
+    state = self.state(task_id)
+    if state and state.header and state.header.sandbox:
+      for root, dirs, files in os.walk(state.header.sandbox):
+        for file in files:
+          filename = os.path.join(root, file)
+          if with_size:
+            yield filename, safe_bsize(filename)
+          else:
+            yield filename
+
+  def erase_task(self, task_id):
+    self.erase_data(task_id)
+    self.erase_logs(task_id)
+    self.erase_metadata(task_id)
+
+  def erase_metadata(self, task_id):
+    for fn in self.get_metadata(task_id, with_size=False):
+      safe_delete(fn)
+    safe_rmtree(TaskPath(root=self._root, task_id=task_id).getpath('checkpoint_path'))
+
+  def erase_logs(self, task_id):
+    for fn in self.get_logs(task_id, with_size=False):
+      safe_delete(fn)
+    state = self.state(task_id)
+    if state and state.header:
+      safe_rmtree(TaskPath(root=self._root, task_id=task_id, log_dir=state.header.log_dir)
+                  .getpath('process_logbase'))
+
+  def erase_data(self, task_id):
+    # TODO(wickman)
+    # This could be potentially dangerous if somebody naively runs their sandboxes in e.g.
+    # $HOME or / or similar.  Perhaps put a guard somewhere?
+    for fn in self.get_data(task_id, with_size=False):
+      os.unlink(fn)
+    state = self.state(task_id)
+    if state and state.header and state.header.sandbox:
+      safe_rmtree(state.header.sandbox)
+
+
+class TaskGarbageCollectionPolicy(Interface):
+  def __init__(self, collector):
+    if not isinstance(collector, TaskGarbageCollector):
+      raise ValueError(
+          "Expected collector to be a TaskGarbageCollector, got %s" % collector.__class__.__name__)
+    self._collector = collector
+
+  @property
+  def collector(self):
+    return self._collector
+
+  @abstractmethod
+  def run(self):
+    """Returns a list of task_ids that should be garbage collected given the specified policy."""
+
+
+class DefaultCollector(TaskGarbageCollectionPolicy):
+  def __init__(self, collector, **kw):
+    """
+      Default garbage collection policy.
+
+      Arguments that may be specified:
+        max_age:   Amount(Time) (max age of a retained task)  [default: infinity]
+        max_space: Amount(Data) (max space to keep)           [default: infinity]
+        max_tasks: int (max number of tasks to keep)          [default: infinity]
+        include_metadata: boolean  (Whether or not to include metadata in the
+          space calculations.)  [default: True]
+        include_logs: boolean  (Whether or not to include logs in the
+          space calculations.)  [default: True]
+        verbose: boolean (whether or not to log)  [default: False]
+        logger: callable (function to call with log messages) [default: sys.stdout.write]
+    """
+    self._max_age = kw.get('max_age', Amount(10**10, Time.DAYS))
+    self._max_space = kw.get('max_space', Amount(10**10, Data.TB))
+    self._max_tasks = kw.get('max_tasks', 10**10)
+    self._include_metadata = kw.get('include_metadata', True)
+    self._include_logs = kw.get('include_logs', True)
+    self._verbose = kw.get('verbose', False)
+    self._logger = kw.get('logger', sys.stdout.write)
+    TaskGarbageCollectionPolicy.__init__(self, collector)
+
+  def log(self, msg):
+    if self._verbose:
+      self._logger(msg)
+
+  def run(self):
+    tasks = []
+    now = time.time()
+
+    TaskTuple = namedtuple('TaskTuple', 'task_id age metadata_size log_size data_size')
+    for task_id in self.collector.get_finished_tasks():
+      age = Amount(int(now - self.collector.get_age(task_id)), Time.SECONDS)
+      self.log('Analyzing task %s (age: %s)... ' % (task_id, age))
+      metadata_size = Amount(sum(sz for _, sz in self.collector.get_metadata(task_id)), Data.BYTES)
+      self.log('  metadata %.1fKB ' % metadata_size.as_(Data.KB))
+      log_size = Amount(sum(sz for _, sz in self.collector.get_logs(task_id)), Data.BYTES)
+      self.log('  logs %.1fKB ' % log_size.as_(Data.KB))
+      data_size = Amount(sum(sz for _, sz in self.collector.get_data(task_id)), Data.BYTES)
+      self.log('  data %.1fMB ' % data_size.as_(Data.MB))
+      tasks.append(TaskTuple(task_id, age, metadata_size, log_size, data_size))
+
+    gc_tasks = set()
+    gc_tasks.update(task for task in tasks if task.age > self._max_age)
+    self.log('After age filter: %s tasks' % len(gc_tasks))
+
+    def total_gc_size(task):
+      return sum([task.data_size,
+                  task.metadata_size if self._include_metadata else Amount(0, Data.BYTES),
+                  task.log_size if self._include_logs else Amount(0, Data.BYTES)],
+                  Amount(0, Data.BYTES))
+
+    total_used = Amount(0, Data.BYTES)
+    for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True):
+      if task not in gc_tasks:
+        total_used += total_gc_size(task)
+        if total_used > self._max_space:
+          gc_tasks.add(task)
+    self.log('After size filter: %s tasks' % len(gc_tasks))
+
+    for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True):
+      if task not in gc_tasks and len(tasks) - len(gc_tasks) > self._max_tasks:
+        gc_tasks.add(task)
+    self.log('After total task filter: %s tasks' % len(gc_tasks))
+
+    self.log('Deciding to garbage collect the following tasks:')
+    if gc_tasks:
+      for task in gc_tasks:
+        self.log('   %s' % repr(task))
+    else:
+      self.log('   None.')
+
+    return gc_tasks
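A rough sketch of how the knobs above compose; "collector" is assumed to be an existing TaskGarbageCollector (as in the earlier sketch), and the limits are illustrative rather than values taken from this change:

  policy = DefaultCollector(collector,
                            max_age=Amount(14, Time.DAYS),
                            max_space=Amount(10000, Data.MB),
                            max_tasks=500,
                            verbose=True)
  for task in policy.run():                    # TaskTuple records chosen for collection
    policy.collector.erase_task(task.task_id)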

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/monitor.py b/src/main/python/apache/thermos/monitoring/monitor.py
new file mode 100644
index 0000000..5af67fe
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/monitor.py
@@ -0,0 +1,125 @@
+"""Monitor the state of Thermos tasks on a system
+
+This module contains the TaskMonitor, used to reconstruct the state of active or finished Thermos
+tasks based on their checkpoint streams. It exposes two key pieces of information about a Task, both
+as their corresponding Thrift structs:
+  - a RunnerState, representing the latest state of the Task
+  - a list of ProcessStates, representing the processes currently running within the Task
+
+"""
+
+import os
+import copy
+import errno
+import threading
+
+from twitter.common import log
+from twitter.common.recordio import ThriftRecordReader
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+
+from gen.twitter.thermos.ttypes import (
+  ProcessState,
+  RunnerCkpt,
+  RunnerState,
+  TaskState,
+)
+
+
+class TaskMonitor(object):
+  """
+    Class responsible for reconstructing and monitoring the state of an individual Thermos task via
+    its runner checkpoint. Also exports information on active processes in the task.
+  """
+
+  def __init__(self, pathspec, task_id):
+    self._task_id = task_id
+    self._dispatcher = CheckpointDispatcher()
+    self._runnerstate = RunnerState(processes={})
+    self._runner_ckpt = pathspec.given(task_id=task_id).getpath('runner_checkpoint')
+    self._active_file, self._finished_file = (
+        pathspec.given(task_id=task_id, state=state).getpath('task_path')
+        for state in ('active', 'finished'))
+    self._ckpt_head = 0
+    self._apply_states()
+    self._lock = threading.Lock()
+
+  def _apply_states(self):
+    """
+      os.stat() the corresponding checkpoint stream of this task and determine if there are new ckpt
+      records.  Attempt to read those records and update the high watermark for that stream.
+      Returns True if new states were applied, False otherwise.
+    """
+    ckpt_offset = None
+    try:
+      ckpt_offset = os.stat(self._runner_ckpt).st_size
+
+      updated = False
+      if self._ckpt_head < ckpt_offset:
+        with open(self._runner_ckpt, 'r') as fp:
+          fp.seek(self._ckpt_head)
+          rr = ThriftRecordReader(fp, RunnerCkpt)
+          while True:
+            runner_update = rr.try_read()
+            if not runner_update:
+              break
+            try:
+              self._dispatcher.dispatch(self._runnerstate, runner_update)
+            except CheckpointDispatcher.InvalidSequenceNumber as e:
+              log.error('Checkpoint stream is corrupt: %s' % e)
+              break
+          new_ckpt_head = fp.tell()
+          updated = self._ckpt_head != new_ckpt_head
+          self._ckpt_head = new_ckpt_head
+      return updated
+    except OSError as e:
+      if e.errno == errno.ENOENT:
+        # The log doesn't yet exist, will retry later.
+        log.warning('Could not read from discovered task %s.' % self._task_id)
+        return False
+      else:
+        raise
+
+  def refresh(self):
+    """
+      Check to see if there are new updates and apply them.  Return true if
+      updates were applied, false otherwise.
+    """
+    with self._lock:
+      return self._apply_states()
+
+  def get_state(self):
+    """
+      Get the latest state of this Task.
+    """
+    with self._lock:
+      self._apply_states()
+      return copy.deepcopy(self._runnerstate)
+
+  def task_state(self):
+    state = self.get_state()
+    return state.statuses[-1].state if state.statuses else TaskState.ACTIVE
+
+  @property
+  def active(self):
+    return os.path.exists(self._active_file)
+
+  @property
+  def finished(self):
+    return os.path.exists(self._finished_file)
+
+  def get_active_processes(self):
+    """
+      Get active processes.  Returns a list of tuples of the form:
+        (ProcessStatus object of the running process, its run number)
+    """
+    active_processes = []
+    with self._lock:
+      self._apply_states()
+      state = self._runnerstate
+      for process, runs in state.processes.items():
+        if len(runs) == 0:
+          continue
+        last_run = runs[-1]
+        if last_run.state == ProcessState.RUNNING:
+          active_processes.append((last_run, len(runs) - 1))
+    return active_processes
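A sketch of how an observer might drive this class; the checkpoint root and task id are placeholders, and TaskPath is assumed to point at the same root the runner checkpoints under:

  pathspec = TaskPath(root='/var/run/thermos')        # assumed checkpoint root
  monitor = TaskMonitor(pathspec, 'some_task_id')     # placeholder task id
  if monitor.refresh():                               # pick up any new checkpoint records
    for process_status, run_number in monitor.get_active_processes():
      log.info('%s (run %d) is RUNNING' % (process_status.process, run_number))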

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/process.py b/src/main/python/apache/thermos/monitoring/process.py
new file mode 100644
index 0000000..294682f
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/process.py
@@ -0,0 +1,49 @@
+"""Represent resource consumption statistics for processes
+
+This module exposes one class: the ProcessSample, used to represent resource consumption. A single
+ProcessSample might correspond to one individual process, or to an aggregate of multiple processes.
+
+"""
+
+from collections import namedtuple
+
+
+class ProcessSample(namedtuple('ProcessSample', 'rate user system rss vms nice status threads')):
+  """ Sample of statistics about a process's resource consumption (either a single process or an
+  aggregate of processes) """
+
+  @staticmethod
+  def empty():
+    return ProcessSample(rate=0, user=0, system=0, rss=0, vms=0, nice=None, status=None, threads=0)
+
+  def __add__(self, other):
+    if self.nice is not None and other.nice is None:
+      nice = self.nice
+    else:
+      nice = other.nice
+    if self.status is not None and other.status is None:
+      status = self.status
+    else:
+      status = other.status
+    return ProcessSample(
+      rate = self.rate + other.rate,
+      user = self.user + other.user,
+      system = self.system + other.system,
+      rss = self.rss + other.rss,
+      vms = self.vms + other.vms,
+      nice = nice,
+      status = status,
+      threads = self.threads + other.threads)
+
+  def to_dict(self):
+    return dict(
+      cpu     = self.rate,
+      ram     = self.rss,
+      user    = self.user,
+      system  = self.system,
+      rss     = self.rss,
+      vms     = self.vms,
+      nice    = self.nice,
+      status  = str(self.status),
+      threads = self.threads
+    )
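To make the aggregation concrete, a small example with made-up values: samples sum field by field with ProcessSample.empty() as the identity, and nice/status keep whichever operand is not None:

  a = ProcessSample(rate=0.1, user=2.0, system=0.5, rss=1024, vms=2048,
                    nice=0, status='sleeping', threads=1)
  b = ProcessSample(rate=0.2, user=1.0, system=0.2, rss=512, vms=1024,
                    nice=None, status=None, threads=3)
  total = sum([a, b], ProcessSample.empty())
  # total.user == 3.0, total.threads == 4, total.nice == 0, total.status == 'sleeping'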

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
new file mode 100644
index 0000000..7b68173
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
@@ -0,0 +1,92 @@
+""" Sample resource consumption statistics for processes using psutil """
+
+from operator import attrgetter
+from time import time
+
+from twitter.common import log
+
+from .process import ProcessSample
+
+from psutil import Process
+from psutil import NoSuchProcess, AccessDenied, Error as PsutilError
+
+
+def process_to_sample(process):
+  """ Given a psutil.Process, return a current ProcessSample """
+  try:
+    # the nonblocking get_cpu_percent call is stateful on a particular Process object, and hence
+    # at least two consecutive calls are required before it will return a non-zero value
+    rate = process.get_cpu_percent(0.0) / 100.0
+    user, system = process.get_cpu_times()
+    rss, vms = process.get_memory_info()
+    nice = process.nice
+    status = process.status
+    threads = process.get_num_threads()
+    return ProcessSample(rate, user, system, rss, vms, nice, status, threads)
+  except (AccessDenied, NoSuchProcess) as e:
+    log.warning('Error during process sampling [pid=%s]: %s' % (process.pid, e))
+    return ProcessSample.empty()
+
+
+class ProcessTreeCollector(object):
+  """ Collect resource consumption statistics for a process and its children """
+  def __init__(self, pid):
+    """ Given a pid """
+    self._pid = pid
+    self._process = None  # psutil.Process
+    self._sampled_tree = {} # pid => ProcessSample
+    self._sample = ProcessSample.empty()
+    self._stamp = None
+    self._rate = 0.0
+    self._procs = 1
+
+  def sample(self):
+    """ Collate and aggregate ProcessSamples for process and children
+        Returns None: result is stored in self.value
+    """
+    try:
+      if self._process is None:
+        self._process = Process(self._pid)
+      parent = self._process
+      parent_sample = process_to_sample(parent)
+      new_samples = dict(
+          (proc.pid, process_to_sample(proc))
+          for proc in parent.get_children(recursive=True)
+      )
+      new_samples[self._pid] = parent_sample
+
+    except PsutilError as e:
+      log.warning('Error during process sampling: %s' % e)
+      self._sample = ProcessSample.empty()
+      self._rate = 0.0
+
+    else:
+      last_stamp = self._stamp
+      self._stamp = time()
+      # for most stats, calculate simple sum to aggregate
+      self._sample = sum(new_samples.values(), ProcessSample.empty())
+      # cpu consumption is more complicated
+      # We require at least 2 generations of a process before we can calculate rate, so for all
+      # current processes that were not running in the previous sample, compare to an empty sample
+      if self._sampled_tree and last_stamp:
+        new = new_samples.values()
+        old = [self._sampled_tree.get(pid, ProcessSample.empty()) for pid in new_samples.keys()]
+        new_user_sys = sum(map(attrgetter('user'), new)) + sum(map(attrgetter('system'), new))
+        old_user_sys = sum(map(attrgetter('user'), old)) + sum(map(attrgetter('system'), old))
+        self._rate = (new_user_sys - old_user_sys) / (self._stamp - last_stamp)
+        log.debug("Calculated rate for pid=%s and children: %s" % (self._process.pid, self._rate))
+      self._sampled_tree = new_samples
+
+  @property
+  def value(self):
+    """ Aggregated ProcessSample representing resource consumption of the tree """
+    # Since we don't trust the CPU consumption returned by psutil, replace it with our own in the
+    # exported ProcessSample
+    return self._sample._replace(rate=self._rate)
+
+  @property
+  def procs(self):
+    """ Number of active processes in the tree """
+    return len(self._sampled_tree)
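A usage sketch with a placeholder pid; because the CPU rate is computed from deltas between generations, at least two sample() calls separated in time are needed before value reports a meaningful rate:

  import time
  collector = ProcessTreeCollector(12345)  # placeholder pid of the tree's root process
  collector.sample()                       # first sample establishes the baseline
  time.sleep(1)
  collector.sample()                       # second sample enables the CPU rate calculation
  print('%d procs, cpu rate %.3f' % (collector.procs, collector.value.rate))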
+

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
new file mode 100644
index 0000000..468f1b1
--- /dev/null
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -0,0 +1,222 @@
+"""Monitor the resource consumption of Thermos tasks
+
+This module contains classes used to monitor the resource consumption (e.g. CPU, RAM, disk) of
+Thermos tasks. Resource monitoring of a Thermos task typically treats the task as an aggregate of
+all the processes within it. Importantly, this excludes the process(es) of Thermos itself (i.e. the
+TaskRunner and any other wrappers involved in launching a task).
+
+The ResourceMonitorBase defines the interface for other components (for example, the Thermos
+TaskObserver) to interact with and retrieve information about a Task's resource consumption.  The
+canonical/reference implementation of a ResourceMonitor is the TaskResourceMonitor, a thread which
+actively monitors resources for a particular task by periodically polling process information and
+disk consumption and retaining a limited (FIFO) in-memory history of this data.
+
+"""
+
+from abc import abstractmethod
+from bisect import bisect_left
+from collections import namedtuple
+from operator import attrgetter
+import platform
+import threading
+import time
+
+from twitter.common import log
+from twitter.common.collections import RingBuffer
+from twitter.common.concurrent import EventMuxer
+from twitter.common.lang import Interface
+from twitter.common.quantity import Amount, Time
+
+from .disk import DiskCollector
+from .monitor import TaskMonitor
+from .process import ProcessSample
+from .process_collector_psutil import ProcessTreeCollector
+
+
+class ResourceMonitorBase(Interface):
+  """ Defines the interface for interacting with a ResourceMonitor """
+
+  class Error(Exception): pass
+
+  class ResourceResult(namedtuple('ResourceResult', 'num_procs process_sample disk_usage')):
+    pass
+
+  @abstractmethod
+  def sample(self):
+    """ Return a sample of the resource consumption of the task right now
+
+    Returns a tuple of (timestamp, ResourceResult)
+    """
+
+  @abstractmethod
+  def sample_at(self, time):
+    """ Return a sample of the resource consumption as close as possible to the specified time
+
+    Returns a tuple of (timestamp, ResourceResult)
+    """
+
+  @abstractmethod
+  def sample_by_process(self, process_name):
+    """ Return a sample of the resource consumption of a specific process in the task right now
+
+    Returns a ProcessSample
+    """
+
+
+class ResourceHistory(object):
+  """Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with the
+       mapping: timestamp => (number_of_procs, ProcessSample, disk_usage_in_bytes)
+  """
+  def __init__(self, maxlen, initialize=True):
+    if not maxlen >= 1:
+      raise ValueError("maxlen must be greater than 0")
+    self._maxlen = maxlen
+    self._values = RingBuffer(maxlen, None)
+    if initialize:
+      self.add(time.time(), ResourceMonitorBase.ResourceResult(0, ProcessSample.empty(), 0))
+
+  def add(self, timestamp, value):
+    """Store a new resource sample corresponding to the given timestamp"""
+    if self._values and not timestamp >= self._values[-1][0]:
+      raise ValueError("Refusing to add timestamp in the past!")
+    self._values.append((timestamp, value))
+
+  def get(self, timestamp):
+    """Get the resource sample nearest to the given timestamp"""
+    closest = min(bisect_left(self._values, (timestamp, None)), len(self) - 1)
+    return self._values[closest]
+
+  def __iter__(self):
+    return iter(self._values)
+
+  def __len__(self):
+    return len(self._values)
+
+  def __repr__(self):
+    return 'ResourceHistory(%s)' % ', '.join([str(r) for r in self._values])
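A short sketch of the intended access pattern (timestamps and sizes are made up): samples must be appended in timestamp order, and get() returns the stored entry at or just after the requested time, bounded by the newest entry:

  history = ResourceHistory(maxlen=100)
  now = time.time()
  history.add(now, ResourceMonitorBase.ResourceResult(1, ProcessSample.empty(), 0))
  history.add(now + 15, ResourceMonitorBase.ResourceResult(2, ProcessSample.empty(), 4096))
  timestamp, (num_procs, sample, disk_usage) = history.get(now + 10)  # returns the (now + 15) entry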
+
+
+class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
+  """ Lightweight thread to aggregate resource consumption for a task's constituent processes.
+      Actual resource calculation is delegated to collectors; this class periodically polls the
+      collectors and aggregates into a representation for the entire task. Also maintains a limited
+      history of previous sample results.
+  """
+
+  MAX_HISTORY = 10000 # magic number
+
+  def __init__(self, task_monitor, sandbox,
+               process_collector=ProcessTreeCollector, disk_collector=DiskCollector,
+               process_collection_interval=Amount(20, Time.SECONDS),
+               disk_collection_interval=Amount(1, Time.MINUTES),
+               history_time=Amount(1, Time.HOURS)):
+    """
+      task_monitor: TaskMonitor object specifying the task whose resources should be monitored
+      sandbox: Directory for which to monitor disk utilisation
+    """
+    self._task_monitor = task_monitor # exposes PIDs, sandbox
+    self._task_id = task_monitor._task_id
+    log.debug('Initialising resource collection for task %s' % self._task_id)
+    self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector
+    # TODO(jon): sandbox is also available through task_monitor, but typically the first checkpoint
+    # isn't written (and hence the header is not available) by the time we initialise here
+    self._sandbox = sandbox
+    self._process_collector_factory = process_collector
+    self._disk_collector = disk_collector(self._sandbox)
+    self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
+    self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
+    min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
+    history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
+    if history_length > self.MAX_HISTORY:
+      raise ValueError("Requested history length too large")
+    log.debug("Initialising ResourceHistory of length %s" % history_length)
+    self._history = ResourceHistory(history_length)
+    self._kill_signal = threading.Event()
+    threading.Thread.__init__(self)
+    self.daemon = True
+
+  def sample(self):
+    if not self.is_alive():
+      log.warning("TaskResourceMonitor not running - sample may be inaccurate")
+    return self.sample_at(time.time())
+
+  def sample_at(self, timestamp):
+    return self._history.get(timestamp)
+
+  def sample_by_process(self, process_name):
+    try:
+      process = [process for process in self._get_active_processes()
+                 if process.process == process_name].pop()
+    except IndexError:
+      raise ValueError('No active process found with name "%s" in this task' % process_name)
+    else:
+      # Since this might be called out of band (before the main loop is aware of the process)
+      if process not in self._process_collectors:
+        self._process_collectors[process] = self._process_collector_factory(process.pid)
+
+      self._process_collectors[process].sample()
+      return self._process_collectors[process].value
+
+  def _get_active_processes(self):
+    """Get a list of ProcessStatus objects representing currently-running processes in the task"""
+    return [process for process, _ in self._task_monitor.get_active_processes()]
+
+  def kill(self):
+    """Signal that the thread should cease collecting resources and terminate"""
+    self._kill_signal.set()
+
+  def run(self):
+    """Thread entrypoint. Loop indefinitely, polling collectors at self._collection_interval and
+    collating samples."""
+
+    log.debug('Commencing resource monitoring for task "%s"' % self._task_id)
+    next_process_collection = 0
+    next_disk_collection = 0
+
+    while not self._kill_signal.is_set():
+
+      now = time.time()
+
+      if now > next_process_collection:
+        next_process_collection = now + self._process_collection_interval
+        actives = set(self._get_active_processes())
+        current = set(self._process_collectors)
+        for process in current - actives:
+          log.debug('Process "%s" (pid %s) no longer active, removing from monitored processes' %
+                   (process.process, process.pid))
+          self._process_collectors.pop(process)
+        for process in actives - current:
+          log.debug('Adding process "%s" (pid %s) to resource monitoring' %
+                   (process.process, process.pid))
+          self._process_collectors[process] = self._process_collector_factory(process.pid)
+        for process, collector in self._process_collectors.iteritems():
+          log.debug('Collecting sample for process "%s" (pid %s) and children' %
+                   (process.process, process.pid))
+          collector.sample()
+
+      if now > next_disk_collection:
+        next_disk_collection = now + self._disk_collection_interval
+        log.debug('Collecting disk sample for %s' % self._sandbox)
+        self._disk_collector.sample()
+
+      try:
+        aggregated_procs = sum(map(attrgetter('procs'), self._process_collectors.values()))
+        aggregated_sample = sum(map(attrgetter('value'), self._process_collectors.values()),
+                                ProcessSample.empty())
+        self._history.add(now, self.ResourceResult(aggregated_procs, aggregated_sample,
+                                                   self._disk_collector.value))
+        log.debug("Recorded resource sample at %s" % now)
+      except ValueError as err:
+        log.warning("Error recording resource sample: %s" % err)
+
+      # Sleep until any of the following conditions are met:
+      # - it's time for the next disk collection
+      # - it's time for the next process collection
+      # - the result from the last disk collection is available via the DiskCollector
+      # - the TaskResourceMonitor has been killed via self._kill_signal
+      now = time.time()
+      next_collection = min(next_process_collection - now, next_disk_collection - now)
+      EventMuxer(self._kill_signal, self._disk_collector.completed_event
+                ).wait(timeout=max(0, next_collection))
+
+    log.debug('Stopping resource monitoring for task "%s"' % self._task_id)
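Putting the pieces together, a sketch of how an observer might wire this up; the checkpoint root, task id and sandbox path are placeholders, and the default collectors and intervals are used:

  from twitter.thermos.common.path import TaskPath

  task_monitor = TaskMonitor(TaskPath(root='/var/run/thermos'), 'some_task_id')
  resource_monitor = TaskResourceMonitor(task_monitor,
                                         sandbox='/var/lib/thermos/some_task_id/sandbox')
  resource_monitor.start()                  # daemon thread; begins periodic sampling
  timestamp, (num_procs, process_sample, disk_usage) = resource_monitor.sample()
  resource_monitor.kill()                   # stop collection when the task is torn down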

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/BUILD b/src/main/python/apache/thermos/observer/BUILD
new file mode 100644
index 0000000..00ea9cb
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/BUILD
@@ -0,0 +1,51 @@
+import os
+
+python_library(
+  name = 'observed_task',
+  sources = ['observed_task.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/thermos:pystachio'),
+    pants('src/main/python/twitter/thermos/common:ckpt'),
+    pants('src/main/python/twitter/thermos/config'),
+  ]
+)
+
+python_library(
+  name = 'task_observer',
+  sources = ['task_observer.py'],
+  dependencies = [
+    pants(':observed_task'),
+    pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    pants('src/main/python/twitter/thermos/common:path'),
+    pants('src/main/python/twitter/thermos/monitoring:detector'),
+    pants('src/main/python/twitter/thermos/monitoring:monitor'),
+    pants('src/main/python/twitter/thermos/monitoring:process'),
+    pants('src/main/python/twitter/thermos/monitoring:resource'),
+    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+  ]
+)
+
+python_library(
+  name = 'observer',
+  dependencies = [
+    pants(':task_observer'),
+    pants('src/main/python/twitter/thermos/observer/http:http_observer'),
+
+    # covering libraries
+    pants('src/main/python/twitter/thermos/common'),
+    pants('src/main/python/twitter/thermos/config'),
+    pants('src/main/python/twitter/thermos/monitoring'),
+  ],
+  provides = setup_py(
+    name = 'twitter.thermos.observer',
+    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+    description = 'The Thermos observer web interface.',
+  ).with_binaries(
+    thermos_observer = pants('src/main/python/twitter/thermos/observer/bin:thermos_observer'),
+  )
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/__init__.py b/src/main/python/apache/thermos/observer/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/BUILD b/src/main/python/apache/thermos/observer/bin/BUILD
new file mode 100644
index 0000000..26eb148
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/bin/BUILD
@@ -0,0 +1,14 @@
+python_binary(
+  name = 'thermos_observer',
+  source = 'thermos_observer.py',
+  entry_point = 'twitter.thermos.observer.bin.thermos_observer:proxy_main',
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/app'),
+    pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+    pants('aurora/twitterdeps/src/python/twitter/common/http'),
+    pants('src/main/python/twitter/thermos:cherrypy'),
+    pants('src/main/python/twitter/thermos/common:path'),
+    pants('src/main/python/twitter/thermos/observer/http:http_observer'),
+    pants('src/main/python/twitter/thermos/observer:task_observer'),
+  ],
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/__init__.py b/src/main/python/apache/thermos/observer/bin/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/bin/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/thermos_observer.py b/src/main/python/apache/thermos/observer/bin/thermos_observer.py
new file mode 100644
index 0000000..9e7f7a0
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/bin/thermos_observer.py
@@ -0,0 +1,55 @@
+from __future__ import print_function
+
+import socket
+import sys
+import time
+
+from twitter.common import app
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.http import HttpServer
+from twitter.common.http.diagnostics import DiagnosticsEndpoints
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.observer.task_observer import TaskObserver
+from twitter.thermos.observer.http.http_observer import BottleObserver
+
+
+app.add_option("--root",
+               dest="root",
+               metavar="DIR",
+               default=TaskPath.DEFAULT_CHECKPOINT_ROOT,
+               help="root checkpoint directory for thermos task runners")
+
+
+app.add_option("--port",
+               dest="port",
+               metavar="INT",
+               default=1338,
+               help="port number to listen on.")
+
+
+def proxy_main():
+  def main(args, opts):
+    if args:
+      print("ERROR: unrecognized arguments: %s\n" % (" ".join(args)), file=sys.stderr)
+      app.help()
+      sys.exit(1)
+
+    root_server = HttpServer()
+    root_server.mount_routes(DiagnosticsEndpoints())
+
+    task_observer = TaskObserver(opts.root)
+    task_observer.start()
+
+    bottle_wrapper = BottleObserver(task_observer)
+
+    root_server.mount_routes(bottle_wrapper)
+
+    def run():
+      root_server.run('0.0.0.0', opts.port, 'cherrypy')
+
+    et = ExceptionalThread(target=run)
+    et.daemon = True
+    et.start()
+    et.join()
+
+  app.main()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/BUILD b/src/main/python/apache/thermos/observer/http/BUILD
new file mode 100644
index 0000000..9f3d587
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/BUILD
@@ -0,0 +1,47 @@
+python_library(
+  name = 'json',
+  sources = ['json.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/http'),
+  ]
+)
+
+python_library(
+  name = 'static_assets',
+  sources = ['static_assets.py'],
+  resources = rglobs('assets/*'),
+  dependencies = [
+    pants('src/main/python/twitter/thermos:bottle'),
+  ]
+)
+
+python_library(
+  name = 'templating',
+  sources = ['templating.py'],
+  resources = globs('templates/*.tpl'),
+)
+
+python_library(
+  name = 'file_browser',
+  sources = ['file_browser.py'],
+  dependencies = [
+    pants(':templating'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/http'),
+    pants('src/main/python/twitter/thermos:bottle'),
+    pants('src/main/python/twitter/thermos:mako'),
+  ]
+)
+
+python_library(
+  name = 'http_observer',
+  sources = ['__init__.py', 'http_observer.py'],
+  dependencies = [
+    pants(':file_browser'),
+    pants(':json'),
+    pants(':static_assets'),
+    pants(':templating'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/http'),
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/__init__.py b/src/main/python/apache/thermos/observer/http/__init__.py
new file mode 100644
index 0000000..e69de29