You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:02 UTC

[09/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/core/runner.py b/src/main/python/twitter/thermos/core/runner.py
deleted file mode 100644
index b10cf16..0000000
--- a/src/main/python/twitter/thermos/core/runner.py
+++ /dev/null
@@ -1,905 +0,0 @@
-""" Thermos runner.
-
-This module contains the TaskRunner, the core component of Thermos responsible for actually running
-tasks. It also contains several Handlers which define the behaviour on state transitions within the
-TaskRunner.
-
-There are three "active" states in a running Thermos task:
-  ACTIVE
-  CLEANING
-  FINALIZING
-
-A task in ACTIVE state is running regular processes.  The moment this task succeeds or goes over its
-failure limit, it then goes into CLEANING state, where it begins the staged termination of leftover
-processes (with SIGTERMs).  Once all processes have terminated, the task goes into FINALIZING state,
-where the processes marked with the 'final' bit run.  Once the task has gone into CLEANING state, it
-has a deadline for going into terminal state.  If it doesn't make it in time (at any point, whether
-in CLEANING or FINALIZING state), it is forced into terminal state through SIGKILLs of all live
-processes (coordinators, shells and the full process trees rooted at the shells.)
-
-TaskRunner.kill is implemented by forcing the task into CLEANING state and setting its finalization
-deadline manually.  So in practice, we implement Task preemption by calling kill with the
-finalization deadline = now + preemption wait, which gives the Task an opportunity to do graceful
-shutdown.  If preemption_wait=0, it will result in immediate SIGKILLs and then transition to the
-terminal state.
-
-"""
-
-from contextlib import contextmanager
-import errno
-from functools import partial
-import os
-import socket
-import sys
-import time
-import traceback
-
-from twitter.common import log
-from twitter.common.dirutil import safe_mkdir
-from twitter.common.quantity import Amount, Time
-from twitter.common.recordio import ThriftRecordReader
-
-from twitter.thermos.common.ckpt import (
-  CheckpointDispatcher,
-  UniversalStateHandler,
-  ProcessStateHandler,
-  TaskStateHandler)
-from twitter.thermos.common.path import TaskPath
-from twitter.thermos.common.planner import TaskPlanner
-from twitter.thermos.config.loader import (
-  ThermosConfigLoader,
-  ThermosProcessWrapper,
-  ThermosTaskWrapper,
-  ThermosTaskValidator)
-from twitter.thermos.config.schema import ThermosContext
-
-from gen.twitter.thermos.ttypes import (
-  ProcessState,
-  ProcessStatus,
-  RunnerCkpt,
-  RunnerHeader,
-  RunnerState,
-  TaskState,
-  TaskStatus,
-)
-
-from .helper import TaskRunnerHelper
-from .muxer import ProcessMuxer
-from .process import Process
-
-from pystachio import Environment
-
-
-# TODO(wickman) Currently this is messy because of all the private access into ._runner.
-# Clean this up by giving the TaskRunnerProcessHandler the components it should own, and
-# create a legitimate API contract into the Runner.
-class TaskRunnerProcessHandler(ProcessStateHandler):
-  """
-    Checkpoint handler for process state transitions; keeps the runner's bookkeeping in step
-    with every process update that is dispatched to it.
-
-    Accesses these parts of the runner:
-
-      | _task_processes [array set, pop]
-      | _task_process_from_process_name [process name / sequence number => Process]
-      | _watcher [ProcessMuxer.register, unregister]
-      | _plan [add_success, add_failure, set_running, lost]
-      | _recovery [boolean, suppresses process kills while set]
-      | state [handed to TaskRunnerHelper.kill_process]
-  """
-
-  def __init__(self, runner):
-    self._runner = runner
-
-  def _forget(self, process_name):
-    # Drop the Process object for this process and unregister it from the watcher.
-    self._runner._task_processes.pop(process_name)
-    self._runner._watcher.unregister(process_name)
-
-  def _cleanup(self, process_update):
-    # While replaying a checkpoint (recovery) we must not side-effect on real processes.
-    if self._runner._recovery:
-      return
-    TaskRunnerHelper.kill_process(self._runner.state, process_update.process)
-
-  def _on_abnormal(self, process_update):
-    log.info('Process %s had an abnormal termination' % process_update.process)
-    self._forget(process_update.process)
-
-  def on_waiting(self, process_update):
-    log.debug('Process on_waiting %s' % process_update)
-    name = process_update.process
-    task_process = self._runner._task_process_from_process_name(name, process_update.seq + 1)
-    self._runner._task_processes[name] = task_process
-    self._runner._watcher.register(name, process_update.seq - 1)
-
-  def on_forked(self, process_update):
-    log.debug('Process on_forked %s' % process_update)
-    name = process_update.process
-    self._runner._task_processes[name].rebind(
-        process_update.coordinator_pid, process_update.fork_time)
-    self._runner._plan.set_running(name)
-
-  def on_running(self, process_update):
-    log.debug('Process on_running %s' % process_update)
-    self._runner._plan.set_running(process_update.process)
-
-  def on_success(self, process_update):
-    log.debug('Process on_success %s' % process_update)
-    name = process_update.process
-    log.info('Process(%s) finished successfully [rc=%s]' % (name, process_update.return_code))
-    self._cleanup(process_update)
-    self._forget(name)
-    self._runner._plan.add_success(name)
-
-  def on_failed(self, process_update):
-    log.debug('Process on_failed %s' % process_update)
-    name = process_update.process
-    log.info('Process(%s) failed [rc=%s]' % (name, process_update.return_code))
-    self._cleanup(process_update)
-    self._on_abnormal(process_update)
-    self._runner._plan.add_failure(name)
-    # The plan decides whether this failure exhausted the process's failure budget.
-    if name not in self._runner._plan.failed:
-      log.info('Process %s under maximum failure limit, restarting.' % name)
-    else:
-      log.info('Process %s reached maximum failures, marking process run failed.' % name)
-
-  def on_lost(self, process_update):
-    log.debug('Process on_lost %s' % process_update)
-    self._cleanup(process_update)
-    self._on_abnormal(process_update)
-    self._runner._plan.lost(process_update.process)
-
-  def on_killed(self, process_update):
-    log.debug('Process on_killed %s' % process_update)
-    self._cleanup(process_update)
-    self._forget(process_update.process)
-    log.debug('Process killed, marking it as a loss.')
-    self._runner._plan.lost(process_update.process)
-
-
-class TaskRunnerTaskHandler(TaskStateHandler):
-  """
-    Checkpoint handler for task state transitions.
-
-    Accesses these parts of the runner:
-      _plan [set to regular_plan or finalizing_plan]
-      _recovery [boolean, whether or not to side-effect]
-      _pathspec [path creation]
-      _task [ThermosTask]
-      _finalization_start [set on entry to CLEANING / FINALIZING]
-      _terminate_plan
-      _kill
-  """
-
-  def __init__(self, runner):
-    self._runner = runner
-    self._pathspec = self._runner._pathspec
-
-  def _cleanup(self):
-    # Shared terminal-state teardown; skipped entirely while replaying a checkpoint.
-    if self._runner._recovery:
-      return
-    self._runner._kill()
-    TaskRunnerHelper.finalize_task(self._pathspec)
-
-  def on_active(self, task_update):
-    log.debug('Task on_active(%s)' % task_update)
-    self._runner._plan = self._runner._regular_plan
-    if not self._runner._recovery:
-      TaskRunnerHelper.initialize_task(
-          self._pathspec,
-          ThermosTaskWrapper(self._runner._task).to_json())
-
-  def on_cleaning(self, task_update):
-    log.debug('Task on_cleaning(%s)' % task_update)
-    # The checkpoint timestamp is in milliseconds; the runner keeps seconds.
-    self._runner._finalization_start = task_update.timestamp_ms / 1000.0
-    self._runner._terminate_plan(self._runner._regular_plan)
-
-  def on_finalizing(self, task_update):
-    log.debug('Task on_finalizing(%s)' % task_update)
-    runner = self._runner
-    if not runner._recovery:
-      runner._kill()
-    runner._plan = runner._finalizing_plan
-    # Only stamp the start time if CLEANING did not already do so.
-    if runner._finalization_start is None:
-      runner._finalization_start = task_update.timestamp_ms / 1000.0
-
-  def on_killed(self, task_update):
-    log.debug('Task on_killed(%s)' % task_update)
-    self._cleanup()
-
-  def on_success(self, task_update):
-    log.debug('Task on_success(%s)' % task_update)
-    self._cleanup()
-    log.info('Task succeeded.')
-
-  def on_failed(self, task_update):
-    log.debug('Task on_failed(%s)' % task_update)
-    self._cleanup()
-
-  def on_lost(self, task_update):
-    log.debug('Task on_lost(%s)' % task_update)
-    self._cleanup()
-
-
-class TaskRunnerUniversalHandler(UniversalStateHandler):
-  """
-    Universal handler that writes every header, process transition and task transition of the
-    runner to its checkpoint stream.
-
-    Accesses these parts of the runner:
-      _ckpt_write
-      task [validated on initialization]
-  """
-
-  def __init__(self, runner):
-    self._runner = runner
-
-  def _checkpoint(self, record):
-    # Delegates to the runner, which drops the write while in recovery mode.
-    self._runner._ckpt_write(record)
-
-  def on_initialization(self, header):
-    log.debug('_on_initialization: %s' % header)
-    task = self._runner.task
-    ThermosTaskValidator.assert_valid_task(task)
-    ThermosTaskValidator.assert_valid_ports(self._runner.task, header.ports)
-    record = RunnerCkpt(runner_header=header)
-    self._checkpoint(record)
-
-  def on_process_transition(self, state, process_update):
-    log.debug('_on_process_transition: %s' % process_update)
-    record = RunnerCkpt(process_status=process_update)
-    self._checkpoint(record)
-
-  def on_task_transition(self, state, task_update):
-    log.debug('_on_task_transition: %s' % task_update)
-    record = RunnerCkpt(task_status=task_update)
-    self._checkpoint(record)
-
-
-class TaskRunnerStage(object):
-  """
-    Base class for one stage of the task runner pipeline.  Subclasses override run() to do
-    the stage's work and transition_to() to name the state that follows it.
-  """
-  # Upper bound on the wait a stage asks for between iterations.
-  MAX_ITERATION_WAIT = Amount(1, Time.SECONDS)
-
-  def __init__(self, runner):
-    self.runner = runner
-    self.clock = runner._clock
-
-  def run(self):
-    """
-      Perform any work necessary at this stage of the task.
-
-      Returns None when there is no more work to be done [this will invoke a state
-      transition], otherwise the number of seconds from now after which the stage would like
-      to be called again to re-run the plan.
-
-      The base implementation has no work to do and so always returns None.
-    """
-    return
-
-  def transition_to(self):
-    """
-      The stage to which we should transition.  Must be provided by subclasses.
-    """
-    raise NotImplementedError
-
-
-class TaskRunnerStage_ACTIVE(TaskRunnerStage):
-  """
-    Run the regular plan (i.e. normal, non-finalizing processes.)
-  """
-  # Bounds, in seconds, on how long the runner waits before re-running this stage.
-  MAX_ITERATION_WAIT = Amount(15, Time.SECONDS)
-  MIN_ITERATION_WAIT = Amount(1, Time.SECONDS)
-
-  def run(self):
-    """
-      Schedule the regular plan once.  Returns None when the plan is complete or unhealthy,
-      otherwise the number of seconds to wait before the next pass.
-    """
-    runner = self.runner
-    launched = runner._run_plan(runner._regular_plan)
-
-    # Have we terminated?
-    if runner._regular_plan.is_complete():
-      log.info('Regular plan complete.')
-      if runner.is_healthy():
-        terminal_state = TaskState.SUCCESS
-      else:
-        terminal_state = TaskState.FAILED
-    elif runner.is_healthy():
-      terminal_state = None
-    else:
-      log.error('Regular plan unhealthy!')
-      terminal_state = TaskState.FAILED
-
-    if terminal_state:
-      # No more work to do
-      return None
-
-    if launched > 0:
-      # We want to run ASAP after updates have been collected
-      return max(self.MIN_ITERATION_WAIT.as_(Time.SECONDS), runner._regular_plan.min_wait())
-
-    # We want to run as soon as something is available to run or after a prescribed timeout.
-    return min(self.MAX_ITERATION_WAIT.as_(Time.SECONDS), runner._regular_plan.min_wait())
-
-  def transition_to(self):
-    return TaskState.CLEANING
-
-
-class TaskRunnerStage_CLEANING(TaskRunnerStage):
-  """
-    Start the cleanup of the regular plan (e.g. if it failed.)  On ACTIVE -> CLEANING,
-    we send SIGTERMs to all still-running processes.  We wait at most finalization_wait
-    for all processes to complete before SIGKILLs are sent.  If everything exits cleanly
-    prior to that point in time, we transition to FINALIZING, which kicks into gear
-    the finalization schedule (if any.)
-  """
-  def run(self):
-    """
-      Keep waiting while there is finalization time left and processes are still running;
-      return None (triggering a transition) as soon as either stops being true.
-    """
-    log.debug('TaskRunnerStage[CLEANING]: Finalization remaining: %s' %
-        self.runner._finalization_remaining())
-    if self.runner._finalization_remaining() <= 0 or not self.runner.has_running_processes():
-      return None
-    return min(self.runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))
-
-  def transition_to(self):
-    # With time left we proceed to the finalizing plan; otherwise go straight to terminal.
-    if self.runner._finalization_remaining() > 0:
-      return TaskState.FINALIZING
-    log.info('Exceeded finalization wait, skipping finalization.')
-    return self.runner.terminal_state()
-
-
-class TaskRunnerStage_FINALIZING(TaskRunnerStage):
-  """
-    Run the finalizing plan, specifically the plan of tasks with the 'final'
-    bit marked (e.g. log savers, checkpointers and the like.)  Anything in this
-    plan will be SIGKILLed if we go over the finalization_wait.
-  """
-
-  def run(self):
-    """
-      Schedule the finalizing plan once.  Returns None when the plan is deadlocked, complete,
-      or out of finalization time; otherwise the number of seconds until the next pass.
-    """
-    runner = self.runner
-    runner._run_plan(runner._finalizing_plan)
-    log.debug('TaskRunnerStage[FINALIZING]: Finalization remaining: %s' %
-        runner._finalization_remaining())
-    if runner.deadlocked(runner._finalizing_plan):
-      log.warning('Finalizing plan deadlocked.')
-      return None
-    if runner._finalization_remaining() <= 0 or runner._finalizing_plan.is_complete():
-      return None
-    return min(runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))
-
-  def transition_to(self):
-    # The destination is the terminal state either way; the log records a timeout.
-    if self.runner._finalization_remaining() <= 0:
-      log.info('Exceeded finalization wait, terminating finalization.')
-    return self.runner.terminal_state()
-
-
-class TaskRunner(object):
-  """
-    Run a ThermosTask.
-
-    This class encapsulates the core logic to run and control the state of a Thermos task.
-    Typically, it will be instantiated directly to control a new task, but a TaskRunner can also be
-    synthesised from an existing task's checkpoint root
-  """
-  class Error(Exception): pass
-  class InvalidTask(Error): pass
-  class InternalError(Error): pass
-  class PermissionError(Error): pass
-  class StateError(Error): pass
-
-  # Maximum amount of time we spend waiting for new updates from the checkpoint streams
-  # before doing housecleaning (checking for LOST tasks, dead PIDs.)
-  MAX_ITERATION_TIME = Amount(10, Time.SECONDS)
-
-  # Minimum amount of time we wait between polls for updates on coordinator checkpoints.
-  COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.SECONDS)
-
-  # Amount of time we're willing to wait after forking before we expect the runner to have
-  # exec'ed the child process.
-  LOST_TIMEOUT = Amount(60, Time.SECONDS)
-
-  # Active task stages
-  STAGES = {
-    TaskState.ACTIVE: TaskRunnerStage_ACTIVE,
-    TaskState.CLEANING: TaskRunnerStage_CLEANING,
-    TaskState.FINALIZING: TaskRunnerStage_FINALIZING
-  }
-
-  @classmethod
-  def get(cls, task_id, checkpoint_root):
-    """
-      Get a TaskRunner bound to the task_id in checkpoint_root.
-
-      Returns None if the task's JSON file does not exist, loads to nothing, contains no
-      tasks, has no checkpoint header, or if reconstituting the runner raises.
-    """
-    task_path = TaskPath(root=checkpoint_root, task_id=task_id, state='active')
-    task_json = task_path.getpath('task_path')
-    task_checkpoint = task_path.getpath('runner_checkpoint')
-    if not os.path.exists(task_json):
-      return None
-    task = ThermosConfigLoader.load_json(task_json)
-    if task is None or len(task.tasks()) == 0:
-      return None
-    try:
-      checkpoint = CheckpointDispatcher.from_file(task_checkpoint)
-      if checkpoint is None or checkpoint.header is None:
-        return None
-      # Rebuild the runner from the parameters recorded in the checkpoint header.
-      header = checkpoint.header
-      return cls(
-          task.tasks()[0].task(),
-          checkpoint_root,
-          header.sandbox,
-          log_dir=header.log_dir,
-          task_id=task_id,
-          portmap=header.ports)
-    except Exception as e:
-      log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s' % e, exc_info=True)
-      return None
-
-  def __init__(self, task, checkpoint_root, sandbox, log_dir=None,
-               task_id=None, portmap=None, user=None, chroot=False, clock=time,
-               universal_handler=None, planner_class=TaskPlanner):
-    """
-      Validate and interpolate the task, build its plans and handlers, then replay any
-      existing runner checkpoint in recovery mode.
-
-      required:
-        task (config.Task) = the task to run
-        checkpoint_root (path) = the checkpoint root
-        sandbox (path) = the sandbox in which the path will be run
-                         [if None, cwd will be assumed, but garbage collection will be
-                          disabled for this task.]
-
-      optional:
-        log_dir (string)  = directory to house stdout/stderr logs. If not specified, logs will be
-                            written into the sandbox directory under .logs/
-        task_id (string)  = bind to this task id.  if not specified, will synthesize an id based
-                            upon task.name()
-        portmap (dict)    = a map (string => integer) from name to port, e.g. { 'http': 80 }
-        user (string)     = the user to run the task as.  if not current user, requires setuid
-                            privileges.
-        chroot (boolean)  = whether or not to chroot into the sandbox prior to exec.
-        clock (time interface) = the clock to use throughout
-        universal_handler = checkpoint record handler (only used for testing)
-        planner_class (TaskPlanner class) = TaskPlanner class to use for constructing the task
-                            planning policy.
-
-      raises:
-        TypeError   = planner_class is not a TaskPlanner subclass
-        ValueError  = a different user was requested but the effective uid is not 0
-        InvalidTask = the task fails validation or cannot be fully interpolated
-    """
-    if not issubclass(planner_class, TaskPlanner):
-      raise TypeError('planner_class must be a TaskPlanner.')
-    self._clock = clock
-    launch_time = self._clock.time()
-    if task_id:
-      self._task_id = task_id
-    else:
-      # Synthesize "<name>-<YYYYmmdd-HHMMSS>.<microseconds>" from the launch time.
-      launch_micros = '%06d' % int((launch_time - int(launch_time)) * 10**6)
-      launch_stamp = time.strftime('%Y%m%d-%H%M%S', time.localtime(launch_time))
-      self._task_id = '%s-%s.%s' % (task.name(), launch_stamp, launch_micros)
-    current_user = TaskRunnerHelper.get_actual_user()
-    self._user = user or current_user
-    # TODO(wickman) This should be delegated to the ProcessPlatform / Helper
-    if self._user != current_user and os.geteuid() != 0:
-      raise ValueError('task specifies user as %s, but %s does not have setuid permission!' % (
-        self._user, current_user))
-    self._portmap = portmap or {}
-    self._launch_time = launch_time
-    self._log_dir = log_dir or os.path.join(sandbox, '.logs')
-    self._pathspec = TaskPath(root=checkpoint_root, task_id=self._task_id, log_dir=self._log_dir)
-
-    # Validate the raw task before interpolating the thermos context into it.
-    try:
-      ThermosTaskValidator.assert_valid_task(task)
-      ThermosTaskValidator.assert_valid_ports(task, self._portmap)
-    except ThermosTaskValidator.InvalidTaskError as e:
-      raise self.InvalidTask('Invalid task: %s' % e)
-    thermos_context = ThermosContext(
-        task_id=self._task_id,
-        ports=self._portmap,
-        user=self._user)
-    self._task, uninterp = (task % Environment(thermos=thermos_context)).interpolate()
-    if len(uninterp) > 0:
-      missing = ', '.join(str(ref) for ref in uninterp)
-      raise self.InvalidTask('Failed to interpolate task, missing: %s' % missing)
-    try:
-      ThermosTaskValidator.assert_same_task(self._pathspec, self._task)
-    except ThermosTaskValidator.InvalidTaskError as e:
-      raise self.InvalidTask('Invalid task: %s' % e)
-
-    # Two plans over the same task, split on each process's 'final' bit.
-    self._plan = None # plan currently being executed (updated by Handlers)
-    self._regular_plan = planner_class(
-        self._task,
-        clock=clock,
-        process_filter=lambda proc: proc.final().get() == False)
-    self._finalizing_plan = planner_class(
-        self._task,
-        clock=clock,
-        process_filter=lambda proc: proc.final().get() == True)
-    self._chroot = chroot
-    self._sandbox = sandbox
-    self._terminal_state = None
-    self._ckpt = None
-    self._process_map = dict((p.name().get(), p) for p in self._task.processes())
-    self._task_processes = {}
-    self._stages = dict((state, stage(self)) for state, stage in self.STAGES.items())
-    self._finalization_start = None
-    self._preemption_deadline = None
-    self._watcher = ProcessMuxer(self._pathspec)
-    self._state = RunnerState(processes={})
-
-    # create runner state
-    handler_class = universal_handler or TaskRunnerUniversalHandler
-    self._dispatcher = CheckpointDispatcher()
-    self._dispatcher.register_handler(handler_class(self))
-    self._dispatcher.register_handler(TaskRunnerProcessHandler(self))
-    self._dispatcher.register_handler(TaskRunnerTaskHandler(self))
-
-    # recover checkpointed runner state and update plan
-    self._recovery = True
-    self._replay_runner_ckpt()
-
-  @property
-  def task(self):
-    """The interpolated task this runner was constructed with."""
-    return self._task
-
-  @property
-  def task_id(self):
-    """The task id this runner is bound to (either supplied or synthesized at construction)."""
-    return self._task_id
-
-  @property
-  def state(self):
-    """The RunnerState that checkpoint records are dispatched against."""
-    return self._state
-
-  @property
-  def processes(self):
-    """Map from process name to the Process object currently tracked for it."""
-    return self._task_processes
-
-  def task_state(self):
-    """Return the most recently recorded task state, or ACTIVE if none has been recorded."""
-    statuses = self._state.statuses
-    if statuses:
-      return statuses[-1].state
-    return TaskState.ACTIVE
-
-  def close_ckpt(self):
-    """Force close the checkpoint stream.  This is necessary for runners terminated through
-       exception propagation.
-
-       Does nothing beyond logging if no checkpoint stream has been opened yet (self._ckpt is
-       still None, as set by the constructor), rather than failing on None.close()."""
-    log.debug('Closing the checkpoint stream.')
-    if self._ckpt is not None:
-      self._ckpt.close()
-
-  @contextmanager
-  def control(self, force=False):
-    """
-      Bind to the checkpoint associated with this task, position to the end of the log if
-      it exists, or create it if it doesn't.  Fails if we cannot get "leadership" i.e. a
-      file lock on the checkpoint stream.
-
-      Raises StateError if the task is already terminal, and PermissionError if the
-      checkpoint cannot be opened.  An Exception raised by the managed body is logged and
-      swallowed, after which the checkpoint stream is closed.
-    """
-    if self.is_terminal():
-      raise TaskRunner.StateError('Cannot take control of a task in terminal state.')
-    if self._sandbox:
-      safe_mkdir(self._sandbox)
-    checkpoint_path = self._pathspec.getpath('runner_checkpoint')
-    try:
-      self._ckpt = TaskRunnerHelper.open_checkpoint(
-          checkpoint_path, force=force, state=self._state)
-    except TaskRunnerHelper.PermissionError:
-      raise TaskRunner.PermissionError('Unable to open checkpoint %s' % checkpoint_path)
-    # From here on, dispatched records are written to the checkpoint stream.
-    log.debug('Flipping recovery mode off.')
-    self._recovery = False
-    self._set_task_status(self.task_state())
-    self._resume_task()
-    try:
-      yield
-    except Exception as exc:
-      log.error('Caught exception in self.control(): %s' % exc)
-      log.error('  %s' % traceback.format_exc())
-    self._ckpt.close()
-
-  def _resume_task(self):
-    """
-      Bring the runner up to date once the checkpoint stream is open: replay the process
-      checkpoints, write the header if needed, then apply the updates that were held back.
-      Raises StateError if the replay shows the task to be terminal.
-    """
-    assert self._ckpt is not None
-    pending_updates = self._replay_process_ckpts()
-    if self.is_terminal():
-      raise self.StateError('Cannot resume terminal task.')
-    self._initialize_ckpt_header()
-    self._replay(pending_updates)
-
-  def _ckpt_write(self, record):
-    """
-      Append a record to the checkpoint stream; a no-op while in recovery mode.
-    """
-    if self._recovery:
-      return
-    self._ckpt.write(record)
-
-  def _replay(self, checkpoints):
-    """
-      Dispatch each RunnerCkpt in the given sequence, in order, against the runner state.
-    """
-    for record in checkpoints:
-      self._dispatcher.dispatch(self._state, record)
-
-  def _replay_runner_ckpt(self):
-    """
-      Replay the checkpoint stream associated with this task, dispatching every record in
-      recovery mode.  Does nothing if the checkpoint file does not exist.
-
-      The file and the record reader are closed even if dispatching a record raises.
-    """
-    ckpt_file = self._pathspec.getpath('runner_checkpoint')
-    if not os.path.exists(ckpt_file):
-      return
-    # NOTE(review): the stream holds Thrift records but is opened in text mode ("r") as
-    # before; confirm whether "rb" is acceptable to ThriftRecordReader before changing it.
-    with open(ckpt_file, "r") as fp:
-      ckpt_recover = ThriftRecordReader(fp, RunnerCkpt)
-      try:
-        for record in ckpt_recover:
-          log.debug('Replaying runner checkpoint record: %s' % record)
-          self._dispatcher.dispatch(self._state, record, recovery=True)
-      finally:
-        ckpt_recover.close()
-
-  def _replay_process_ckpts(self):
-    """
-      Replay the unmutating process checkpoints.  Return the unapplied process updates that
-      would mutate the runner checkpoint stream.
-    """
-    deferred = []
-    for update in self._watcher.select():
-      if not self._dispatcher.would_update(self._state, update):
-        # Already reflected in the runner state: replay it without side effects.
-        self._dispatcher.dispatch(self._state, update, recovery=True)
-        continue
-      deferred.append(update)
-    return deferred
-
-  def _initialize_ckpt_header(self):
-    """
-      Build and dispatch the RunnerHeader for this checkpoint stream, unless the runner
-      state already has one.
-    """
-    if self._state.header is not None:
-      return
-    header_fields = dict(
-      task_id=self._task_id,
-      launch_time_ms=int(self._launch_time*1000),
-      sandbox=self._sandbox,
-      log_dir=self._log_dir,
-      hostname=socket.gethostname(),
-      user=self._user,
-      ports=self._portmap)
-    record = RunnerCkpt(runner_header=RunnerHeader(**header_fields))
-    self._dispatcher.dispatch(self._state, record)
-
-  def _set_task_status(self, state):
-    """
-      Dispatch a TaskStatus record for the given state, stamped with the current clock time
-      (in milliseconds) and this process's pid and uid.
-    """
-    now_ms = int(self._clock.time() * 1000)
-    status = TaskStatus(
-        state=state,
-        timestamp_ms=now_ms,
-        runner_pid=os.getpid(),
-        runner_uid=os.getuid())
-    self._dispatcher.dispatch(self._state, RunnerCkpt(task_status=status), self._recovery)
-
-  def _finalization_remaining(self):
-    """
-      Return the number of seconds left before finalization must end, never below zero.
-      Returns sys.float_info.max when no deadline applies yet.
-    """
-    # If a preemption deadline has been set, use that.
-    if self._preemption_deadline:
-      return max(0, self._preemption_deadline - self._clock.time())
-
-    # Otherwise, use the finalization wait provided in the configuration.
-    allowance = self.task.finalization_wait().get()
-    if self._finalization_start is None:
-      # Finalization has not started, so there is effectively no limit.
-      return sys.float_info.max
-    elapsed = max(0, self._clock.time() - self._finalization_start)
-    return max(0, allowance - elapsed)
-
-  def _set_process_status(self, process_name, process_state, **kw):
-    """
-      Dispatch a ProcessStatus record for process_name.  An explicit sequence_number keyword
-      is used verbatim; otherwise the next number after the current run is used (0 for a
-      process with no runs, which must be entering WAITING).  Remaining keywords are passed
-      through to ProcessStatus.
-    """
-    state_name = ProcessState._VALUES_TO_NAMES.get(process_state)
-    if 'sequence_number' not in kw:
-      current_run = self._current_process_run(process_name)
-      if current_run:
-        sequence_number = current_run.seq + 1
-      else:
-        assert process_state == ProcessState.WAITING
-        sequence_number = 0
-      log.debug('_set_process_status(%s <= %s, seq=%s[auto])' % (
-        process_name, state_name, sequence_number))
-    else:
-      sequence_number = kw.pop('sequence_number')
-      log.debug('_set_process_status(%s <= %s, seq=%s[force])' % (
-        process_name, state_name, sequence_number))
-    status = ProcessStatus(
-      process=process_name, state=process_state, seq=sequence_number, **kw)
-    self._dispatcher.dispatch(self._state, RunnerCkpt(process_status=status), self._recovery)
-
-  def _task_process_from_process_name(self, process_name, sequence_number):
-    """
-      Construct a Process() object from a process_name, populated with its
-      correct run number and fully interpolated commandline.
-
-      Raises InternalError if the task defines no process with that name.
-    """
-    # The run number is the index of the latest recorded run of this process.
-    run_number = len(self.state.processes[process_name]) - 1
-    pathspec = self._pathspec.given(process=process_name, run=run_number)
-    process = self._process_map.get(process_name)
-    if process is None:
-      raise self.InternalError('FATAL: Could not find process: %s' % process_name)
-
-    def close_ckpt_and_fork():
-      # In the child (fork returned 0), close the checkpoint stream if one is open.
-      child_pid = os.fork()
-      if child_pid == 0 and self._ckpt is not None:
-        self._ckpt.close()
-      return child_pid
-
-    name = process.name().get()
-    cmdline = process.cmdline().get()
-    return Process(
-      name,
-      cmdline,
-      sequence_number,
-      pathspec,
-      self._sandbox,
-      self._user,
-      chroot=self._chroot,
-      fork=close_ckpt_and_fork)
-
-  def deadlocked(self, plan=None):
-    """Check whether a plan is deadlocked, i.e. there are no running/runnable/waiting
-    processes, and the plan is not complete.  Defaults to the regular plan."""
-    plan = plan or self._regular_plan
-    now = self._clock.time()
-    running = list(plan.running)
-    runnable = list(plan.runnable_at(now))
-    waiting = list(plan.waiting_at(now))
-    log.debug('running:%d runnable:%d waiting:%d complete:%s' % (
-      len(running), len(runnable), len(waiting), plan.is_complete()))
-    if running or runnable or waiting:
-      return False
-    return not plan.is_complete()
-
-  def is_healthy(self):
-    """Check whether the TaskRunner is healthy. A healthy TaskRunner is not deadlocked and has not
-    reached its max_failures count (a max_failures of 0 means no limit)."""
-    max_failures = self._task.max_failures().get()
-    deadlocked = self.deadlocked()
-    num_failed = len(self._regular_plan.failed)
-    under_failure_limit = max_failures == 0 or num_failed < max_failures
-    healthy = not deadlocked and under_failure_limit
-    log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s' % (
-      max_failures, num_failed, under_failure_limit, deadlocked, healthy))
-    return healthy
-
-  def _current_process_run(self, process_name):
-    """Return the latest recorded run of process_name, or None if it has no runs."""
-    history = self._state.processes.get(process_name, ())
-    if len(history) == 0:
-      return None
-    return history[-1]
-
-  def is_process_lost(self, process_name):
-    """Determine whether the current run of a process should be considered LOST.
-
-    A run is considered lost when either:
-      - it is FORKED and more than LOST_TIMEOUT has passed since its fork_time, or
-      - it is RUNNING, TaskRunnerHelper.scan_process reports no coordinator pid for it, and
-        the watcher has no data for it.
-
-    This method only reports the condition; marking the process LOST is left to the caller.
-
-    Raises InternalError if the process has no recorded run."""
-    current_run = self._current_process_run(process_name)
-    if not current_run:
-      raise self.InternalError('No current_run for process %s!' % process_name)
-
-    def forked_but_never_came_up():
-      return current_run.state == ProcessState.FORKED and (
-        self._clock.time() - current_run.fork_time > TaskRunner.LOST_TIMEOUT.as_(Time.SECONDS))
-
-    def running_but_coordinator_died():
-      if current_run.state != ProcessState.RUNNING:
-        return False
-      coordinator_pid, _, _ = TaskRunnerHelper.scan_process(self.state, process_name)
-      if coordinator_pid is not None:
-        return False
-      elif self._watcher.has_data(process_name):
-        return False
-      return True
-
-    # Evaluate each predicate exactly once so that the values logged below are the very
-    # values the decision was based on (re-running them could observe a different clock
-    # reading or process table).
-    never_came_up = forked_but_never_came_up()
-    coordinator_died = running_but_coordinator_died()
-
-    if never_came_up or coordinator_died:
-      log.info('Detected a LOST task: %s' % current_run)
-      log.debug('  forked_but_never_came_up: %s' % never_came_up)
-      log.debug('  running_but_coordinator_died: %s' % coordinator_died)
-      return True
-
-    return False
-
-  def _run_plan(self, plan):
-    """
-      Run one scheduling pass over the plan: mark lost processes, then fork the runnable
-      ones permitted by the task's max_concurrency (0 means unlimited).  Returns True if at
-      least one process was started.
-    """
-    log.debug('Schedule pass:')
-
-    running = list(plan.running)
-    log.debug('running: %s' % ' '.join(plan.running))
-    log.debug('finished: %s' % ' '.join(plan.finished))
-
-    for process_name in plan.running:
-      if self.is_process_lost(process_name):
-        self._set_process_status(process_name, ProcessState.LOST)
-
-    now = self._clock.time()
-    runnable = list(plan.runnable_at(now))
-    waiting = list(plan.waiting_at(now))
-    log.debug('runnable: %s' % ' '.join(runnable))
-    log.debug('waiting: %s' % ' '.join(
-        '%s[T-%.1fs]' % (process, plan.get_wait(process)) for process in waiting))
-
-    # Concurrency is budgeted against the processes running at the start of the pass.
-    max_concurrency = self._task.max_concurrency().get()
-    if max_concurrency == 0:
-      candidates = runnable
-    else:
-      candidates = runnable[:max(max_concurrency - len(running), 0)]
-
-    num_launched = 0
-    for process_name in candidates:
-      tp = self._task_processes.get(process_name)
-      if not tp:
-        # Dispatching WAITING makes the process handler create the Process object.
-        self._set_process_status(process_name, ProcessState.WAITING)
-        tp = self._task_processes[process_name]
-      else:
-        current_run = self._current_process_run(process_name)
-        assert current_run.state == ProcessState.WAITING
-      log.info('Forking Process(%s)' % process_name)
-      tp.start()
-      num_launched += 1
-
-    return num_launched > 0
-
-  def _terminate_plan(self, plan):
-    """Ask TaskRunnerHelper to terminate every running process of the plan whose latest run
-    is FORKED or RUNNING."""
-    live_states = (ProcessState.FORKED, ProcessState.RUNNING)
-    for process in plan.running:
-      last_run = self._current_process_run(process)
-      if not last_run or last_run.state not in live_states:
-        continue
-      TaskRunnerHelper.terminate_process(self.state, process)
-
-  def has_running_processes(self):
-    """
-      Returns True if any processes associated with this task have active pids.
-    """
-    for process_set in TaskRunnerHelper.scantree(self.state).values():
-      if any(process_set):
-        return True
-    return False
-
-  def has_active_processes(self):
-    """
-      Returns True if the current run of any process is in a non-terminal state.  Processes
-      without a current run are ignored.
-    """
-    for name in self.state.processes:
-      latest = self._current_process_run(name)
-      if latest and not TaskRunnerHelper.is_process_terminal(latest.state):
-        return True
-    return False
-
-  def collect_updates(self, timeout=None):
-    """
-      Collects and applies updates from process checkpoint streams.  Returns the number
-      of applied process checkpoints.
-
-      timeout is the approximate number of seconds to keep polling when no updates are
-      available (measured as the sum of the sleep intervals).  None means poll until at least
-      one update arrives; 0 means poll exactly once.  Returns 0 immediately if there are no
-      active processes, and 0 if the timeout elapses without updates.
-    """
-    if not self.has_active_processes():
-      return 0
-    sleep_interval = self.COORDINATOR_INTERVAL_SLEEP.as_(Time.SECONDS)
-    total_time = 0.0
-    while True:
-      process_updates = self._watcher.select()
-      for process_update in process_updates:
-        self._dispatcher.dispatch(self._state, process_update, self._recovery)
-      if process_updates:
-        return len(process_updates)
-      # Compare against None explicitly: a timeout of 0 is falsy, and a plain truthiness
-      # test would silently turn "do not wait" into "wait without bound".
-      if timeout is not None and total_time >= timeout:
-        return 0
-      total_time += sleep_interval
-      self._clock.sleep(sleep_interval)
-
-  def is_terminal(self):
-    """ Returns whether TaskRunnerHelper considers the current task state terminal. """
-    current_state = self.task_state()
-    return TaskRunnerHelper.is_task_terminal(current_state)
-
-  def terminal_state(self):
-    """
-      Returns the terminal TaskState for this task: the forced terminal state if one has been
-      set, otherwise SUCCESS when the task is healthy and FAILED when it is not.
-    """
-    forced = self._terminal_state
-    if not forced:
-      if self.is_healthy():
-        return TaskState.SUCCESS
-      return TaskState.FAILED
-    log.debug('Forced terminal state: %s' %
-        TaskState._VALUES_TO_NAMES.get(forced, 'UNKNOWN'))
-    return forced
-
-  def run(self, force=False):
-    """
-      Entrypoint to runner.  Does nothing if the task is already terminal; otherwise takes
-      control of the checkpoint stream (see self.control) and runs the main loop until the
-      runner is terminal.
-    """
-    if not self.is_terminal():
-      with self.control(force):
-        self._run()
-
-  def _run(self):
-    """
-      Main run loop.  Until the task is terminal: run the stage registered for the current
-      task state, collect process checkpoint updates for at most the wait the stage returned,
-      and reap child processes.
-    """
-    # NOTE(review): iteration_time is not referenced again within this method.
-    iteration_time = self.MAX_ITERATION_TIME.as_(Time.SECONDS)
-    while not self.is_terminal():
-      start = self._clock.time()
-      # step 1: execute stage corresponding to the state we're currently in
-      runner = self._stages[self.task_state()]
-      iteration_wait = runner.run()
-      if iteration_wait is None:
-        # A stage returning None has nothing left to do: move to the state it names and
-        # restart the loop without collecting updates.
-        log.debug('Run loop: No more work to be done in state %s' %
-            TaskState._VALUES_TO_NAMES.get(self.task_state(), 'UNKNOWN'))
-        self._set_task_status(runner.transition_to())
-        continue
-      log.debug('Run loop: Work to be done within %.1fs' % iteration_wait)
-      # step 2: check child process checkpoint streams for updates
-      if not self.collect_updates(iteration_wait):
-        # If we don't collect any updates, at least 'touch' the checkpoint stream
-        # so as to prevent garbage collection.
-        elapsed = self._clock.time() - start
-        if elapsed < iteration_wait:
-          # collect_updates returned early; idle for the remainder of the requested wait.
-          log.debug('Update collection only took %.1fs, idling %.1fs' % (
-              elapsed, iteration_wait - elapsed))
-          self._clock.sleep(iteration_wait - elapsed)
-        log.debug('Run loop: No updates collected, touching checkpoint.')
-        os.utime(self._pathspec.getpath('runner_checkpoint'), None)
-      # step 3: reap any zombie child processes
-      TaskRunnerHelper.reap_children()
-
-  def kill(self, force=False, terminal_status=TaskState.KILLED,
-           preemption_wait=Amount(1, Time.MINUTES)):
-    """
-      Kill all processes associated with this task and set task/process states as terminal_status
-      (defaults to KILLED).
-
-      terminal_status must be TaskState.KILLED or TaskState.LOST.  preemption_wait is added to
-      the current clock time to form the preemption deadline.  If the task is already terminal
-      a warning is logged and nothing else happens; otherwise an ACTIVE task is moved to
-      CLEANING and the run loop is executed.
-    """
-    log.debug('Runner issued kill: force:%s, preemption_wait:%s' % (
-      force, preemption_wait))
-    assert terminal_status in (TaskState.KILLED, TaskState.LOST)
-    kill_requested_at = self._clock.time()
-    self._preemption_deadline = kill_requested_at + preemption_wait.as_(Time.SECONDS)
-    with self.control(force):
-      if not self.is_terminal():
-        self._terminal_state = terminal_status
-        if self.task_state() == TaskState.ACTIVE:
-          self._set_task_status(TaskState.CLEANING)
-        self._run()
-      else:
-        log.warning('Task is not in ACTIVE state, cannot issue kill.')
-
-  def lose(self, force=False):
-    """
-      Mark a task as LOST and kill any straggling processes, using a zero-second
-      preemption wait.
-    """
-    no_wait = Amount(0, Time.SECONDS)
-    self.kill(force, terminal_status=TaskState.LOST, preemption_wait=no_wait)
-
-  def _kill(self):
-    """
-      Walk every process reported by TaskRunnerHelper.scantree and force it to a terminal
-      condition:
-        - a process whose current run is already terminal is passed to
-          TaskRunnerHelper.kill_process (with a warning first if any pids remain);
-        - a non-terminal process with any remaining pid is transitioned to KILLED;
-        - a non-terminal process with no pids is transitioned to LOST, unless it is WAITING.
-    """
-    processes = TaskRunnerHelper.scantree(self._state)
-    for process, pid_tuple in processes.items():
-      # NOTE(review): current_run is dereferenced without a None check here, unlike in
-      # _terminate_plan -- confirm _current_process_run cannot return None for these names.
-      current_run = self._current_process_run(process)
-      coordinator_pid, pid, tree = pid_tuple
-      if TaskRunnerHelper.is_process_terminal(current_run.state):
-        if coordinator_pid or pid or tree:
-          log.warning('Terminal process (%s) still has running pids:' % process)
-          log.warning('  coordinator_pid: %s' % coordinator_pid)
-          log.warning('              pid: %s' % pid)
-          log.warning('             tree: %s' % tree)
-        TaskRunnerHelper.kill_process(self.state, process)
-      else:
-        if coordinator_pid or pid or tree:
-          # kill_process is not called on this path; presumably the KILLED status transition
-          # triggers the actual kill -- verify against the process status handlers.
-          log.info('Transitioning %s to KILLED' % process)
-          self._set_process_status(process, ProcessState.KILLED,
-            stop_time=self._clock.time(), return_code=-1)
-        else:
-          # The message below is logged even when the process is WAITING and therefore
-          # left untouched.
-          log.info('Transitioning %s to LOST' % process)
-          if current_run.state != ProcessState.WAITING:
-            self._set_process_status(process, ProcessState.LOST)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/BUILD b/src/main/python/twitter/thermos/monitoring/BUILD
deleted file mode 100644
index a977878..0000000
--- a/src/main/python/twitter/thermos/monitoring/BUILD
+++ /dev/null
@@ -1,97 +0,0 @@
-import os
-
-# Library built from detector.py.
-python_library(
-  name = 'detector',
-  sources = ['detector.py'],
-  dependencies = [
-    pants('src/main/python/twitter/thermos/common:path')
-  ]
-)
-
-# Library built from garbage.py; depends on the local ':detector' target.
-python_library(
-  name = 'garbage',
-  sources = ['garbage.py'],
-  dependencies = [
-    pants(':detector'),
-    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    pants('src/main/python/twitter/thermos/common:ckpt'),
-    pants('src/main/python/twitter/thermos/common:path'),
-  ]
-)
-
-# Library built from monitor.py; depends on the thermos thrift bindings.
-python_library(
-  name = 'monitor',
-  sources = ['monitor.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
-    pants('src/main/python/twitter/thermos/common:ckpt'),
-    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
-  ]
-)
-
-# Library built from disk.py; the only target here with a python_requirement (watchdog).
-python_library(
-  name = 'disk',
-  sources = ['disk.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
-    pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    python_requirement('watchdog'),
-  ]
-)
-
-# Library built from process.py; declares no dependencies.
-python_library(
-  name = 'process',
-  sources = ['process.py'],
-)
-
-# Library built from process_collector_psutil.py.
-python_library(
-  name = 'process_collector_psutil',
-  sources = ['process_collector_psutil.py'],
-  dependencies = [
-    pants(':process'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/python/twitter/thermos:psutil'),
-  ]
-)
-
-# Library built from resource.py; pulls in the disk, monitor and process targets.
-python_library(
-  name = 'resource',
-  sources = ['resource.py'],
-  dependencies = [
-    pants(':disk'),
-    pants(':monitor'),
-    pants(':process'),
-    pants(':process_collector_psutil'),
-    pants('aurora/twitterdeps/src/python/twitter/common/collections'),
-    pants('aurora/twitterdeps/src/python/twitter/common/concurrent'),
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-  ]
-)
-
-# Aggregate target with no sources of its own.  It provides the 'twitter.thermos.monitoring'
-# setup_py artifact, whose version is read from the .auroraversion file in the build root.
-# ':process_collector_psutil' is not listed directly; it is a dependency of ':resource'.
-python_library(
-  name = 'monitoring',
-  dependencies = [
-    pants(':detector'),
-    pants(':disk'),
-    pants(':garbage'),
-    pants(':monitor'),
-    pants(':process'),
-    pants(':resource'),
-
-    # covering dependency for common
-    pants('src/main/python/twitter/thermos/common'),
-  ],
-  provides = setup_py(
-    name = 'twitter.thermos.monitoring',
-    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
-    description = 'Thermos monitoring library.',
-  )
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/__init__.py b/src/main/python/twitter/thermos/monitoring/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/detector.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/detector.py b/src/main/python/twitter/thermos/monitoring/detector.py
deleted file mode 100644
index ed48c93..0000000
--- a/src/main/python/twitter/thermos/monitoring/detector.py
+++ /dev/null
@@ -1,91 +0,0 @@
-"""Detect Thermos tasks on disk
-
-This module contains the TaskDetector, used to detect Thermos tasks within a given checkpoint root.
-
-"""
-
-import glob
-import os
-import re
-
-from twitter.thermos.common.path import TaskPath
-
-
-class TaskDetector(object):
-  """
-    Helper class in front of TaskPath to detect active/finished/running tasks. Performs no
-    introspection on the state of a task; merely detects based on file paths on disk.
-
-    Each lookup globs for candidate paths and then re-matches them against a regular
-    expression built from the same path template.  Every literal path component placed into
-    that expression is passed through re.escape so that regex metacharacters in directory
-    names are matched literally.
-  """
-  class MatchingError(Exception): pass
-
-  def __init__(self, root):
-    """ root is the checkpoint root directory under which tasks are searched. """
-    self._root_dir = root
-    self._pathspec = TaskPath()
-
-  def get_task_ids(self, state=None):
-    """
-      Yield (task_state, task_id) tuples for the task paths found under the root.  If state
-      is given, only tasks in that state are yielded.
-    """
-    paths = glob.glob(self._pathspec.given(root=self._root_dir,
-                                           task_id="*",
-                                           state=state or '*')
-                                    .getpath('task_path'))
-    path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
-                                              task_id=r"(\S+)",
-                                              state=r'(\S+)')
-                                       .getpath('task_path'))
-    for path in paths:
-      match = path_re.match(path)
-      if match is None:
-        continue
-      # Assumes the 'task_path' template places the state component before the task id.
-      task_state, task_id = match.groups()
-      if state is None or task_state == state:
-        yield (task_state, task_id)
-
-  def get_process_runs(self, task_id, log_dir):
-    """
-      Yield (process_name, run_number) tuples for every process log directory of the given
-      task found beneath log_dir.  run_number is an int.
-    """
-    paths = glob.glob(self._pathspec.given(root=self._root_dir,
-                                           task_id=task_id,
-                                           log_dir=log_dir,
-                                           process='*',
-                                           run='*')
-                                    .getpath('process_logdir'))
-    # log_dir is escaped like root and task_id: it is a literal path, not a pattern.
-    path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
-                                              task_id=re.escape(task_id),
-                                              log_dir=re.escape(log_dir),
-                                              process=r'(\S+)',
-                                              run=r'(\d+)')
-                                       .getpath('process_logdir'))
-    for path in paths:
-      match = path_re.match(path)
-      if match is None:
-        continue
-      process, run = match.groups()
-      yield process, int(run)
-
-  def get_process_logs(self, task_id, log_dir):
-    """
-      Yield the paths of the existing stdout and stderr log files for every process run of
-      the given task.
-    """
-    for process, run in self.get_process_runs(task_id, log_dir):
-      for logtype in ('stdout', 'stderr'):
-        path = (self._pathspec.with_filename(logtype).given(root=self._root_dir,
-                                                           task_id=task_id,
-                                                           log_dir=log_dir,
-                                                           process=process,
-                                                           run=run)
-                                                     .getpath('process_logdir'))
-        if os.path.exists(path):
-          yield path
-
-  def get_checkpoint(self, task_id):
-    """ Return the runner checkpoint path for the given task. """
-    return self._pathspec.given(root=self._root_dir, task_id=task_id).getpath('runner_checkpoint')
-
-  def get_process_checkpoints(self, task_id):
-    """ Yield the paths of the process checkpoints found for the given task. """
-    matching_paths = glob.glob(self._pathspec.given(root=self._root_dir,
-                                                    task_id=task_id,
-                                                    process='*')
-                                             .getpath('process_checkpoint'))
-    path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
-                                              task_id=re.escape(task_id),
-                                              process=r'(\S+)')
-                                       .getpath('process_checkpoint'))
-    for path in matching_paths:
-      # The regex is used purely as a filter; the captured process name is not needed.
-      if path_re.match(path) is None:
-        continue
-      yield path

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/disk.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/disk.py b/src/main/python/twitter/thermos/monitoring/disk.py
deleted file mode 100644
index 9116746..0000000
--- a/src/main/python/twitter/thermos/monitoring/disk.py
+++ /dev/null
@@ -1,181 +0,0 @@
-"""Sample disk usage under a particular path
-
-This module provides threads which can be used to gather information on the disk utilisation
-under a particular path.
-
-Currently, there are two threads available:
-  - DiskCollectorThread, which periodically uses a basic brute-force approach (os.stat()ing every
-    file within the path)
-  - InotifyDiskCollectorThread, which updates disk utilisation dynamically by using inotify to
-    monitor disk changes within the path
-
-"""
-
-import os
-import threading
-import time
-from Queue import Empty, Queue
-
-from twitter.common import log
-from twitter.common.dirutil import du, safe_bsize
-from twitter.common.exceptions import ExceptionalThread
-from twitter.common.lang import Lockable
-from twitter.common.quantity import Amount, Time
-
-from watchdog.observers import Observer as WatchdogObserver
-from watchdog.events import (
-  FileSystemEventHandler,
-  FileCreatedEvent,
-  FileDeletedEvent,
-  FileModifiedEvent,
-  FileMovedEvent,
-)
-
-
-class DiskCollectorThread(ExceptionalThread):
-  """ One-shot daemon thread that computes the disk usage under a path with du() """
-  def __init__(self, path):
-    super(DiskCollectorThread, self).__init__()
-    self.daemon = True
-    self.path = path
-    self.value = None                # result of du(); populated by run()
-    self.event = threading.Event()   # set once run() has stored the result
-
-  def run(self):
-    """ Measure the path, store the result in self.value, then signal completion. """
-    target = self.path
-    log.debug("DiskCollectorThread: starting collection of %s" % target)
-    self.value = du(target)
-    log.debug("DiskCollectorThread: finished collection of %s" % target)
-    self.event.set()
-
-  def finished(self):
-    """ Return True once the completion event has been set. """
-    return self.event.is_set()
-
-
-class DiskCollector(Lockable):
-  """ Samples disk usage under a root path using a background DiskCollectorThread """
-  def __init__(self, root):
-    self._root = root
-    self._thread = None   # in-flight collection thread, if any
-    self._value = 0       # most recently completed sample
-    super(DiskCollector, self).__init__()
-
-  @Lockable.sync
-  def sample(self):
-    """ Start a collection thread unless one is already in flight """
-    if self._thread is not None:
-      return
-    self._thread = DiskCollectorThread(self._root)
-    self._thread.start()
-
-  @property
-  @Lockable.sync
-  def value(self):
-    """ Latest known disk usage; harvests the in-flight thread's result once it has finished """
-    worker = self._thread
-    if worker is not None and worker.finished():
-      self._value = worker.value
-      self._thread = None
-    return self._value
-
-  @property
-  @Lockable.sync
-  def completed_event(self):
-    """ The in-flight collection's threading.Event, or a fresh Event that is never set when
-    no collection is in progress. Waiting on the latter blocks indefinitely, so always wait
-    with a timeout. """
-    if self._thread is None:
-      return threading.Event()
-    return self._thread.event
-
-
-class InotifyDiskCollectorThread(ExceptionalThread, FileSystemEventHandler):
-  """ Thread to calculate aggregate disk usage under a given path
-
-    Note that while this thread uses inotify (through the watchdog module) to monitor disk events in
-    "real time", the actual processing of events is only performed periodically (configured via
-    COLLECTION_INTERVAL)
-
-  """
-  INTERESTING_EVENTS = (FileCreatedEvent, FileDeletedEvent, FileModifiedEvent, FileMovedEvent)
-  COLLECTION_INTERVAL = Amount(5, Time.SECONDS)
-
-  def __init__(self, path):
-    self._path = path
-    self._files = {}   # file path => size (bytes)
-    self._queue = Queue()        # events received from watchdog, awaiting the next batch
-    self._to_process = Queue()   # batch of events currently being processed
-    self._observer = WatchdogObserver()
-    super(InotifyDiskCollectorThread, self).__init__()
-    self.daemon = True
-
-  def dispatch(self, event):
-    """ Dispatch all interesting events to the internal queue """
-    if isinstance(event, self.INTERESTING_EVENTS):
-      self._queue.put(event)
-
-  def _initialize(self):
-    """ Collect an initial snapshot of the disk usage in the path """
-    log.debug("Starting watchdog observer to collect events...")
-    self._observer.schedule(self, path=self._path, recursive=True)
-    self._observer.start()
-    log.debug("Collecting initial disk usage sample...")
-    for root, _, files in os.walk(self._path):
-      for filename in files:
-        f = os.path.join(root, filename)
-        self._files[f] = safe_bsize(f)
-
-  def _process_events(self):
-    """ Deduplicate and process watchdog events, updating the internal file store appropriately
-
-    Only the last operation recorded for each path is applied.
-    """
-    # file path => function to call with that path.  The function is stored rather than a
-    # closure over the loop variable: a lambda referring to `event` would be evaluated after
-    # the loop and so would always see the *last* event's paths.
-    file_ops = {}
-
-    def remove_file(path):
-      self._files.pop(path, None)
-    def stat_file(path):
-      self._files[path] = safe_bsize(path)
-
-    while not self._to_process.empty():
-      event = self._to_process.get()
-      if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
-        file_ops[event.src_path] = stat_file
-      elif isinstance(event, FileDeletedEvent):
-        file_ops[event.src_path] = remove_file
-      elif isinstance(event, FileMovedEvent):
-        file_ops[event.src_path] = remove_file
-        file_ops[event.dest_path] = stat_file
-
-    for path, op in file_ops.items():
-      op(path)
-
-  def run(self):
-    """ Loop indefinitely, periodically processing watchdog/inotify events. """
-    self._initialize()
-    log.debug("Initialization complete. Moving to handling events.")
-    while True:
-      next = time.time() + self.COLLECTION_INTERVAL.as_(Time.SECONDS)
-      if not self._queue.empty():
-        # Swap in a fresh queue so events arriving during processing land in the next batch.
-        self._to_process, self._queue = self._queue, Queue()
-        self._process_events()
-      time.sleep(max(0, next - time.time()))
-
-  @property
-  def value(self):
-    """ Sum of the recorded sizes (bytes) of all tracked files """
-    return sum(self._files.itervalues())
-
-
-class InotifyDiskCollector(object):
-  """ Front end for an InotifyDiskCollectorThread watching a single root path """
-  def __init__(self, root):
-    self._root = root
-    self._thread = InotifyDiskCollectorThread(self._root)
-
-  def sample(self):
-    """ Start the collection thread if the root exists and the thread is not already alive. """
-    if not os.path.exists(self._root):
-      log.error('Cannot start monitoring path until it exists')
-      return
-    if self._thread.is_alive():
-      return
-    self._thread.start()
-
-  @property
-  def value(self):
-    """ Current disk usage as reported by the collection thread """
-    return self._thread.value
-
-  @property
-  def completed_event(self):
-    """ Always a new threading.Event; nothing in this class ever sets it """
-    return threading.Event()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/garbage.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/garbage.py b/src/main/python/twitter/thermos/monitoring/garbage.py
deleted file mode 100644
index 4ac0d3a..0000000
--- a/src/main/python/twitter/thermos/monitoring/garbage.py
+++ /dev/null
@@ -1,183 +0,0 @@
-from abc import abstractmethod
-from collections import namedtuple
-import os
-import sys
-import time
-
-from twitter.common.dirutil import safe_delete, safe_rmtree, safe_bsize
-from twitter.common.lang import Interface
-from twitter.common.quantity import Amount, Data, Time
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-from twitter.thermos.common.path import TaskPath
-
-from .detector import TaskDetector
-
-
-class TaskGarbageCollector(object):
-  """
-    Enumerates and erases the on-disk artifacts (metadata, logs, sandbox data) of Thermos
-    tasks found beneath a checkpoint root.
-  """
-  def __init__(self, root):
-    self._root = root
-    self._detector = TaskDetector(root=self._root)
-    self._states = {}   # task_id => state loaded from the task's runner checkpoint
-
-  def state(self, task_id):
-    """ Return the state read from the task's runner checkpoint, cached per task_id. """
-    if task_id not in self._states:
-      self._states[task_id] = CheckpointDispatcher.from_file(self._detector.get_checkpoint(task_id))
-    return self._states[task_id]
-
-  def get_age(self, task_id):
-    """
-      Return the modification time (os.path.getmtime) of the task's runner checkpoint.  Note
-      that this is a timestamp, not a duration; callers subtract it from the current time.
-    """
-    return os.path.getmtime(self._detector.get_checkpoint(task_id))
-
-  def get_finished_tasks(self):
-    """ Return the list of task ids detected in the 'finished' state. """
-    return [task_id for _, task_id in self._detector.get_task_ids(state='finished')]
-
-  def get_metadata(self, task_id, with_size=True):
-    """
-      Yield the task's metadata paths: its 'finished' task path, its runner checkpoint and
-      its process checkpoints.  Yields (path, size) tuples if with_size, else bare paths.
-    """
-    runner_ckpt = self._detector.get_checkpoint(task_id)
-    process_ckpts = list(self._detector.get_process_checkpoints(task_id))
-    json_spec = TaskPath(root=self._root, task_id=task_id, state='finished').getpath('task_path')
-    for path in [json_spec, runner_ckpt] + process_ckpts:
-      if with_size:
-        yield path, safe_bsize(path)
-      else:
-        yield path
-
-  def get_logs(self, task_id, with_size=True):
-    """
-      Yield the task's process log paths, located via the log_dir recorded in the task's
-      checkpoint header.  Yields (path, size) tuples if with_size, else bare paths.  Yields
-      nothing if the task has no state or header.
-    """
-    state = self.state(task_id)
-    if state and state.header:
-      for path in self._detector.get_process_logs(task_id, state.header.log_dir):
-        if with_size:
-          yield path, safe_bsize(path)
-        else:
-          yield path
-
-  def get_data(self, task_id, with_size=True):
-    """
-      Yield every file beneath the sandbox recorded in the task's checkpoint header.  Yields
-      (path, size) tuples if with_size, else bare paths.  Yields nothing if the task has no
-      state, header or sandbox.
-    """
-    state = self.state(task_id)
-    if state and state.header and state.header.sandbox:
-      for root, dirs, files in os.walk(state.header.sandbox):
-        for file in files:
-          filename = os.path.join(root, file)
-          if with_size:
-            yield filename, safe_bsize(filename)
-          else:
-            yield filename
-
-  def erase_task(self, task_id):
-    """
-      Erase the task's data, logs and metadata, in that order.  Metadata goes last because
-      erase_logs and erase_data may need to read the checkpoint that erase_metadata deletes.
-    """
-    self.erase_data(task_id)
-    self.erase_logs(task_id)
-    self.erase_metadata(task_id)
-
-  def erase_metadata(self, task_id):
-    """ Delete the task's metadata files, then remove its checkpoint directory. """
-    for fn in self.get_metadata(task_id, with_size=False):
-      safe_delete(fn)
-    safe_rmtree(TaskPath(root=self._root, task_id=task_id).getpath('checkpoint_path'))
-
-  def erase_logs(self, task_id):
-    """ Delete the task's process logs, then remove its log base directory. """
-    for fn in self.get_logs(task_id, with_size=False):
-      safe_delete(fn)
-    state = self.state(task_id)
-    if state and state.header:
-      safe_rmtree(TaskPath(root=self._root, task_id=task_id, log_dir=state.header.log_dir)
-                  .getpath('process_logbase'))
-
-  def erase_data(self, task_id):
-    """ Delete every file in the task's sandbox, then remove the sandbox directory. """
-    # TODO(wickman)
-    # This could be potentially dangerous if somebody naively runs their sandboxes in e.g.
-    # $HOME or / or similar.  Perhaps put a guard somewhere?
-    for fn in self.get_data(task_id, with_size=False):
-      # Use safe_delete like the other erase_* methods, rather than a bare os.unlink that
-      # would abort the whole collection if a file disappears during the walk.
-      safe_delete(fn)
-    state = self.state(task_id)
-    if state and state.header and state.header.sandbox:
-      safe_rmtree(state.header.sandbox)
-
-
-class TaskGarbageCollectionPolicy(Interface):
-  """ Interface for policies that choose which tasks of a TaskGarbageCollector to collect. """
-  def __init__(self, collector):
-    """ Raises ValueError unless collector is a TaskGarbageCollector. """
-    if isinstance(collector, TaskGarbageCollector):
-      self._collector = collector
-    else:
-      raise ValueError(
-          "Expected collector to be a TaskGarbageCollector, got %s" % collector.__class__.__name__)
-
-  @property
-  def collector(self):
-    """ The TaskGarbageCollector this policy operates on. """
-    return self._collector
-
-  @abstractmethod
-  def run(self):
-    """Returns a list of task_ids that should be garbage collected given the specified policy."""
-
-
-class DefaultCollector(TaskGarbageCollectionPolicy):
-  def __init__(self, collector, **kw):
-    """
-      Default garbage collection policy.
-
-      Arguments that may be specified:
-        max_age:   Amount(Time) (max age of a retained task)  [default: infinity]
-        max_space: Amount(Data) (max space to keep)           [default: infinity]
-        max_tasks: int (max number of tasks to keep)          [default: infinity]
-        include_metadata: boolean  (Whether or not to include metadata in the
-          space calculations.)  [default: True]
-        include_logs: boolean  (Whether or not to include logs in the
-          space calculations.)  [default: True]
-        verbose: boolean (whether or not to log)  [default: False]
-        logger: callable (function to call with log messages) [default: sys.stdout.write]
-
-      The "infinity" defaults are very large finite values.  Unrecognised keyword arguments
-      are ignored.
-    """
-    option_defaults = (
-      ('max_age', Amount(10**10, Time.DAYS)),
-      ('max_space', Amount(10**10, Data.TB)),
-      ('max_tasks', 10**10),
-      ('include_metadata', True),
-      ('include_logs', True),
-      ('verbose', False),
-      ('logger', sys.stdout.write),
-    )
-    # Each option is stored on the instance as '_<option name>'.
-    for option, fallback in option_defaults:
-      setattr(self, '_' + option, kw.get(option, fallback))
-    TaskGarbageCollectionPolicy.__init__(self, collector)
-
-  def log(self, msg):
-    """ Pass msg to the configured logger, but only when verbose is enabled. """
-    if not self._verbose:
-      return
-    self._logger(msg)
-
-  def run(self):
-    """
-      Measure every finished task and apply the age, space and task-count filters in turn.
-      Returns the set of TaskTuple records selected for garbage collection.
-    """
-    now = time.time()
-    collector = self.collector
-    TaskTuple = namedtuple('TaskTuple', 'task_id age metadata_size log_size data_size')
-
-    def byte_total(sized_paths):
-      return Amount(sum(size for _, size in sized_paths), Data.BYTES)
-
-    tasks = []
-    for task_id in collector.get_finished_tasks():
-      age = Amount(int(now - collector.get_age(task_id)), Time.SECONDS)
-      self.log('Analyzing task %s (age: %s)... ' % (task_id, age))
-      metadata_size = byte_total(collector.get_metadata(task_id))
-      self.log('  metadata %.1fKB ' % metadata_size.as_(Data.KB))
-      log_size = byte_total(collector.get_logs(task_id))
-      self.log('  logs %.1fKB ' % log_size.as_(Data.KB))
-      data_size = byte_total(collector.get_data(task_id))
-      self.log('  data %.1fMB ' % data_size.as_(Data.MB))
-      tasks.append(TaskTuple(task_id, age, metadata_size, log_size, data_size))
-
-    # Filter 1: everything older than max_age.
-    gc_tasks = set(task for task in tasks if task.age > self._max_age)
-    self.log('After age filter: %s tasks' % len(gc_tasks))
-
-    nothing = Amount(0, Data.BYTES)
-
-    def total_gc_size(task):
-      metadata = task.metadata_size if self._include_metadata else nothing
-      logs = task.log_size if self._include_logs else nothing
-      return sum([task.data_size, metadata, logs], nothing)
-
-    # Both remaining filters visit tasks from largest age to smallest.
-    oldest_first = sorted(tasks, key=lambda tsk: tsk.age, reverse=True)
-
-    # Filter 2: accumulate the space of surviving tasks and select each task that takes the
-    # running total past max_space.
-    # NOTE(review): since the oldest tasks are counted first, the tasks selected here are
-    # the newer ones -- confirm that this is the intended eviction order.
-    total_used = Amount(0, Data.BYTES)
-    for task in oldest_first:
-      if task in gc_tasks:
-        continue
-      total_used += total_gc_size(task)
-      if total_used > self._max_space:
-        gc_tasks.add(task)
-    self.log('After size filter: %s tasks' % len(gc_tasks))
-
-    # Filter 3: while more than max_tasks tasks would remain, select the oldest survivors.
-    for task in oldest_first:
-      if task not in gc_tasks and len(tasks) - len(gc_tasks) > self._max_tasks:
-        gc_tasks.add(task)
-    self.log('After total task filter: %s tasks' % len(gc_tasks))
-
-    self.log('Deciding to garbage collect the following tasks:')
-    if not gc_tasks:
-      self.log('   None.')
-    else:
-      for task in gc_tasks:
-        self.log('   %s' % repr(task))
-
-    return gc_tasks

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/monitor.py b/src/main/python/twitter/thermos/monitoring/monitor.py
deleted file mode 100644
index 5af67fe..0000000
--- a/src/main/python/twitter/thermos/monitoring/monitor.py
+++ /dev/null
@@ -1,125 +0,0 @@
-"""Monitor the state of Thermos tasks on a system
-
-This module contains the TaskMonitor, used to reconstruct the state of active or finished Thermos
-tasks based on their checkpoint streams. It exposes two key pieces of information about a Task, both
-as their corresponding Thrift structs:
-  - a RunnerState, representing the latest state of the Task
-  - a list of ProcessStates, representing the processes currently running within the Task
-
-"""
-
-import os
-import copy
-import errno
-import threading
-
-from twitter.common import log
-from twitter.common.recordio import ThriftRecordReader
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-
-from gen.twitter.thermos.ttypes import (
-  ProcessState,
-  RunnerCkpt,
-  RunnerState,
-  TaskState,
-)
-
-
-class TaskMonitor(object):
-  """
-    Class responsible for reconstructing and monitoring the state of an individual Thermos task via
-    its runner checkpoint. Also exports information on active processes in the task.
-  """
-
-  def __init__(self, pathspec, task_id):
-    """
-      pathspec is the path specification used to locate the task's runner checkpoint and its
-      'active' / 'finished' task files; task_id identifies the task to monitor.
-    """
-    self._task_id = task_id
-    self._dispatcher = CheckpointDispatcher()
-    self._runnerstate = RunnerState(processes={})
-    self._runner_ckpt = pathspec.given(task_id=task_id).getpath('runner_checkpoint')
-    self._active_file, self._finished_file = (
-        pathspec.given(task_id=task_id, state=state).getpath('task_path')
-        for state in ('active', 'finished'))
-    self._ckpt_head = 0   # offset up to which the checkpoint has been consumed
-    self._apply_states()
-    self._lock = threading.Lock()
-
-  def _apply_states(self):
-    """
-      os.stat() the corresponding checkpoint stream of this task and determine if there are new ckpt
-      records.  Attempt to read those records and update the high watermark for that stream.
-      Returns True if new states were applied, False otherwise.
-
-      Returns False (after logging a warning) if the checkpoint does not exist; any other
-      I/O error is re-raised.
-    """
-    try:
-      ckpt_offset = os.stat(self._runner_ckpt).st_size
-
-      updated = False
-      if self._ckpt_head < ckpt_offset:
-        # The checkpoint is a stream of thrift records, so it is opened in binary mode.
-        with open(self._runner_ckpt, 'rb') as fp:
-          fp.seek(self._ckpt_head)
-          rr = ThriftRecordReader(fp, RunnerCkpt)
-          while True:
-            runner_update = rr.try_read()
-            if not runner_update:
-              break
-            try:
-              self._dispatcher.dispatch(self._runnerstate, runner_update)
-            except CheckpointDispatcher.InvalidSequenceNumber as e:
-              log.error('Checkpoint stream is corrupt: %s' % e)
-              break
-          new_ckpt_head = fp.tell()
-          updated = self._ckpt_head != new_ckpt_head
-          self._ckpt_head = new_ckpt_head
-      return updated
-    except (IOError, OSError) as e:
-      # os.stat() raises OSError, while open() raises IOError on Python 2; both carry errno.
-      # Catching both covers the checkpoint vanishing between the stat and the open.
-      if e.errno == errno.ENOENT:
-        # The log doesn't yet exist, will retry later.
-        log.warning('Could not read from discovered task %s.' % self._task_id)
-        return False
-      else:
-        raise
-
-  def refresh(self):
-    """
-      Check to see if there are new updates and apply them.  Return true if
-      updates were applied, false otherwise.
-    """
-    with self._lock:
-      return self._apply_states()
-
-  def get_state(self):
-    """
-      Get the latest state of this Task.  The returned RunnerState is a deep copy.
-    """
-    with self._lock:
-      self._apply_states()
-      return copy.deepcopy(self._runnerstate)
-
-  def task_state(self):
-    """
-      Return the state of the most recent task status, or TaskState.ACTIVE if no status has
-      been recorded yet.
-    """
-    state = self.get_state()
-    return state.statuses[-1].state if state.statuses else TaskState.ACTIVE
-
-  @property
-  def active(self):
-    """ True if the task's 'active' task file exists on disk. """
-    return os.path.exists(self._active_file)
-
-  @property
-  def finished(self):
-    """ True if the task's 'finished' task file exists on disk. """
-    return os.path.exists(self._finished_file)
-
-  def get_active_processes(self):
-    """
-      Get active processes.  Returned is a list of tuples of the form:
-        (ProcessStatus object of running object, its run number)
-
-      A process is active if its last run is in RUNNING state.  The returned status objects
-      are the monitor's own (not copies).
-    """
-    active_processes = []
-    with self._lock:
-      self._apply_states()
-      state = self._runnerstate
-      for process, runs in state.processes.items():
-        if len(runs) == 0:
-          continue
-        last_run = runs[-1]
-        if last_run.state == ProcessState.RUNNING:
-          active_processes.append((last_run, len(runs) - 1))
-    return active_processes

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/process.py b/src/main/python/twitter/thermos/monitoring/process.py
deleted file mode 100644
index 294682f..0000000
--- a/src/main/python/twitter/thermos/monitoring/process.py
+++ /dev/null
@@ -1,49 +0,0 @@
-"""Represent resource consumption statistics for processes
-
-This module exposes one class: the ProcessSample, used to represent resource consumption. A single
-ProcessSample might correspond to one individual process, or to an aggregate of multiple processes.
-
-"""
-
-from collections import namedtuple
-
-
-class ProcessSample(namedtuple('ProcessSample', 'rate user system rss vms nice status threads')):
-  """ Sample of statistics about a process's resource consumption (either a single process or an
-  aggregate of processes) """
-
-  @staticmethod
-  def empty():
-    return ProcessSample(rate=0, user=0, system=0, rss=0, vms=0, nice=None, status=None, threads=0)
-
-  def __add__(self, other):
-    if self.nice is not None and other.nice is None:
-      nice = self.nice
-    else:
-      nice = other.nice
-    if self.status is not None and other.status is None:
-      status = self.status
-    else:
-      status = other.status
-    return ProcessSample(
-      rate = self.rate + other.rate,
-      user = self.user + other.user,
-      system = self.system + other.system,
-      rss = self.rss + other.rss,
-      vms = self.vms + other.vms,
-      nice = nice,
-      status = status,
-      threads = self.threads + other.threads)
-
-  def to_dict(self):
-    return dict(
-      cpu     = self.rate,
-      ram     = self.rss,
-      user    = self.user,
-      system  = self.system,
-      rss     = self.rss,
-      vms     = self.vms,
-      nice    = self.nice,
-      status  = str(self.status),
-      threads = self.threads
-    )

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/process_collector_psutil.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/process_collector_psutil.py b/src/main/python/twitter/thermos/monitoring/process_collector_psutil.py
deleted file mode 100644
index 7b68173..0000000
--- a/src/main/python/twitter/thermos/monitoring/process_collector_psutil.py
+++ /dev/null
@@ -1,92 +0,0 @@
-""" Sample resource consumption statistics for processes using psutil """
-
-from operator import attrgetter
-from time import time
-
-from twitter.common import log
-
-from .process import ProcessSample
-
-from psutil import Process
-from psutil import NoSuchProcess, AccessDenied, Error as PsutilError
-
-
-def process_to_sample(process):
-  """ Given a psutil.Process, return a current ProcessSample """
-  try:
-    # the nonblocking get_cpu_percent call is stateful on a particular Process object, and hence
-    # >2 consecutive calls are required before it will return a non-zero value
-    rate = process.get_cpu_percent(0.0) / 100.0
-    user, system = process.get_cpu_times()
-    rss, vms = process.get_memory_info()
-    nice = process.nice
-    status = process.status
-    threads = process.get_num_threads()
-    return ProcessSample(rate, user, system, rss, vms, nice, status, threads)
-  except (AccessDenied, NoSuchProcess) as e:
-    log.warning('Error during process sampling [pid=%s]: %s' % (process.pid, e))
-    return ProcessSample.empty()
-
-
-class ProcessTreeCollector(object):
-  """ Collect resource consumption statistics for a process and its children """
-  def __init__(self, pid):
-    """ Given a pid """
-    self._pid = pid
-    self._process = None  # psutil.Process
-    self._sampled_tree = {} # pid => ProcessSample
-    self._sample = ProcessSample.empty()
-    self._stamp = None
-    self._rate = 0.0
-    self._procs = 1
-
-  def sample(self):
-    """ Collate and aggregate ProcessSamples for process and children
-        Returns None: result is stored in self.value
-    """
-    try:
-      last_sample, last_stamp = self._sample, self._stamp
-      if self._process is None:
-        self._process = Process(self._pid)
-      parent = self._process
-      parent_sample = process_to_sample(parent)
-      new_samples = dict(
-          (proc.pid, process_to_sample(proc))
-          for proc in parent.get_children(recursive=True)
-      )
-      new_samples[self._pid] = parent_sample
-
-    except PsutilError as e:
-      log.warning('Error during process sampling: %s' % e)
-      self._sample = ProcessSample.empty()
-      self._rate = 0.0
-
-    else:
-      last_stamp = self._stamp
-      self._stamp = time()
-      # for most stats, calculate simple sum to aggregate
-      self._sample = sum(new_samples.values(), ProcessSample.empty())
-      # cpu consumption is more complicated
-      # We require at least 2 generations of a process before we can calculate rate, so for all
-      # current processes that were not running in the previous sample, compare to an empty sample
-      if self._sampled_tree and last_stamp:
-        new = new_samples.values()
-        old = [self._sampled_tree.get(pid, ProcessSample.empty()) for pid in new_samples.keys()]
-        new_user_sys = sum(map(attrgetter('user'), new)) + sum(map(attrgetter('system'), new))
-        old_user_sys = sum(map(attrgetter('user'), old)) + sum(map(attrgetter('system'), old))
-        self._rate = (new_user_sys - old_user_sys) / (self._stamp - last_stamp)
-        log.debug("Calculated rate for pid=%s and children: %s" % (self._process.pid, self._rate))
-      self._sampled_tree = new_samples
-
-  @property
-  def value(self):
-    """ Aggregated ProcessSample representing resource consumption of the tree """
-    # Since we don't trust the CPU consumption returned by psutil, replace it with our own in the
-    # exported ProcessSample
-    return self._sample._replace(rate=self._rate)
-
-  @property
-  def procs(self):
-    """ Number of active processes in the tree """
-    return len(self._sampled_tree)
-

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/resource.py b/src/main/python/twitter/thermos/monitoring/resource.py
deleted file mode 100644
index 468f1b1..0000000
--- a/src/main/python/twitter/thermos/monitoring/resource.py
+++ /dev/null
@@ -1,222 +0,0 @@
-"""Monitor the resource consumption of Thermos tasks
-
-This module contains classes used to monitor the resource consumption (e.g. CPU, RAM, disk) of
-Thermos tasks. Resource monitoring of a Thermos task typically treats the task as an aggregate of
-all the processes within it. Importantly, this excludes the process(es) of Thermos itself (i.e. the
-TaskRunner and any other wrappers involved in launching a task).
-
-The ResourceMonitorBase defines the interface for other components (for example, the Thermos
-TaskObserver) to interact with and retrieve information about a Task's resource consumption.  The
-canonical/reference implementation of a ResourceMonitor is the TaskResourceMonitor, a thread which
-actively monitors resources for a particular task by periodically polling process information and
-disk consumption and retaining a limited (FIFO) in-memory history of this data.
-
-"""
-
-from abc import abstractmethod
-from bisect import bisect_left
-from collections import namedtuple
-from operator import attrgetter
-import platform
-import threading
-import time
-
-from twitter.common import log
-from twitter.common.collections import RingBuffer
-from twitter.common.concurrent import EventMuxer
-from twitter.common.lang import Interface
-from twitter.common.quantity import Amount, Time
-
-from .disk import DiskCollector
-from .monitor import TaskMonitor
-from .process import ProcessSample
-from .process_collector_psutil import ProcessTreeCollector
-
-
-class ResourceMonitorBase(Interface):
-  """ Defines the interface for interacting with a ResourceMonitor """
-
-  class Error(Exception): pass
-
-  class ResourceResult(namedtuple('ResourceResult', 'num_procs process_sample disk_usage')):
-    pass
-
-  @abstractmethod
-  def sample(self):
-    """ Return a sample of the resource consumption of the task right now
-
-    Returns a tuple of (timestamp, ResourceResult)
-    """
-
-  @abstractmethod
-  def sample_at(self, time):
-    """ Return a sample of the resource consumption as close as possible to the specified time
-
-    Returns a tuple of (timestamp, ResourceResult)
-    """
-
-  @abstractmethod
-  def sample_by_process(self, process_name):
-    """ Return a sample of the resource consumption of a specific process in the task right now
-
-    Returns a ProcessSample
-    """
-
-
-class ResourceHistory(object):
-  """Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with the
-       mapping: timestamp => (number_of_procs, ProcessSample, disk_usage_in_bytes)
-  """
-  def __init__(self, maxlen, initialize=True):
-    if not maxlen >= 1:
-      raise ValueError("maxlen must be greater than 0")
-    self._maxlen = maxlen
-    self._values = RingBuffer(maxlen, None)
-    if initialize:
-      self.add(time.time(), ResourceMonitorBase.ResourceResult(0, ProcessSample.empty(), 0))
-
-  def add(self, timestamp, value):
-    """Store a new resource sample corresponding to the given timestamp"""
-    if self._values and not timestamp >= self._values[-1][0]:
-      raise ValueError("Refusing to add timestamp in the past!")
-    self._values.append((timestamp, value))
-
-  def get(self, timestamp):
-    """Get the resource sample nearest to the given timestamp"""
-    closest = min(bisect_left(self._values, (timestamp, None)), len(self) - 1)
-    return self._values[closest]
-
-  def __iter__(self):
-    return iter(self._values)
-
-  def __len__(self):
-    return len(self._values)
-
-  def __repr__(self):
-    return 'ResourceHistory(%s)' % ', '.join([str(r) for r in self._values])
-
-
-class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
-  """ Lightweight thread to aggregate resource consumption for a task's constituent processes.
-      Actual resource calculation is delegated to collectors; this class periodically polls the
-      collectors and aggregates into a representation for the entire task. Also maintains a limited
-      history of previous sample results.
-  """
-
-  MAX_HISTORY = 10000 # magic number
-
-  def __init__(self, task_monitor, sandbox,
-               process_collector=ProcessTreeCollector, disk_collector=DiskCollector,
-               process_collection_interval=Amount(20, Time.SECONDS),
-               disk_collection_interval=Amount(1, Time.MINUTES),
-               history_time=Amount(1, Time.HOURS)):
-    """
-      task_monitor: TaskMonitor object specifying the task whose resources should be monitored
-      sandbox: Directory for which to monitor disk utilisation
-    """
-    self._task_monitor = task_monitor # exposes PIDs, sandbox
-    self._task_id = task_monitor._task_id
-    log.debug('Initialising resource collection for task %s' % self._task_id)
-    self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector
-    # TODO(jon): sandbox is also available through task_monitor, but typically the first checkpoint
-    # isn't written (and hence the header is not available) by the time we initialise here
-    self._sandbox = sandbox
-    self._process_collector_factory = process_collector
-    self._disk_collector = disk_collector(self._sandbox)
-    self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
-    self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
-    min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
-    history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
-    if history_length > self.MAX_HISTORY:
-      raise ValueError("Requested history length too large")
-    log.debug("Initialising ResourceHistory of length %s" % history_length)
-    self._history = ResourceHistory(history_length)
-    self._kill_signal = threading.Event()
-    threading.Thread.__init__(self)
-    self.daemon = True
-
-  def sample(self):
-    if not self.is_alive():
-      log.warning("TaskResourceMonitor not running - sample may be inaccurate")
-    return self.sample_at(time.time())
-
-  def sample_at(self, timestamp):
-    return self._history.get(timestamp)
-
-  def sample_by_process(self, process_name):
-    try:
-      process = [process for process in self._get_active_processes()
-                 if process.process == process_name].pop()
-    except IndexError:
-      raise ValueError('No active process found with name "%s" in this task' % process_name)
-    else:
-      # Since this might be called out of band (before the main loop is aware of the process)
-      if process not in self._process_collectors:
-        self._process_collectors[process] = self._process_collector_factory(process.pid)
-
-      self._process_collectors[process].sample()
-      return self._process_collectors[process].value
-
-  def _get_active_processes(self):
-    """Get a list of ProcessStatus objects representing currently-running processes in the task"""
-    return [process for process, _ in self._task_monitor.get_active_processes()]
-
-  def kill(self):
-    """Signal that the thread should cease collecting resources and terminate"""
-    self._kill_signal.set()
-
-  def run(self):
-    """Thread entrypoint. Loop indefinitely, polling collectors at self._collection_interval and
-    collating samples."""
-
-    log.debug('Commencing resource monitoring for task "%s"' % self._task_id)
-    next_process_collection = 0
-    next_disk_collection = 0
-
-    while not self._kill_signal.is_set():
-
-      now = time.time()
-
-      if now > next_process_collection:
-        next_process_collection = now + self._process_collection_interval
-        actives = set(self._get_active_processes())
-        current = set(self._process_collectors)
-        for process in current - actives:
-          log.debug('Process "%s" (pid %s) no longer active, removing from monitored processes' %
-                   (process.process, process.pid))
-          self._process_collectors.pop(process)
-        for process in actives - current:
-          log.debug('Adding process "%s" (pid %s) to resource monitoring' %
-                   (process.process, process.pid))
-          self._process_collectors[process] = self._process_collector_factory(process.pid)
-        for process, collector in self._process_collectors.iteritems():
-          log.debug('Collecting sample for process "%s" (pid %s) and children' %
-                   (process.process, process.pid))
-          collector.sample()
-
-      if now > next_disk_collection:
-        next_disk_collection = now + self._disk_collection_interval
-        log.debug('Collecting disk sample for %s' % self._sandbox)
-        self._disk_collector.sample()
-
-      try:
-        aggregated_procs = sum(map(attrgetter('procs'), self._process_collectors.values()))
-        aggregated_sample = sum(map(attrgetter('value'), self._process_collectors.values()),
-                                ProcessSample.empty())
-        self._history.add(now, self.ResourceResult(aggregated_procs, aggregated_sample,
-                                                   self._disk_collector.value))
-        log.debug("Recorded resource sample at %s" % now)
-      except ValueError as err:
-        log.warning("Error recording resource sample: %s" % err)
-
-      # Sleep until any of the following conditions are met:
-      # - it's time for the next disk collection
-      # - it's time for the next process collection
-      # - the result from the last disk collection is available via the DiskCollector
-      # - the TaskResourceMonitor has been killed via self._kill_signal
-      now = time.time()
-      next_collection = min(next_process_collection - now, next_disk_collection - now)
-      EventMuxer(self._kill_signal, self._disk_collector.completed_event
-                ).wait(timeout=max(0, next_collection))
-
-    log.debug('Stopping resource monitoring for task "%s"' % self._task_id)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/BUILD b/src/main/python/twitter/thermos/observer/BUILD
deleted file mode 100644
index 00ea9cb..0000000
--- a/src/main/python/twitter/thermos/observer/BUILD
+++ /dev/null
@@ -1,51 +0,0 @@
-import os
-
-python_library(
-  name = 'observed_task',
-  sources = ['observed_task.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('src/main/python/twitter/thermos:pystachio'),
-    pants('src/main/python/twitter/thermos/common:ckpt'),
-    pants('src/main/python/twitter/thermos/config'),
-  ]
-)
-
-python_library(
-  name = 'task_observer',
-  sources = ['task_observer.py'],
-  dependencies = [
-    pants(':observed_task'),
-    pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
-    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    pants('src/main/python/twitter/thermos/common:path'),
-    pants('src/main/python/twitter/thermos/monitoring:detector'),
-    pants('src/main/python/twitter/thermos/monitoring:monitor'),
-    pants('src/main/python/twitter/thermos/monitoring:process'),
-    pants('src/main/python/twitter/thermos/monitoring:resource'),
-    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
-  ]
-)
-
-python_library(
-  name = 'observer',
-  dependencies = [
-    pants(':task_observer'),
-    pants('src/main/python/twitter/thermos/observer/http:http_observer'),
-
-    # covering libraries
-    pants('src/main/python/twitter/thermos/common'),
-    pants('src/main/python/twitter/thermos/config'),
-    pants('src/main/python/twitter/thermos/monitoring'),
-  ],
-  provides = setup_py(
-    name = 'twitter.thermos.observer',
-    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
-    description = 'The Thermos observer web interface.',
-  ).with_binaries(
-    thermos_observer = pants('src/main/python/twitter/thermos/observer/bin:thermos_observer'),
-  )
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/__init__.py b/src/main/python/twitter/thermos/observer/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/bin/BUILD b/src/main/python/twitter/thermos/observer/bin/BUILD
deleted file mode 100644
index 26eb148..0000000
--- a/src/main/python/twitter/thermos/observer/bin/BUILD
+++ /dev/null
@@ -1,14 +0,0 @@
-python_binary(
-  name = 'thermos_observer',
-  source = 'thermos_observer.py',
-  entry_point = 'twitter.thermos.observer.bin.thermos_observer:proxy_main',
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/app'),
-    pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
-    pants('aurora/twitterdeps/src/python/twitter/common/http'),
-    pants('src/main/python/twitter/thermos:cherrypy'),
-    pants('src/main/python/twitter/thermos/common:path'),
-    pants('src/main/python/twitter/thermos/observer/http:http_observer'),
-    pants('src/main/python/twitter/thermos/observer:task_observer'),
-  ],
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/bin/__init__.py b/src/main/python/twitter/thermos/observer/bin/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/bin/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/bin/thermos_observer.py b/src/main/python/twitter/thermos/observer/bin/thermos_observer.py
deleted file mode 100644
index 9e7f7a0..0000000
--- a/src/main/python/twitter/thermos/observer/bin/thermos_observer.py
+++ /dev/null
@@ -1,55 +0,0 @@
-from __future__ import print_function
-
-import socket
-import sys
-import time
-
-from twitter.common import app
-from twitter.common.exceptions import ExceptionalThread
-from twitter.common.http import HttpServer
-from twitter.common.http.diagnostics import DiagnosticsEndpoints
-from twitter.thermos.common.path import TaskPath
-from twitter.thermos.observer.task_observer import TaskObserver
-from twitter.thermos.observer.http.http_observer import BottleObserver
-
-
-app.add_option("--root",
-               dest="root",
-               metavar="DIR",
-               default=TaskPath.DEFAULT_CHECKPOINT_ROOT,
-               help="root checkpoint directory for thermos task runners")
-
-
-app.add_option("--port",
-               dest="port",
-               metavar="INT",
-               default=1338,
-               help="port number to listen on.")
-
-
-def proxy_main():
-  def main(args, opts):
-    if args:
-      print("ERROR: unrecognized arguments: %s\n" % (" ".join(args)), file=sys.stderr)
-      app.help()
-      sys.exit(1)
-
-    root_server = HttpServer()
-    root_server.mount_routes(DiagnosticsEndpoints())
-
-    task_observer = TaskObserver(opts.root)
-    task_observer.start()
-
-    bottle_wrapper = BottleObserver(task_observer)
-
-    root_server.mount_routes(bottle_wrapper)
-
-    def run():
-      root_server.run('0.0.0.0', opts.port, 'cherrypy')
-
-    et = ExceptionalThread(target=run)
-    et.daemon = True
-    et.start()
-    et.join()
-
-  app.main()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/http/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/http/BUILD b/src/main/python/twitter/thermos/observer/http/BUILD
deleted file mode 100644
index 9f3d587..0000000
--- a/src/main/python/twitter/thermos/observer/http/BUILD
+++ /dev/null
@@ -1,47 +0,0 @@
-python_library(
-  name = 'json',
-  sources = ['json.py'],
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/http'),
-  ]
-)
-
-python_library(
-  name = 'static_assets',
-  sources = ['static_assets.py'],
-  resources = rglobs('assets/*'),
-  dependencies = [
-    pants('src/main/python/twitter/thermos:bottle'),
-  ]
-)
-
-python_library(
-  name = 'templating',
-  sources = ['templating.py'],
-  resources = globs('templates/*.tpl'),
-)
-
-python_library(
-  name = 'file_browser',
-  sources = ['file_browser.py'],
-  dependencies = [
-    pants(':templating'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/http'),
-    pants('src/main/python/twitter/thermos:bottle'),
-    pants('src/main/python/twitter/thermos:mako'),
-  ]
-)
-
-python_library(
-  name = 'http_observer',
-  sources = ['__init__.py', 'http_observer.py'],
-  dependencies = [
-    pants(':file_browser'),
-    pants(':json'),
-    pants(':static_assets'),
-    pants(':templating'),
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/http'),
-  ]
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/http/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/http/__init__.py b/src/main/python/twitter/thermos/observer/http/__init__.py
deleted file mode 100644
index e69de29..0000000