Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:02 UTC
[09/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/core/runner.py b/src/main/python/twitter/thermos/core/runner.py
deleted file mode 100644
index b10cf16..0000000
--- a/src/main/python/twitter/thermos/core/runner.py
+++ /dev/null
@@ -1,905 +0,0 @@
-""" Thermos runner.
-
-This module contains the TaskRunner, the core component of Thermos responsible for actually running
-tasks. It also contains several Handlers which define the behaviour on state transitions within the
-TaskRunner.
-
-There are three "active" states in a running Thermos task:
- ACTIVE
- CLEANING
- FINALIZING
-
-A task in ACTIVE state is running regular processes. The moment the task succeeds or exceeds its
-failure limit, it moves into CLEANING state and begins the staged termination of leftover
-processes (with SIGTERMs). Once all processes have terminated, the task moves into FINALIZING
-state, where the processes marked with the 'final' bit run. Once a task has entered CLEANING
-state, it has a deadline for reaching a terminal state; if it misses that deadline (at any point,
-whether in CLEANING or FINALIZING state), it is forced into a terminal state through SIGKILLs of
-all live processes (coordinators, shells, and the full process trees rooted at the shells).
-
-TaskRunner.kill is implemented by forcing the task into CLEANING state and setting its
-finalization deadline manually. In practice, Task preemption is implemented by calling kill with
-finalization deadline = now + preemption_wait, which gives the Task an opportunity to shut down
-gracefully. If preemption_wait=0, all live processes are SIGKILLed immediately and the task
-transitions to a terminal state.
-
-"""
-
-from contextlib import contextmanager
-import errno
-from functools import partial
-import os
-import socket
-import sys
-import time
-import traceback
-
-from twitter.common import log
-from twitter.common.dirutil import safe_mkdir
-from twitter.common.quantity import Amount, Time
-from twitter.common.recordio import ThriftRecordReader
-
-from twitter.thermos.common.ckpt import (
- CheckpointDispatcher,
- UniversalStateHandler,
- ProcessStateHandler,
- TaskStateHandler)
-from twitter.thermos.common.path import TaskPath
-from twitter.thermos.common.planner import TaskPlanner
-from twitter.thermos.config.loader import (
- ThermosConfigLoader,
- ThermosProcessWrapper,
- ThermosTaskWrapper,
- ThermosTaskValidator)
-from twitter.thermos.config.schema import ThermosContext
-
-from gen.twitter.thermos.ttypes import (
- ProcessState,
- ProcessStatus,
- RunnerCkpt,
- RunnerHeader,
- RunnerState,
- TaskState,
- TaskStatus,
-)
-
-from .helper import TaskRunnerHelper
-from .muxer import ProcessMuxer
-from .process import Process
-
-from pystachio import Environment
-
-
-# TODO(wickman) Currently this is messy because of all the private access into ._runner.
-# Clean this up by giving the TaskRunnerProcessHandler the components it should own, and
-# create a legitimate API contract into the Runner.
-class TaskRunnerProcessHandler(ProcessStateHandler):
- """
- Accesses these parts of the runner:
-
- | _task_processes [dict: item assignment, pop]
- | _task_process_from_process_name [process name / sequence number => Process]
- | _watcher [ProcessMuxer.register, unregister]
- | _plan [add_success, add_failure, set_running]
- """
-
- def __init__(self, runner):
- self._runner = runner
-
- def on_waiting(self, process_update):
- log.debug('Process on_waiting %s' % process_update)
- self._runner._task_processes[process_update.process] = (
- self._runner._task_process_from_process_name(
- process_update.process, process_update.seq + 1))
- self._runner._watcher.register(process_update.process, process_update.seq - 1)
-
- def on_forked(self, process_update):
- log.debug('Process on_forked %s' % process_update)
- task_process = self._runner._task_processes[process_update.process]
- task_process.rebind(process_update.coordinator_pid, process_update.fork_time)
- self._runner._plan.set_running(process_update.process)
-
- def on_running(self, process_update):
- log.debug('Process on_running %s' % process_update)
- self._runner._plan.set_running(process_update.process)
-
- def _cleanup(self, process_update):
- if not self._runner._recovery:
- TaskRunnerHelper.kill_process(self._runner.state, process_update.process)
-
- def on_success(self, process_update):
- log.debug('Process on_success %s' % process_update)
- log.info('Process(%s) finished successfully [rc=%s]' % (
- process_update.process, process_update.return_code))
- self._cleanup(process_update)
- self._runner._task_processes.pop(process_update.process)
- self._runner._watcher.unregister(process_update.process)
- self._runner._plan.add_success(process_update.process)
-
- def _on_abnormal(self, process_update):
- log.info('Process %s had an abnormal termination' % process_update.process)
- self._runner._task_processes.pop(process_update.process)
- self._runner._watcher.unregister(process_update.process)
-
- def on_failed(self, process_update):
- log.debug('Process on_failed %s' % process_update)
- log.info('Process(%s) failed [rc=%s]' % (process_update.process, process_update.return_code))
- self._cleanup(process_update)
- self._on_abnormal(process_update)
- self._runner._plan.add_failure(process_update.process)
- if process_update.process in self._runner._plan.failed:
- log.info('Process %s reached maximum failures, marking process run failed.' %
- process_update.process)
- else:
- log.info('Process %s under maximum failure limit, restarting.' % process_update.process)
-
- def on_lost(self, process_update):
- log.debug('Process on_lost %s' % process_update)
- self._cleanup(process_update)
- self._on_abnormal(process_update)
- self._runner._plan.lost(process_update.process)
-
- def on_killed(self, process_update):
- log.debug('Process on_killed %s' % process_update)
- self._cleanup(process_update)
- self._runner._task_processes.pop(process_update.process)
- self._runner._watcher.unregister(process_update.process)
- log.debug('Process killed, marking it as a loss.')
- self._runner._plan.lost(process_update.process)
-
-
-class TaskRunnerTaskHandler(TaskStateHandler):
- """
- Accesses these parts of the runner:
- _plan [set to regular_plan or finalizing_plan]
- _recovery [boolean, whether or not to side-effect]
- _pathspec [path creation]
- _task [ThermosTask]
- _set_finalization_start
- _kill
- """
-
- def __init__(self, runner):
- self._runner = runner
- self._pathspec = self._runner._pathspec
-
- def on_active(self, task_update):
- log.debug('Task on_active(%s)' % task_update)
- self._runner._plan = self._runner._regular_plan
- if self._runner._recovery:
- return
- TaskRunnerHelper.initialize_task(self._pathspec,
- ThermosTaskWrapper(self._runner._task).to_json())
-
- def on_cleaning(self, task_update):
- log.debug('Task on_cleaning(%s)' % task_update)
- self._runner._finalization_start = task_update.timestamp_ms / 1000.0
- self._runner._terminate_plan(self._runner._regular_plan)
-
- def on_finalizing(self, task_update):
- log.debug('Task on_finalizing(%s)' % task_update)
- if not self._runner._recovery:
- self._runner._kill()
- self._runner._plan = self._runner._finalizing_plan
- if self._runner._finalization_start is None:
- self._runner._finalization_start = task_update.timestamp_ms / 1000.0
-
- def on_killed(self, task_update):
- log.debug('Task on_killed(%s)' % task_update)
- self._cleanup()
-
- def on_success(self, task_update):
- log.debug('Task on_success(%s)' % task_update)
- self._cleanup()
- log.info('Task succeeded.')
-
- def on_failed(self, task_update):
- log.debug('Task on_failed(%s)' % task_update)
- self._cleanup()
-
- def on_lost(self, task_update):
- log.debug('Task on_lost(%s)' % task_update)
- self._cleanup()
-
- def _cleanup(self):
- if not self._runner._recovery:
- self._runner._kill()
- TaskRunnerHelper.finalize_task(self._pathspec)
-
-
-class TaskRunnerUniversalHandler(UniversalStateHandler):
- """
- Universal handler to checkpoint every process and task transition of the runner.
-
- Accesses these parts of the runner:
- _ckpt_write
- """
-
- def __init__(self, runner):
- self._runner = runner
-
- def _checkpoint(self, record):
- self._runner._ckpt_write(record)
-
- def on_process_transition(self, state, process_update):
- log.debug('_on_process_transition: %s' % process_update)
- self._checkpoint(RunnerCkpt(process_status=process_update))
-
- def on_task_transition(self, state, task_update):
- log.debug('_on_task_transition: %s' % task_update)
- self._checkpoint(RunnerCkpt(task_status=task_update))
-
- def on_initialization(self, header):
- log.debug('_on_initialization: %s' % header)
- ThermosTaskValidator.assert_valid_task(self._runner.task)
- ThermosTaskValidator.assert_valid_ports(self._runner.task, header.ports)
- self._checkpoint(RunnerCkpt(runner_header=header))
-
-
-class TaskRunnerStage(object):
- """
- A stage of the task runner pipeline.
- """
- MAX_ITERATION_WAIT = Amount(1, Time.SECONDS)
-
- def __init__(self, runner):
- self.runner = runner
- self.clock = runner._clock
-
- def run(self):
- """
- Perform any work necessary at this stage of the task.
-
- If there is no more work to be done, return None. [This will invoke a state transition.]
-
- If there is still work to be done, return the number of seconds from now in which you'd like
- to be called to re-run the plan.
- """
- return None
-
- def transition_to(self):
- """
- The stage to which we should transition.
- """
- raise NotImplementedError
-
-
-class TaskRunnerStage_ACTIVE(TaskRunnerStage):
- """
- Run the regular plan (i.e. normal, non-finalizing processes.)
- """
- MAX_ITERATION_WAIT = Amount(15, Time.SECONDS)
- MIN_ITERATION_WAIT = Amount(1, Time.SECONDS)
-
- def __init__(self, runner):
- super(TaskRunnerStage_ACTIVE, self).__init__(runner)
-
- def run(self):
- launched = self.runner._run_plan(self.runner._regular_plan)
-
- # Have we terminated?
- terminal_state = None
- if self.runner._regular_plan.is_complete():
- log.info('Regular plan complete.')
- terminal_state = TaskState.SUCCESS if self.runner.is_healthy() else TaskState.FAILED
- elif not self.runner.is_healthy():
- log.error('Regular plan unhealthy!')
- terminal_state = TaskState.FAILED
-
- if terminal_state:
- # No more work to do
- return None
- elif launched > 0:
- # We want to run ASAP after updates have been collected
- return max(self.MIN_ITERATION_WAIT.as_(Time.SECONDS), self.runner._regular_plan.min_wait())
- else:
- # We want to run as soon as something is available to run or after a prescribed timeout.
- return min(self.MAX_ITERATION_WAIT.as_(Time.SECONDS), self.runner._regular_plan.min_wait())
-
- def transition_to(self):
- return TaskState.CLEANING
-
-
-class TaskRunnerStage_CLEANING(TaskRunnerStage):
- """
- Start the cleanup of the regular plan (e.g. if it failed.) On ACTIVE -> CLEANING,
- we send SIGTERMs to all still-running processes. We wait at most finalization_wait
- for all processes to complete before SIGKILLs are sent. If everything exits cleanly
- prior to that point in time, we transition to FINALIZING, which kicks into gear
- the finalization schedule (if any.)
- """
- def run(self):
- log.debug('TaskRunnerStage[CLEANING]: Finalization remaining: %s' %
- self.runner._finalization_remaining())
- if self.runner._finalization_remaining() > 0 and self.runner.has_running_processes():
- return min(self.runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))
-
- def transition_to(self):
- if self.runner._finalization_remaining() <= 0:
- log.info('Exceeded finalization wait, skipping finalization.')
- return self.runner.terminal_state()
- return TaskState.FINALIZING
-
-
-class TaskRunnerStage_FINALIZING(TaskRunnerStage):
- """
- Run the finalizing plan, specifically the plan of tasks with the 'final'
- bit marked (e.g. log savers, checkpointers and the like.) Anything in this
- plan will be SIGKILLed if we go over the finalization_wait.
- """
-
- def run(self):
- self.runner._run_plan(self.runner._finalizing_plan)
- log.debug('TaskRunnerStage[FINALIZING]: Finalization remaining: %s' %
- self.runner._finalization_remaining())
- if self.runner.deadlocked(self.runner._finalizing_plan):
- log.warning('Finalizing plan deadlocked.')
- return None
- if self.runner._finalization_remaining() > 0 and not self.runner._finalizing_plan.is_complete():
- return min(self.runner._finalization_remaining(), self.MAX_ITERATION_WAIT.as_(Time.SECONDS))
-
- def transition_to(self):
- if self.runner._finalization_remaining() <= 0:
- log.info('Exceeded finalization wait, terminating finalization.')
- return self.runner.terminal_state()
-
-
-class TaskRunner(object):
- """
- Run a ThermosTask.
-
- This class encapsulates the core logic to run and control the state of a Thermos task.
- Typically, it will be instantiated directly to control a new task, but a TaskRunner can also be
- synthesised from an existing task's checkpoint root
- """
- class Error(Exception): pass
- class InvalidTask(Error): pass
- class InternalError(Error): pass
- class PermissionError(Error): pass
- class StateError(Error): pass
-
- # Maximum amount of time we spend waiting for new updates from the checkpoint streams
- # before doing housecleaning (checking for LOST tasks, dead PIDs.)
- MAX_ITERATION_TIME = Amount(10, Time.SECONDS)
-
- # Minimum amount of time we wait between polls for updates on coordinator checkpoints.
- COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.SECONDS)
-
- # Amount of time we're willing to wait after forking before we expect the runner to have
- # exec'ed the child process.
- LOST_TIMEOUT = Amount(60, Time.SECONDS)
-
- # Active task stages
- STAGES = {
- TaskState.ACTIVE: TaskRunnerStage_ACTIVE,
- TaskState.CLEANING: TaskRunnerStage_CLEANING,
- TaskState.FINALIZING: TaskRunnerStage_FINALIZING
- }
-
- @classmethod
- def get(cls, task_id, checkpoint_root):
- """
- Get a TaskRunner bound to the task_id in checkpoint_root.
- """
- path = TaskPath(root=checkpoint_root, task_id=task_id, state='active')
- task_json = path.getpath('task_path')
- task_checkpoint = path.getpath('runner_checkpoint')
- if not os.path.exists(task_json):
- return None
- task = ThermosConfigLoader.load_json(task_json)
- if task is None:
- return None
- if len(task.tasks()) == 0:
- return None
- try:
- checkpoint = CheckpointDispatcher.from_file(task_checkpoint)
- if checkpoint is None or checkpoint.header is None:
- return None
- return cls(task.tasks()[0].task(), checkpoint_root, checkpoint.header.sandbox,
- log_dir=checkpoint.header.log_dir, task_id=task_id,
- portmap=checkpoint.header.ports)
- except Exception as e:
- log.error('Failed to reconstitute checkpoint in TaskRunner.get: %s' % e, exc_info=True)
- return None
-
- def __init__(self, task, checkpoint_root, sandbox, log_dir=None,
- task_id=None, portmap=None, user=None, chroot=False, clock=time,
- universal_handler=None, planner_class=TaskPlanner):
- """
- required:
- task (config.Task) = the task to run
- checkpoint_root (path) = the checkpoint root
- sandbox (path) = the sandbox in which the path will be run
- [if None, cwd will be assumed, but garbage collection will be
- disabled for this task.]
-
- optional:
- log_dir (string) = directory to house stdout/stderr logs. If not specified, logs will be
- written into the sandbox directory under .logs/
- task_id (string) = bind to this task id. if not specified, will synthesize an id based
- upon task.name()
- portmap (dict) = a map (string => integer) from name to port, e.g. { 'http': 80 }
- user (string) = the user to run the task as. if not current user, requires setuid
- privileges.
- chroot (boolean) = whether or not to chroot into the sandbox prior to exec.
- clock (time interface) = the clock to use throughout
- universal_handler = checkpoint record handler (only used for testing)
- planner_class (TaskPlanner class) = TaskPlanner class to use for constructing the task
- planning policy.
- """
- if not issubclass(planner_class, TaskPlanner):
- raise TypeError('planner_class must be a TaskPlanner.')
- self._clock = clock
- launch_time = self._clock.time()
- launch_time_ms = '%06d' % int((launch_time - int(launch_time)) * 10**6)
- if not task_id:
- self._task_id = '%s-%s.%s' % (task.name(),
- time.strftime('%Y%m%d-%H%M%S', time.localtime(launch_time)),
- launch_time_ms)
- else:
- self._task_id = task_id
- current_user = TaskRunnerHelper.get_actual_user()
- self._user = user or current_user
- # TODO(wickman) This should be delegated to the ProcessPlatform / Helper
- if self._user != current_user:
- if os.geteuid() != 0:
- raise ValueError('task specifies user as %s, but %s does not have setuid permission!' % (
- self._user, current_user))
- self._portmap = portmap or {}
- self._launch_time = launch_time
- self._log_dir = log_dir or os.path.join(sandbox, '.logs')
- self._pathspec = TaskPath(root=checkpoint_root, task_id=self._task_id, log_dir=self._log_dir)
- try:
- ThermosTaskValidator.assert_valid_task(task)
- ThermosTaskValidator.assert_valid_ports(task, self._portmap)
- except ThermosTaskValidator.InvalidTaskError as e:
- raise self.InvalidTask('Invalid task: %s' % e)
- context = ThermosContext(
- task_id=self._task_id,
- ports=self._portmap,
- user=self._user)
- self._task, uninterp = (task % Environment(thermos=context)).interpolate()
- if len(uninterp) > 0:
- raise self.InvalidTask('Failed to interpolate task, missing: %s' %
- ', '.join(str(ref) for ref in uninterp))
- try:
- ThermosTaskValidator.assert_same_task(self._pathspec, self._task)
- except ThermosTaskValidator.InvalidTaskError as e:
- raise self.InvalidTask('Invalid task: %s' % e)
- self._plan = None # plan currently being executed (updated by Handlers)
- self._regular_plan = planner_class(self._task, clock=clock,
- process_filter=lambda proc: not proc.final().get())
- self._finalizing_plan = planner_class(self._task, clock=clock,
- process_filter=lambda proc: proc.final().get())
- self._chroot = chroot
- self._sandbox = sandbox
- self._terminal_state = None
- self._ckpt = None
- self._process_map = dict((p.name().get(), p) for p in self._task.processes())
- self._task_processes = {}
- self._stages = dict((state, stage(self)) for state, stage in self.STAGES.items())
- self._finalization_start = None
- self._preemption_deadline = None
- self._watcher = ProcessMuxer(self._pathspec)
- self._state = RunnerState(processes = {})
-
- # create runner state
- universal_handler = universal_handler or TaskRunnerUniversalHandler
- self._dispatcher = CheckpointDispatcher()
- self._dispatcher.register_handler(universal_handler(self))
- self._dispatcher.register_handler(TaskRunnerProcessHandler(self))
- self._dispatcher.register_handler(TaskRunnerTaskHandler(self))
-
- # recover checkpointed runner state and update plan
- self._recovery = True
- self._replay_runner_ckpt()
-
- @property
- def task(self):
- return self._task
-
- @property
- def task_id(self):
- return self._task_id
-
- @property
- def state(self):
- return self._state
-
- @property
- def processes(self):
- return self._task_processes
-
- def task_state(self):
- return self._state.statuses[-1].state if self._state.statuses else TaskState.ACTIVE
-
- def close_ckpt(self):
- """Force close the checkpoint stream. This is necessary for runners terminated through
- exception propagation."""
- log.debug('Closing the checkpoint stream.')
- self._ckpt.close()
-
- @contextmanager
- def control(self, force=False):
- """
- Bind to the checkpoint associated with this task, position to the end of the log if
- it exists, or create it if it doesn't. Fails if we cannot get "leadership" i.e. a
- file lock on the checkpoint stream.
- """
- if self.is_terminal():
- raise TaskRunner.StateError('Cannot take control of a task in terminal state.')
- if self._sandbox:
- safe_mkdir(self._sandbox)
- ckpt_file = self._pathspec.getpath('runner_checkpoint')
- try:
- self._ckpt = TaskRunnerHelper.open_checkpoint(ckpt_file, force=force, state=self._state)
- except TaskRunnerHelper.PermissionError:
- raise TaskRunner.PermissionError('Unable to open checkpoint %s' % ckpt_file)
- log.debug('Flipping recovery mode off.')
- self._recovery = False
- self._set_task_status(self.task_state())
- self._resume_task()
- try:
- yield
- except Exception as e:
- log.error('Caught exception in self.control(): %s' % e)
- log.error(' %s' % traceback.format_exc())
- self._ckpt.close()
-
- def _resume_task(self):
- assert self._ckpt is not None
- unapplied_updates = self._replay_process_ckpts()
- if self.is_terminal():
- raise self.StateError('Cannot resume terminal task.')
- self._initialize_ckpt_header()
- self._replay(unapplied_updates)
-
- def _ckpt_write(self, record):
- """
- Write to the checkpoint stream if we're not in recovery mode.
- """
- if not self._recovery:
- self._ckpt.write(record)
-
- def _replay(self, checkpoints):
- """
- Replay a sequence of RunnerCkpts.
- """
- for checkpoint in checkpoints:
- self._dispatcher.dispatch(self._state, checkpoint)
-
- def _replay_runner_ckpt(self):
- """
- Replay the checkpoint stream associated with this task.
- """
- ckpt_file = self._pathspec.getpath('runner_checkpoint')
- if os.path.exists(ckpt_file):
- fp = open(ckpt_file, "r")
- ckpt_recover = ThriftRecordReader(fp, RunnerCkpt)
- for record in ckpt_recover:
- log.debug('Replaying runner checkpoint record: %s' % record)
- self._dispatcher.dispatch(self._state, record, recovery=True)
- ckpt_recover.close()
-
- def _replay_process_ckpts(self):
- """
- Replay the process checkpoints that do not mutate the runner state. Return the unapplied
- process updates that would mutate the runner checkpoint stream.
- """
- process_updates = self._watcher.select()
- unapplied_process_updates = []
- for process_update in process_updates:
- if self._dispatcher.would_update(self._state, process_update):
- unapplied_process_updates.append(process_update)
- else:
- self._dispatcher.dispatch(self._state, process_update, recovery=True)
- return unapplied_process_updates
-
- def _initialize_ckpt_header(self):
- """
- Initializes the RunnerHeader for this checkpoint stream if it has not already
- been constructed.
- """
- if self._state.header is None:
- header = RunnerHeader(
- task_id=self._task_id,
- launch_time_ms=int(self._launch_time*1000),
- sandbox=self._sandbox,
- log_dir=self._log_dir,
- hostname=socket.gethostname(),
- user=self._user,
- ports=self._portmap)
- runner_ckpt = RunnerCkpt(runner_header=header)
- self._dispatcher.dispatch(self._state, runner_ckpt)
-
- def _set_task_status(self, state):
- update = TaskStatus(state=state, timestamp_ms=int(self._clock.time() * 1000),
- runner_pid=os.getpid(), runner_uid=os.getuid())
- runner_ckpt = RunnerCkpt(task_status=update)
- self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
-
- def _finalization_remaining(self):
- # If a preemption deadline has been set, use that.
- if self._preemption_deadline:
- return max(0, self._preemption_deadline - self._clock.time())
-
- # Otherwise, use the finalization wait provided in the configuration.
- finalization_allocation = self.task.finalization_wait().get()
- if self._finalization_start is None:
- return sys.float_info.max
- else:
- waited = max(0, self._clock.time() - self._finalization_start)
- return max(0, finalization_allocation - waited)
-
- def _set_process_status(self, process_name, process_state, **kw):
- if 'sequence_number' in kw:
- sequence_number = kw.pop('sequence_number')
- log.debug('_set_process_status(%s <= %s, seq=%s[force])' % (process_name,
- ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
- else:
- current_run = self._current_process_run(process_name)
- if not current_run:
- assert process_state == ProcessState.WAITING
- sequence_number = 0
- else:
- sequence_number = current_run.seq + 1
- log.debug('_set_process_status(%s <= %s, seq=%s[auto])' % (process_name,
- ProcessState._VALUES_TO_NAMES.get(process_state), sequence_number))
- runner_ckpt = RunnerCkpt(process_status=ProcessStatus(
- process=process_name, state=process_state, seq=sequence_number, **kw))
- self._dispatcher.dispatch(self._state, runner_ckpt, self._recovery)
-
- def _task_process_from_process_name(self, process_name, sequence_number):
- """
- Construct a Process() object from a process_name, populated with its
- correct run number and fully interpolated commandline.
- """
- run_number = len(self.state.processes[process_name]) - 1
- pathspec = self._pathspec.given(process=process_name, run=run_number)
- process = self._process_map.get(process_name)
- if process is None:
- raise self.InternalError('FATAL: Could not find process: %s' % process_name)
- def close_ckpt_and_fork():
- pid = os.fork()
- if pid == 0 and self._ckpt is not None:
- self._ckpt.close()
- return pid
- return Process(
- process.name().get(),
- process.cmdline().get(),
- sequence_number,
- pathspec,
- self._sandbox,
- self._user,
- chroot=self._chroot,
- fork=close_ckpt_and_fork)
-
- def deadlocked(self, plan=None):
- """Check whether a plan is deadlocked, i.e. there are no running/runnable processes, and the
- plan is not complete."""
- plan = plan or self._regular_plan
- now = self._clock.time()
- running = list(plan.running)
- runnable = list(plan.runnable_at(now))
- waiting = list(plan.waiting_at(now))
- log.debug('running:%d runnable:%d waiting:%d complete:%s' % (
- len(running), len(runnable), len(waiting), plan.is_complete()))
- return len(running + runnable + waiting) == 0 and not plan.is_complete()
-
- def is_healthy(self):
- """Check whether the TaskRunner is healthy. A healthy TaskRunner is not deadlocked and has not
- reached its max_failures count."""
- max_failures = self._task.max_failures().get()
- deadlocked = self.deadlocked()
- under_failure_limit = max_failures == 0 or len(self._regular_plan.failed) < max_failures
- log.debug('max_failures:%d failed:%d under_failure_limit:%s deadlocked:%s ==> health:%s' % (
- max_failures, len(self._regular_plan.failed), under_failure_limit, deadlocked,
- not deadlocked and under_failure_limit))
- return not deadlocked and under_failure_limit
-
- def _current_process_run(self, process_name):
- if process_name not in self._state.processes or len(self._state.processes[process_name]) == 0:
- return None
- return self._state.processes[process_name][-1]
-
- def is_process_lost(self, process_name):
- """Determine whether or not we should mark a task as LOST and do so if necessary."""
- current_run = self._current_process_run(process_name)
- if not current_run:
- raise self.InternalError('No current_run for process %s!' % process_name)
-
- def forked_but_never_came_up():
- return current_run.state == ProcessState.FORKED and (
- self._clock.time() - current_run.fork_time > TaskRunner.LOST_TIMEOUT.as_(Time.SECONDS))
-
- def running_but_coordinator_died():
- if current_run.state != ProcessState.RUNNING:
- return False
- coordinator_pid, _, _ = TaskRunnerHelper.scan_process(self.state, process_name)
- if coordinator_pid is not None:
- return False
- elif self._watcher.has_data(process_name):
- return False
- return True
-
- if forked_but_never_came_up() or running_but_coordinator_died():
- log.info('Detected a LOST task: %s' % current_run)
- log.debug(' forked_but_never_came_up: %s' % forked_but_never_came_up())
- log.debug(' running_but_coordinator_died: %s' % running_but_coordinator_died())
- return True
-
- return False
-
- def _run_plan(self, plan):
- log.debug('Schedule pass:')
-
- running = list(plan.running)
- log.debug('running: %s' % ' '.join(plan.running))
- log.debug('finished: %s' % ' '.join(plan.finished))
-
- launched = []
- for process_name in plan.running:
- if self.is_process_lost(process_name):
- self._set_process_status(process_name, ProcessState.LOST)
-
- now = self._clock.time()
- runnable = list(plan.runnable_at(now))
- waiting = list(plan.waiting_at(now))
- log.debug('runnable: %s' % ' '.join(runnable))
- log.debug('waiting: %s' % ' '.join(
- '%s[T-%.1fs]' % (process, plan.get_wait(process)) for process in waiting))
-
- def pick_processes(process_list):
- if self._task.max_concurrency().get() == 0:
- return process_list
- num_to_pick = max(self._task.max_concurrency().get() - len(running), 0)
- return process_list[:num_to_pick]
-
- for process_name in pick_processes(runnable):
- tp = self._task_processes.get(process_name)
- if tp:
- current_run = self._current_process_run(process_name)
- assert current_run.state == ProcessState.WAITING
- else:
- self._set_process_status(process_name, ProcessState.WAITING)
- tp = self._task_processes[process_name]
- log.info('Forking Process(%s)' % process_name)
- tp.start()
- launched.append(tp)
-
- return len(launched) > 0
-
- def _terminate_plan(self, plan):
- for process in plan.running:
- last_run = self._current_process_run(process)
- if last_run and last_run.state in (ProcessState.FORKED, ProcessState.RUNNING):
- TaskRunnerHelper.terminate_process(self.state, process)
-
- def has_running_processes(self):
- """
- Returns True if any processes associated with this task have active pids.
- """
- process_tree = TaskRunnerHelper.scantree(self.state)
- return any(any(process_set) for process_set in process_tree.values())
-
- def has_active_processes(self):
- """
- Returns True if any processes are in non-terminal states.
- """
- return any(not TaskRunnerHelper.is_process_terminal(run.state) for run in
- filter(None, (self._current_process_run(process) for process in self.state.processes)))
-
- def collect_updates(self, timeout=None):
- """
- Collects and applies updates from process checkpoint streams. Returns the number
- of applied process checkpoints.
- """
- if self.has_active_processes():
- sleep_interval = self.COORDINATOR_INTERVAL_SLEEP.as_(Time.SECONDS)
- total_time = 0.0
- while True:
- process_updates = self._watcher.select()
- for process_update in process_updates:
- self._dispatcher.dispatch(self._state, process_update, self._recovery)
- if process_updates:
- return len(process_updates)
- if timeout and total_time >= timeout:
- break
- total_time += sleep_interval
- self._clock.sleep(sleep_interval)
- return 0
-
- def is_terminal(self):
- return TaskRunnerHelper.is_task_terminal(self.task_state())
-
- def terminal_state(self):
- if self._terminal_state:
- log.debug('Forced terminal state: %s' %
- TaskState._VALUES_TO_NAMES.get(self._terminal_state, 'UNKNOWN'))
- return self._terminal_state
- else:
- return TaskState.SUCCESS if self.is_healthy() else TaskState.FAILED
-
- def run(self, force=False):
- """
- Entrypoint to runner. Assume control of checkpoint stream, and execute TaskRunnerStages
- until runner is terminal.
- """
- if self.is_terminal():
- return
- with self.control(force):
- self._run()
-
- def _run(self):
- iteration_time = self.MAX_ITERATION_TIME.as_(Time.SECONDS)
- while not self.is_terminal():
- start = self._clock.time()
- # step 1: execute stage corresponding to the state we're currently in
- runner = self._stages[self.task_state()]
- iteration_wait = runner.run()
- if iteration_wait is None:
- log.debug('Run loop: No more work to be done in state %s' %
- TaskState._VALUES_TO_NAMES.get(self.task_state(), 'UNKNOWN'))
- self._set_task_status(runner.transition_to())
- continue
- log.debug('Run loop: Work to be done within %.1fs' % iteration_wait)
- # step 2: check child process checkpoint streams for updates
- if not self.collect_updates(iteration_wait):
- # If we don't collect any updates, at least 'touch' the checkpoint stream
- # so as to prevent garbage collection.
- elapsed = self._clock.time() - start
- if elapsed < iteration_wait:
- log.debug('Update collection only took %.1fs, idling %.1fs' % (
- elapsed, iteration_wait - elapsed))
- self._clock.sleep(iteration_wait - elapsed)
- log.debug('Run loop: No updates collected, touching checkpoint.')
- os.utime(self._pathspec.getpath('runner_checkpoint'), None)
- # step 3: reap any zombie child processes
- TaskRunnerHelper.reap_children()
-
- def kill(self, force=False, terminal_status=TaskState.KILLED,
- preemption_wait=Amount(1, Time.MINUTES)):
- """
- Kill all processes associated with this task and set task/process states as terminal_status
- (defaults to KILLED)
- """
- log.debug('Runner issued kill: force:%s, preemption_wait:%s' % (
- force, preemption_wait))
- assert terminal_status in (TaskState.KILLED, TaskState.LOST)
- self._preemption_deadline = self._clock.time() + preemption_wait.as_(Time.SECONDS)
- with self.control(force):
- if self.is_terminal():
- log.warning('Task is not in ACTIVE state, cannot issue kill.')
- return
- self._terminal_state = terminal_status
- if self.task_state() == TaskState.ACTIVE:
- self._set_task_status(TaskState.CLEANING)
- self._run()
-
- def lose(self, force=False):
- """
- Mark a task as LOST and kill any straggling processes.
- """
- self.kill(force, preemption_wait=Amount(0, Time.SECONDS), terminal_status=TaskState.LOST)
-
- def _kill(self):
- processes = TaskRunnerHelper.scantree(self._state)
- for process, pid_tuple in processes.items():
- current_run = self._current_process_run(process)
- coordinator_pid, pid, tree = pid_tuple
- if TaskRunnerHelper.is_process_terminal(current_run.state):
- if coordinator_pid or pid or tree:
- log.warning('Terminal process (%s) still has running pids:' % process)
- log.warning(' coordinator_pid: %s' % coordinator_pid)
- log.warning(' pid: %s' % pid)
- log.warning(' tree: %s' % tree)
- TaskRunnerHelper.kill_process(self.state, process)
- else:
- if coordinator_pid or pid or tree:
- log.info('Transitioning %s to KILLED' % process)
- self._set_process_status(process, ProcessState.KILLED,
- stop_time=self._clock.time(), return_code=-1)
- else:
- log.info('Transitioning %s to LOST' % process)
- if current_run.state != ProcessState.WAITING:
- self._set_process_status(process, ProcessState.LOST)
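
For orientation, a minimal sketch of how the TaskRunner above is driven, based only on the API
shown in this file; the checkpoint root and task id are hypothetical placeholders:

    from twitter.common.quantity import Amount, Time
    from twitter.thermos.core.runner import TaskRunner

    CHECKPOINT_ROOT = '/var/run/thermos'            # hypothetical checkpoint root
    TASK_ID = 'hello_world-20131231-000000.000000'  # hypothetical task id

    # Rebind to an existing task from its checkpoint root; TaskRunner.get returns
    # None if the task or its checkpoint cannot be reconstituted.
    runner = TaskRunner.get(TASK_ID, CHECKPOINT_ROOT)
    if runner and not runner.is_terminal():
      # Preemption, per the module docstring: kill() forces CLEANING with a
      # finalization deadline of now + preemption_wait before SIGKILLs are sent.
      runner.kill(preemption_wait=Amount(30, Time.SECONDS))
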
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/BUILD b/src/main/python/twitter/thermos/monitoring/BUILD
deleted file mode 100644
index a977878..0000000
--- a/src/main/python/twitter/thermos/monitoring/BUILD
+++ /dev/null
@@ -1,97 +0,0 @@
-import os
-
-python_library(
- name = 'detector',
- sources = ['detector.py'],
- dependencies = [
- pants('src/main/python/twitter/thermos/common:path')
- ]
-)
-
-python_library(
- name = 'garbage',
- sources = ['garbage.py'],
- dependencies = [
- pants(':detector'),
- pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- pants('src/main/python/twitter/thermos/common:ckpt'),
- pants('src/main/python/twitter/thermos/common:path'),
- ]
-)
-
-python_library(
- name = 'monitor',
- sources = ['monitor.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
- pants('src/main/python/twitter/thermos/common:ckpt'),
- pants('src/main/thrift/com/twitter/thermos:py-thrift'),
- ]
-)
-
-python_library(
- name = 'disk',
- sources = ['disk.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
- pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- python_requirement('watchdog'),
- ]
-)
-
-python_library(
- name = 'process',
- sources = ['process.py'],
-)
-
-python_library(
- name = 'process_collector_psutil',
- sources = ['process_collector_psutil.py'],
- dependencies = [
- pants(':process'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('src/main/python/twitter/thermos:psutil'),
- ]
-)
-
-python_library(
- name = 'resource',
- sources = ['resource.py'],
- dependencies = [
- pants(':disk'),
- pants(':monitor'),
- pants(':process'),
- pants(':process_collector_psutil'),
- pants('aurora/twitterdeps/src/python/twitter/common/collections'),
- pants('aurora/twitterdeps/src/python/twitter/common/concurrent'),
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- ]
-)
-
-python_library(
- name = 'monitoring',
- dependencies = [
- pants(':detector'),
- pants(':disk'),
- pants(':garbage'),
- pants(':monitor'),
- pants(':process'),
- pants(':resource'),
-
- # covering dependency for common
- pants('src/main/python/twitter/thermos/common'),
- ],
- provides = setup_py(
- name = 'twitter.thermos.monitoring',
- version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
- description = 'Thermos monitoring library.',
- )
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/__init__.py b/src/main/python/twitter/thermos/monitoring/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/detector.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/detector.py b/src/main/python/twitter/thermos/monitoring/detector.py
deleted file mode 100644
index ed48c93..0000000
--- a/src/main/python/twitter/thermos/monitoring/detector.py
+++ /dev/null
@@ -1,91 +0,0 @@
-"""Detect Thermos tasks on disk
-
-This module contains the TaskDetector, used to detect Thermos tasks within a given checkpoint root.
-
-"""
-
-import glob
-import os
-import re
-
-from twitter.thermos.common.path import TaskPath
-
-
-class TaskDetector(object):
- """
- Helper class in front of TaskPath to detect active/finished/running tasks. Performs no
- introspection on the state of a task; merely detects based on file paths on disk.
- """
- class MatchingError(Exception): pass
-
- def __init__(self, root):
- self._root_dir = root
- self._pathspec = TaskPath()
-
- def get_task_ids(self, state=None):
- paths = glob.glob(self._pathspec.given(root=self._root_dir,
- task_id="*",
- state=state or '*')
- .getpath('task_path'))
- path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
- task_id="(\S+)",
- state='(\S+)')
- .getpath('task_path'))
- for path in paths:
- try:
- task_state, task_id = path_re.match(path).groups()
- except AttributeError:  # path_re did not match
- continue
- if state is None or task_state == state:
- yield (task_state, task_id)
-
- def get_process_runs(self, task_id, log_dir):
- paths = glob.glob(self._pathspec.given(root=self._root_dir,
- task_id=task_id,
- log_dir=log_dir,
- process='*',
- run='*')
- .getpath('process_logdir'))
- path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
- task_id=re.escape(task_id),
- log_dir=log_dir,
- process='(\S+)',
- run='(\d+)')
- .getpath('process_logdir'))
- for path in paths:
- try:
- process, run = path_re.match(path).groups()
- except AttributeError:  # path_re did not match
- continue
- yield process, int(run)
-
- def get_process_logs(self, task_id, log_dir):
- for process, run in self.get_process_runs(task_id, log_dir):
- for logtype in ('stdout', 'stderr'):
- path = (self._pathspec.with_filename(logtype).given(root=self._root_dir,
- task_id=task_id,
- log_dir=log_dir,
- process=process,
- run=run)
- .getpath('process_logdir'))
- if os.path.exists(path):
- yield path
-
- def get_checkpoint(self, task_id):
- return self._pathspec.given(root=self._root_dir, task_id=task_id).getpath('runner_checkpoint')
-
- def get_process_checkpoints(self, task_id):
- matching_paths = glob.glob(self._pathspec.given(root=self._root_dir,
- task_id=task_id,
- process='*')
- .getpath('process_checkpoint'))
- path_re = re.compile(self._pathspec.given(root=re.escape(self._root_dir),
- task_id=re.escape(task_id),
- process='(\S+)')
- .getpath('process_checkpoint'))
- for path in matching_paths:
- try:
- process, = path_re.match(path).groups()
- except AttributeError:  # path_re did not match
- continue
- yield path
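
As a usage sketch for the TaskDetector above (the checkpoint root is a hypothetical placeholder):

    from twitter.thermos.monitoring.detector import TaskDetector

    detector = TaskDetector(root='/var/run/thermos')  # hypothetical checkpoint root

    # Enumerate (state, task_id) pairs purely from on-disk paths; no checkpoint
    # state is inspected.
    for task_state, task_id in detector.get_task_ids():
      print('%s: %s' % (task_state, task_id))

    # Restrict to tasks whose metadata lives under the 'finished' directory.
    finished = list(detector.get_task_ids(state='finished'))
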
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/disk.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/disk.py b/src/main/python/twitter/thermos/monitoring/disk.py
deleted file mode 100644
index 9116746..0000000
--- a/src/main/python/twitter/thermos/monitoring/disk.py
+++ /dev/null
@@ -1,181 +0,0 @@
-"""Sample disk usage under a particular path
-
-This module provides threads which can be used to gather information on the disk utilisation
-under a particular path.
-
-Currently, there are two threads available:
- - DiskCollectorThread, which periodically uses a basic brute-force approach (os.stat()ing every
- file within the path)
- - InotifyDiskCollectorThread, which updates disk utilisation dynamically by using inotify to
- monitor disk changes within the path
-
-"""
-
-import os
-import threading
-import time
-from Queue import Queue
-
-from twitter.common import log
-from twitter.common.dirutil import du, safe_bsize
-from twitter.common.exceptions import ExceptionalThread
-from twitter.common.lang import Lockable
-from twitter.common.quantity import Amount, Time
-
-from watchdog.observers import Observer as WatchdogObserver
-from watchdog.events import (
- FileSystemEventHandler,
- FileCreatedEvent,
- FileDeletedEvent,
- FileModifiedEvent,
- FileMovedEvent,
-)
-
-
-class DiskCollectorThread(ExceptionalThread):
- """ Thread to calculate aggregate disk usage under a given path using a simple algorithm """
- def __init__(self, path):
- self.path = path
- self.value = None
- self.event = threading.Event()
- super(DiskCollectorThread, self).__init__()
- self.daemon = True
-
- def run(self):
- log.debug("DiskCollectorThread: starting collection of %s" % self.path)
- self.value = du(self.path)
- log.debug("DiskCollectorThread: finished collection of %s" % self.path)
- self.event.set()
-
- def finished(self):
- return self.event.is_set()
-
-
-class DiskCollector(Lockable):
- """ Spawn a background thread to sample disk usage """
- def __init__(self, root):
- self._root = root
- self._thread = None
- self._value = 0
- super(DiskCollector, self).__init__()
-
- @Lockable.sync
- def sample(self):
- """ Trigger collection of sample, if not already begun """
- if self._thread is None:
- self._thread = DiskCollectorThread(self._root)
- self._thread.start()
-
- @property
- @Lockable.sync
- def value(self):
- """ Retrieve value of disk usage """
- if self._thread is not None and self._thread.finished():
- self._value = self._thread.value
- self._thread = None
- return self._value
-
- @property
- @Lockable.sync
- def completed_event(self):
- """ Return a threading.Event that will block until an in-progress disk collection is complete,
- or block indefinitely otherwise. Use with caution! (i.e.: set a timeout) """
- if self._thread is not None:
- return self._thread.event
- else:
- return threading.Event()
-
-
-class InotifyDiskCollectorThread(ExceptionalThread, FileSystemEventHandler):
- """ Thread to calculate aggregate disk usage under a given path
-
- Note that while this thread uses inotify (through the watchdog module) to monitor disk events in
- "real time", the actual processing of events is only performed periodically (configured via
- COLLECTION_INTERVAL)
-
- """
- INTERESTING_EVENTS = (FileCreatedEvent, FileDeletedEvent, FileModifiedEvent, FileMovedEvent)
- COLLECTION_INTERVAL = Amount(5, Time.SECONDS)
-
- def __init__(self, path):
- self._path = path
- self._files = {} # file path => size (bytes)
- self._queue = Queue()
- self._observer = WatchdogObserver()
- super(InotifyDiskCollectorThread, self).__init__()
- self.daemon = True
-
- def dispatch(self, event):
- """ Dispatch all interesting events to the internal queue """
- if isinstance(event, self.INTERESTING_EVENTS):
- self._queue.put(event)
-
- def _initialize(self):
- """ Collect an initial snapshot of the disk usage in the path """
- log.debug("Starting watchdog observer to collect events...")
- self._observer.schedule(self, path=self._path, recursive=True)
- self._observer.start()
- log.debug("Collecting initial disk usage sample...")
- for root, _, files in os.walk(self._path):
- for filename in files:
- f = os.path.join(root, filename)
- self._files[f] = safe_bsize(f)
-
- def _process_events(self):
- """ Deduplicate and process watchdog events, updating the internal file store appropriately """
- file_ops = {}
-
- def remove_file(path):
- self._files.pop(path, None)
- def stat_file(path):
- self._files[path] = safe_bsize(path)
-
- while not self._to_process.empty():
- event = self._to_process.get()
- # Bind paths as default arguments: a plain closure over 'event' would see only
- # the final event of this loop by the time the deferred ops actually run.
- if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
- file_ops[event.src_path] = lambda path=event.src_path: stat_file(path)
- elif isinstance(event, FileDeletedEvent):
- file_ops[event.src_path] = lambda path=event.src_path: remove_file(path)
- elif isinstance(event, FileMovedEvent):
- file_ops[event.src_path] = lambda path=event.src_path: remove_file(path)
- file_ops[event.dest_path] = lambda path=event.dest_path: stat_file(path)
-
- for op in file_ops.values():
- op()
-
- def run(self):
- """ Loop indefinitely, periodically processing watchdog/inotify events. """
- self._initialize()
- log.debug("Initialization complete. Moving to handling events.")
- while True:
- deadline = time.time() + self.COLLECTION_INTERVAL.as_(Time.SECONDS)
- if not self._queue.empty():
- self._to_process, self._queue = self._queue, Queue()
- self._process_events()
- time.sleep(max(0, deadline - time.time()))
-
- @property
- def value(self):
- return sum(self._files.itervalues())
-
-
-class InotifyDiskCollector(object):
- """ Spawn a background thread to sample disk usage """
- def __init__(self, root):
- self._root = root
- self._thread = InotifyDiskCollectorThread(self._root)
-
- def sample(self):
- """ Trigger disk collection loop. """
- if not os.path.exists(self._root):
- log.error('Cannot start monitoring path until it exists')
- elif not self._thread.is_alive():
- self._thread.start()
-
- @property
- def value(self):
- return self._thread.value
-
- @property
- def completed_event(self):
- return threading.Event()
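
A minimal sketch of the sampling pattern shared by both collectors above (the sandbox path is a
hypothetical placeholder):

    from twitter.thermos.monitoring.disk import DiskCollector

    collector = DiskCollector('/var/lib/thermos/sandbox')  # hypothetical path
    collector.sample()                            # spawns a DiskCollectorThread
    collector.completed_event.wait(timeout=10.0)  # always wait with a timeout
    print('disk used: %d bytes' % collector.value)  # stays 0 until a sample finishes
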
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/garbage.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/garbage.py b/src/main/python/twitter/thermos/monitoring/garbage.py
deleted file mode 100644
index 4ac0d3a..0000000
--- a/src/main/python/twitter/thermos/monitoring/garbage.py
+++ /dev/null
@@ -1,183 +0,0 @@
-from abc import abstractmethod
-from collections import namedtuple
-import os
-import sys
-import time
-
-from twitter.common.dirutil import safe_delete, safe_rmtree, safe_bsize
-from twitter.common.lang import Interface
-from twitter.common.quantity import Amount, Data, Time
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-from twitter.thermos.common.path import TaskPath
-
-from .detector import TaskDetector
-
-
-class TaskGarbageCollector(object):
- def __init__(self, root):
- self._root = root
- self._detector = TaskDetector(root=self._root)
- self._states = {}
-
- def state(self, task_id):
- if task_id not in self._states:
- self._states[task_id] = CheckpointDispatcher.from_file(self._detector.get_checkpoint(task_id))
- return self._states[task_id]
-
- def get_age(self, task_id):
- return os.path.getmtime(self._detector.get_checkpoint(task_id))
-
- def get_finished_tasks(self):
- return [task_id for _, task_id in self._detector.get_task_ids(state='finished')]
-
- def get_metadata(self, task_id, with_size=True):
- runner_ckpt = self._detector.get_checkpoint(task_id)
- process_ckpts = [ckpt for ckpt in self._detector.get_process_checkpoints(task_id)]
- json_spec = TaskPath(root=self._root, task_id=task_id, state='finished').getpath('task_path')
- for path in [json_spec, runner_ckpt] + process_ckpts:
- if with_size:
- yield path, safe_bsize(path)
- else:
- yield path
-
- def get_logs(self, task_id, with_size=True):
- state = self.state(task_id)
- if state and state.header:
- for path in self._detector.get_process_logs(task_id, state.header.log_dir):
- if with_size:
- yield path, safe_bsize(path)
- else:
- yield path
-
- def get_data(self, task_id, with_size=True):
- state = self.state(task_id)
- if state and state.header and state.header.sandbox:
- for root, dirs, files in os.walk(state.header.sandbox):
- for file in files:
- filename = os.path.join(root, file)
- if with_size:
- yield filename, safe_bsize(filename)
- else:
- yield filename
-
- def erase_task(self, task_id):
- self.erase_data(task_id)
- self.erase_logs(task_id)
- self.erase_metadata(task_id)
-
- def erase_metadata(self, task_id):
- for fn in self.get_metadata(task_id, with_size=False):
- safe_delete(fn)
- safe_rmtree(TaskPath(root=self._root, task_id=task_id).getpath('checkpoint_path'))
-
- def erase_logs(self, task_id):
- for fn in self.get_logs(task_id, with_size=False):
- safe_delete(fn)
- state = self.state(task_id)
- if state and state.header:
- safe_rmtree(TaskPath(root=self._root, task_id=task_id, log_dir=state.header.log_dir)
- .getpath('process_logbase'))
-
- def erase_data(self, task_id):
- # TODO(wickman)
- # This could be potentially dangerous if somebody naively runs their sandboxes in e.g.
- # $HOME or / or similar. Perhaps put a guard somewhere?
- for fn in self.get_data(task_id, with_size=False):
- os.unlink(fn)
- state = self.state(task_id)
- if state and state.header and state.header.sandbox:
- safe_rmtree(state.header.sandbox)
-
-
-class TaskGarbageCollectionPolicy(Interface):
- def __init__(self, collector):
- if not isinstance(collector, TaskGarbageCollector):
- raise ValueError(
- "Expected collector to be a TaskGarbageCollector, got %s" % collector.__class__.__name__)
- self._collector = collector
-
- @property
- def collector(self):
- return self._collector
-
- @abstractmethod
- def run(self):
- """Returns a list of task_ids that should be garbage collected given the specified policy."""
-
-
-class DefaultCollector(TaskGarbageCollectionPolicy):
- def __init__(self, collector, **kw):
- """
- Default garbage collection policy.
-
- Arguments that may be specified:
- max_age: Amount(Time) (max age of a retained task) [default: infinity]
- max_space: Amount(Data) (max space to keep) [default: infinity]
- max_tasks: int (max number of tasks to keep) [default: infinity]
- include_metadata: boolean (Whether or not to include metadata in the
- space calculations.) [default: True]
- include_logs: boolean (Whether or not to include logs in the
- space calculations.) [default: True]
- verbose: boolean (whether or not to log) [default: False]
- logger: callable (function to call with log messages) [default: sys.stdout.write]
- """
- self._max_age = kw.get('max_age', Amount(10**10, Time.DAYS))
- self._max_space = kw.get('max_space', Amount(10**10, Data.TB))
- self._max_tasks = kw.get('max_tasks', 10**10)
- self._include_metadata = kw.get('include_metadata', True)
- self._include_logs = kw.get('include_logs', True)
- self._verbose = kw.get('verbose', False)
- self._logger = kw.get('logger', sys.stdout.write)
- TaskGarbageCollectionPolicy.__init__(self, collector)
-
- def log(self, msg):
- if self._verbose:
- self._logger(msg)
-
- def run(self):
- tasks = []
- now = time.time()
-
- TaskTuple = namedtuple('TaskTuple', 'task_id age metadata_size log_size data_size')
- for task_id in self.collector.get_finished_tasks():
- age = Amount(int(now - self.collector.get_age(task_id)), Time.SECONDS)
- self.log('Analyzing task %s (age: %s)... ' % (task_id, age))
- metadata_size = Amount(sum(sz for _, sz in self.collector.get_metadata(task_id)), Data.BYTES)
- self.log(' metadata %.1fKB ' % metadata_size.as_(Data.KB))
- log_size = Amount(sum(sz for _, sz in self.collector.get_logs(task_id)), Data.BYTES)
- self.log(' logs %.1fKB ' % log_size.as_(Data.KB))
- data_size = Amount(sum(sz for _, sz in self.collector.get_data(task_id)), Data.BYTES)
- self.log(' data %.1fMB ' % data_size.as_(Data.MB))
- tasks.append(TaskTuple(task_id, age, metadata_size, log_size, data_size))
-
- gc_tasks = set()
- gc_tasks.update(task for task in tasks if task.age > self._max_age)
- self.log('After age filter: %s tasks' % len(gc_tasks))
-
- def total_gc_size(task):
- return sum([task.data_size,
- task.metadata_size if self._include_metadata else Amount(0, Data.BYTES),
- task.log_size if self._include_logs else Amount(0, Data.BYTES)],
- Amount(0, Data.BYTES))
-
- total_used = Amount(0, Data.BYTES)
- for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True):
- if task not in gc_tasks:
- total_used += total_gc_size(task)
- if total_used > self._max_space:
- gc_tasks.add(task)
- self.log('After size filter: %s tasks' % len(gc_tasks))
-
- for task in sorted(tasks, key=lambda tsk: tsk.age, reverse=True):
- if task not in gc_tasks and len(tasks) - len(gc_tasks) > self._max_tasks:
- gc_tasks.add(task)
- self.log('After total task filter: %s tasks' % len(gc_tasks))
-
- self.log('Deciding to garbage collect the following tasks:')
- if gc_tasks:
- for task in gc_tasks:
- self.log(' %s' % repr(task))
- else:
- self.log(' None.')
-
- return gc_tasks
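
A sketch of wiring the policy above to the collector; note that run() only selects candidates,
and actual erasure is left to the caller (the root path and limits are hypothetical):

    from twitter.common.quantity import Amount, Data, Time
    from twitter.thermos.monitoring.garbage import DefaultCollector, TaskGarbageCollector

    gc = TaskGarbageCollector(root='/var/run/thermos')  # hypothetical root
    policy = DefaultCollector(gc,
                              max_age=Amount(14, Time.DAYS),
                              max_space=Amount(10, Data.GB),
                              verbose=True)

    # run() returns TaskTuples for the finished tasks that violate the policy.
    for task in policy.run():
      gc.erase_task(task.task_id)
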
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/monitor.py b/src/main/python/twitter/thermos/monitoring/monitor.py
deleted file mode 100644
index 5af67fe..0000000
--- a/src/main/python/twitter/thermos/monitoring/monitor.py
+++ /dev/null
@@ -1,125 +0,0 @@
-"""Monitor the state of Thermos tasks on a system
-
-This module contains the TaskMonitor, used to reconstruct the state of active or finished Thermos
-tasks based on their checkpoint streams. It exposes two key pieces of information about a Task, both
-as their corresponding Thrift structs:
- - a RunnerState, representing the latest state of the Task
- - a list of ProcessStates, representing the processes currently running within the Task
-
-"""
-
-import os
-import copy
-import errno
-import threading
-
-from twitter.common import log
-from twitter.common.recordio import ThriftRecordReader
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-
-from gen.twitter.thermos.ttypes import (
- ProcessState,
- RunnerCkpt,
- RunnerState,
- TaskState,
-)
-
-
-class TaskMonitor(object):
- """
- Class responsible for reconstructing and monitoring the state of an individual Thermos task via
- its runner checkpoint. Also exports information on active processes in the task.
- """
-
- def __init__(self, pathspec, task_id):
- self._task_id = task_id
- self._dispatcher = CheckpointDispatcher()
- self._runnerstate = RunnerState(processes={})
- self._runner_ckpt = pathspec.given(task_id=task_id).getpath('runner_checkpoint')
- self._active_file, self._finished_file = (
- pathspec.given(task_id=task_id, state=state).getpath('task_path')
- for state in ('active', 'finished'))
- self._ckpt_head = 0
- self._apply_states()
- self._lock = threading.Lock()
-
- def _apply_states(self):
- """
- os.stat() the corresponding checkpoint stream of this task and determine if there are new ckpt
- records. Attempt to read those records and update the high watermark for that stream.
- Returns True if new states were applied, False otherwise.
- """
- ckpt_offset = None
- try:
- ckpt_offset = os.stat(self._runner_ckpt).st_size
-
- updated = False
- if self._ckpt_head < ckpt_offset:
- with open(self._runner_ckpt, 'r') as fp:
- fp.seek(self._ckpt_head)
- rr = ThriftRecordReader(fp, RunnerCkpt)
- while True:
- runner_update = rr.try_read()
- if not runner_update:
- break
- try:
- self._dispatcher.dispatch(self._runnerstate, runner_update)
- except CheckpointDispatcher.InvalidSequenceNumber as e:
- log.error('Checkpoint stream is corrupt: %s' % e)
- break
- new_ckpt_head = fp.tell()
- updated = self._ckpt_head != new_ckpt_head
- self._ckpt_head = new_ckpt_head
- return updated
- except OSError as e:
- if e.errno == errno.ENOENT:
- # The log doesn't yet exist, will retry later.
- log.warning('Could not read from discovered task %s.' % self._task_id)
- return False
- else:
- raise
-
- def refresh(self):
- """
- Check for new checkpoint updates and apply them. Return True if any
- updates were applied, False otherwise.
- """
- with self._lock:
- return self._apply_states()
-
- def get_state(self):
- """
- Get the latest state of this Task.
- """
- with self._lock:
- self._apply_states()
- return copy.deepcopy(self._runnerstate)
-
- def task_state(self):
- state = self.get_state()
- return state.statuses[-1].state if state.statuses else TaskState.ACTIVE
-
- @property
- def active(self):
- return os.path.exists(self._active_file)
-
- @property
- def finished(self):
- return os.path.exists(self._finished_file)
-
- def get_active_processes(self):
- """
- Get active processes. Returns a list of tuples of the form:
- (ProcessStatus object of the running process, its run number)
- """
- active_processes = []
- with self._lock:
- self._apply_states()
- state = self._runnerstate
- for process, runs in state.processes.items():
- if len(runs) == 0:
- continue
- last_run = runs[-1]
- if last_run.state == ProcessState.RUNNING:
- active_processes.append((last_run, len(runs) - 1))
- return active_processes
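
For context on the API being removed here, a minimal usage sketch (hypothetical checkpoint root and task id; TaskPath and TaskMonitor as defined above):

from twitter.thermos.common.path import TaskPath
from twitter.thermos.monitoring.monitor import TaskMonitor

# Point a monitor at the checkpoint stream of a single task.
monitor = TaskMonitor(TaskPath(root='/var/run/thermos'), 'hypothetical_task_id')

# refresh() applies any new checkpoint records; get_state() returns a
# deep copy of the reconstructed RunnerState.
monitor.refresh()
state = monitor.get_state()

# (ProcessStatus, run number) tuples for processes currently in RUNNING.
for status, run in monitor.get_active_processes():
    print('%s (run %d) => pid %s' % (status.process, run, status.pid))
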
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/process.py b/src/main/python/twitter/thermos/monitoring/process.py
deleted file mode 100644
index 294682f..0000000
--- a/src/main/python/twitter/thermos/monitoring/process.py
+++ /dev/null
@@ -1,49 +0,0 @@
-"""Represent resource consumption statistics for processes
-
-This module exposes one class: the ProcessSample, used to represent resource consumption. A single
-ProcessSample might correspond to one individual process, or to an aggregate of multiple processes.
-
-"""
-
-from collections import namedtuple
-
-
-class ProcessSample(namedtuple('ProcessSample', 'rate user system rss vms nice status threads')):
- """ Sample of statistics about a process's resource consumption (either a single process or an
- aggregate of processes) """
-
- @staticmethod
- def empty():
- return ProcessSample(rate=0, user=0, system=0, rss=0, vms=0, nice=None, status=None, threads=0)
-
- def __add__(self, other):
- if self.nice is not None and other.nice is None:
- nice = self.nice
- else:
- nice = other.nice
- if self.status is not None and other.status is None:
- status = self.status
- else:
- status = other.status
- return ProcessSample(
- rate = self.rate + other.rate,
- user = self.user + other.user,
- system = self.system + other.system,
- rss = self.rss + other.rss,
- vms = self.vms + other.vms,
- nice = nice,
- status = status,
- threads = self.threads + other.threads)
-
- def to_dict(self):
- return dict(
- cpu = self.rate,
- ram = self.rss,
- user = self.user,
- system = self.system,
- rss = self.rss,
- vms = self.vms,
- nice = self.nice,
- status = str(self.status),
- threads = self.threads
- )
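
Since ProcessSample.__add__ drives all of the aggregation in the collectors below, a quick illustration of its semantics (made-up values):

from twitter.thermos.monitoring.process import ProcessSample

parent = ProcessSample(rate=0.10, user=2.0, system=1.0, rss=1024, vms=2048,
                       nice=0, status='sleeping', threads=2)
child = ProcessSample(rate=0.05, user=0.5, system=0.25, rss=512, vms=1024,
                      nice=None, status=None, threads=1)

# Numeric fields are summed; nice/status come from the right operand unless
# it is None and the left operand's is not.
combined = parent + child
assert combined.rss == 1536 and combined.threads == 3 and combined.nice == 0

# sum() with an empty sample as the initial value aggregates a whole tree.
tree = sum([parent, child], ProcessSample.empty())
assert tree.to_dict()['ram'] == 1536
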
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/process_collector_psutil.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/process_collector_psutil.py b/src/main/python/twitter/thermos/monitoring/process_collector_psutil.py
deleted file mode 100644
index 7b68173..0000000
--- a/src/main/python/twitter/thermos/monitoring/process_collector_psutil.py
+++ /dev/null
@@ -1,92 +0,0 @@
-""" Sample resource consumption statistics for processes using psutil """
-
-from operator import attrgetter
-from time import time
-
-from twitter.common import log
-
-from .process import ProcessSample
-
-from psutil import Process
-from psutil import NoSuchProcess, AccessDenied, Error as PsutilError
-
-
-def process_to_sample(process):
- """ Given a psutil.Process, return a current ProcessSample """
- try:
- # the nonblocking get_cpu_percent call is stateful on a particular Process object, and hence
- # at least two consecutive calls are required before it will return a non-zero value
- rate = process.get_cpu_percent(0.0) / 100.0
- user, system = process.get_cpu_times()
- rss, vms = process.get_memory_info()
- nice = process.nice
- status = process.status
- threads = process.get_num_threads()
- return ProcessSample(rate, user, system, rss, vms, nice, status, threads)
- except (AccessDenied, NoSuchProcess) as e:
- log.warning('Error during process sampling [pid=%s]: %s' % (process.pid, e))
- return ProcessSample.empty()
-
-
-class ProcessTreeCollector(object):
- """ Collect resource consumption statistics for a process and its children """
- def __init__(self, pid):
- """ Given a pid """
- self._pid = pid
- self._process = None # psutil.Process
- self._sampled_tree = {} # pid => ProcessSample
- self._sample = ProcessSample.empty()
- self._stamp = None
- self._rate = 0.0
- self._procs = 1
-
- def sample(self):
- """ Collate and aggregate ProcessSamples for process and children
- Returns None: result is stored in self.value
- """
- try:
- last_stamp = self._stamp  # wall-clock time of the previous sample (None on first run)
- if self._process is None:
- self._process = Process(self._pid)
- parent = self._process
- parent_sample = process_to_sample(parent)
- new_samples = dict(
- (proc.pid, process_to_sample(proc))
- for proc in parent.get_children(recursive=True)
- )
- new_samples[self._pid] = parent_sample
-
- except PsutilError as e:
- log.warning('Error during process sampling: %s' % e)
- self._sample = ProcessSample.empty()
- self._rate = 0.0
-
- else:
- self._stamp = time()
- # for most stats, calculate simple sum to aggregate
- self._sample = sum(new_samples.values(), ProcessSample.empty())
- # cpu consumption is more complicated
- # We require at least 2 generations of a process before we can calculate rate, so for all
- # current processes that were not running in the previous sample, compare to an empty sample
- if self._sampled_tree and last_stamp:
- new = new_samples.values()
- old = [self._sampled_tree.get(pid, ProcessSample.empty()) for pid in new_samples.keys()]
- new_user_sys = sum(map(attrgetter('user'), new)) + sum(map(attrgetter('system'), new))
- old_user_sys = sum(map(attrgetter('user'), old)) + sum(map(attrgetter('system'), old))
- self._rate = (new_user_sys - old_user_sys) / (self._stamp - last_stamp)
- log.debug("Calculated rate for pid=%s and children: %s" % (self._process.pid, self._rate))
- self._sampled_tree = new_samples
-
- @property
- def value(self):
- """ Aggregated ProcessSample representing resource consumption of the tree """
- # Since we don't trust the CPU consumption returned by psutil, replace it with our own in the
- # exported ProcessSample
- return self._sample._replace(rate=self._rate)
-
- @property
- def procs(self):
- """ Number of active processes in the tree """
- return len(self._sampled_tree)
-
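
To make the two-generation CPU accounting above concrete, a minimal self-sampling sketch (the 1-second pause is arbitrary; any interval between samples works):

import os
import time

from twitter.thermos.monitoring.process_collector_psutil import ProcessTreeCollector

collector = ProcessTreeCollector(os.getpid())
collector.sample()          # first generation: no previous stamp, rate stays 0.0
time.sleep(1)
collector.sample()          # second generation: rate = delta(user+system) / delta(wall)
sample = collector.value    # aggregated ProcessSample carrying the computed rate
print('%d procs, cpu rate %.3f' % (collector.procs, sample.rate))
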
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/monitoring/resource.py b/src/main/python/twitter/thermos/monitoring/resource.py
deleted file mode 100644
index 468f1b1..0000000
--- a/src/main/python/twitter/thermos/monitoring/resource.py
+++ /dev/null
@@ -1,222 +0,0 @@
-"""Monitor the resource consumption of Thermos tasks
-
-This module contains classes used to monitor the resource consumption (e.g. CPU, RAM, disk) of
-Thermos tasks. Resource monitoring of a Thermos task typically treats the task as an aggregate of
-all the processes within it. Importantly, this excludes the process(es) of Thermos itself (i.e. the
-TaskRunner and any other wrappers involved in launching a task).
-
-The ResourceMonitorBase defines the interface for other components (for example, the Thermos
-TaskObserver) to interact with and retrieve information about a Task's resource consumption. The
-canonical/reference implementation of a ResourceMonitor is the TaskResourceMonitor, a thread which
-actively monitors resources for a particular task by periodically polling process information and
-disk consumption and retaining a limited (FIFO) in-memory history of this data.
-
-"""
-
-from abc import abstractmethod
-from bisect import bisect_left
-from collections import namedtuple
-from operator import attrgetter
-import platform
-import threading
-import time
-
-from twitter.common import log
-from twitter.common.collections import RingBuffer
-from twitter.common.concurrent import EventMuxer
-from twitter.common.lang import Interface
-from twitter.common.quantity import Amount, Time
-
-from .disk import DiskCollector
-from .monitor import TaskMonitor
-from .process import ProcessSample
-from .process_collector_psutil import ProcessTreeCollector
-
-
-class ResourceMonitorBase(Interface):
- """ Defines the interface for interacting with a ResourceMonitor """
-
- class Error(Exception): pass
-
- class ResourceResult(namedtuple('ResourceResult', 'num_procs process_sample disk_usage')):
- pass
-
- @abstractmethod
- def sample(self):
- """ Return a sample of the resource consumption of the task right now
-
- Returns a tuple of (timestamp, ResourceResult)
- """
-
- @abstractmethod
- def sample_at(self, time):
- """ Return a sample of the resource consumption as close as possible to the specified time
-
- Returns a tuple of (timestamp, ResourceResult)
- """
-
- @abstractmethod
- def sample_by_process(self, process_name):
- """ Return a sample of the resource consumption of a specific process in the task right now
-
- Returns a ProcessSample
- """
-
-
-class ResourceHistory(object):
- """Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with the
- mapping: timestamp => (number_of_procs, ProcessSample, disk_usage_in_bytes)
- """
- def __init__(self, maxlen, initialize=True):
- if maxlen < 1:
- raise ValueError("maxlen must be at least 1")
- self._maxlen = maxlen
- self._values = RingBuffer(maxlen, None)
- if initialize:
- self.add(time.time(), ResourceMonitorBase.ResourceResult(0, ProcessSample.empty(), 0))
-
- def add(self, timestamp, value):
- """Store a new resource sample corresponding to the given timestamp"""
- if self._values and timestamp < self._values[-1][0]:
- raise ValueError("Refusing to add timestamp in the past!")
- self._values.append((timestamp, value))
-
- def get(self, timestamp):
- """Get the resource sample nearest to the given timestamp"""
- closest = min(bisect_left(self._values, (timestamp, None)), len(self) - 1)
- return self._values[closest]
-
- def __iter__(self):
- return iter(self._values)
-
- def __len__(self):
- return len(self._values)
-
- def __repr__(self):
- return 'ResourceHistory(%s)' % ', '.join([str(r) for r in self._values])
-
-
-class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
- """ Lightweight thread to aggregate resource consumption for a task's constituent processes.
- Actual resource calculation is delegated to collectors; this class periodically polls the
- collectors and aggregates into a representation for the entire task. Also maintains a limited
- history of previous sample results.
- """
-
- MAX_HISTORY = 10000 # magic number
-
- def __init__(self, task_monitor, sandbox,
- process_collector=ProcessTreeCollector, disk_collector=DiskCollector,
- process_collection_interval=Amount(20, Time.SECONDS),
- disk_collection_interval=Amount(1, Time.MINUTES),
- history_time=Amount(1, Time.HOURS)):
- """
- task_monitor: TaskMonitor object specifying the task whose resources should be monitored
- sandbox: Directory for which to monitor disk utilisation
- """
- self._task_monitor = task_monitor # exposes PIDs, sandbox
- self._task_id = task_monitor._task_id
- log.debug('Initialising resource collection for task %s' % self._task_id)
- self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector
- # TODO(jon): sandbox is also available through task_monitor, but typically the first checkpoint
- # isn't written (and hence the header is not available) by the time we initialise here
- self._sandbox = sandbox
- self._process_collector_factory = process_collector
- self._disk_collector = disk_collector(self._sandbox)
- self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
- self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
- min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
- history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
- if history_length > self.MAX_HISTORY:
- raise ValueError("Requested history length too large")
- log.debug("Initialising ResourceHistory of length %s" % history_length)
- self._history = ResourceHistory(history_length)
- self._kill_signal = threading.Event()
- threading.Thread.__init__(self)
- self.daemon = True
-
- def sample(self):
- if not self.is_alive():
- log.warning("TaskResourceMonitor not running - sample may be inaccurate")
- return self.sample_at(time.time())
-
- def sample_at(self, timestamp):
- return self._history.get(timestamp)
-
- def sample_by_process(self, process_name):
- try:
- process = [process for process in self._get_active_processes()
- if process.process == process_name].pop()
- except IndexError:
- raise ValueError('No active process found with name "%s" in this task' % process_name)
- else:
- # Since this might be called out of band (before the main loop is aware of the process)
- if process not in self._process_collectors:
- self._process_collectors[process] = self._process_collector_factory(process.pid)
-
- self._process_collectors[process].sample()
- return self._process_collectors[process].value
-
- def _get_active_processes(self):
- """Get a list of ProcessStatus objects representing currently-running processes in the task"""
- return [process for process, _ in self._task_monitor.get_active_processes()]
-
- def kill(self):
- """Signal that the thread should cease collecting resources and terminate"""
- self._kill_signal.set()
-
- def run(self):
- """Thread entrypoint. Loop indefinitely, polling collectors at self._collection_interval and
- collating samples."""
-
- log.debug('Commencing resource monitoring for task "%s"' % self._task_id)
- next_process_collection = 0
- next_disk_collection = 0
-
- while not self._kill_signal.is_set():
-
- now = time.time()
-
- if now > next_process_collection:
- next_process_collection = now + self._process_collection_interval
- actives = set(self._get_active_processes())
- current = set(self._process_collectors)
- for process in current - actives:
- log.debug('Process "%s" (pid %s) no longer active, removing from monitored processes' %
- (process.process, process.pid))
- self._process_collectors.pop(process)
- for process in actives - current:
- log.debug('Adding process "%s" (pid %s) to resource monitoring' %
- (process.process, process.pid))
- self._process_collectors[process] = self._process_collector_factory(process.pid)
- for process, collector in self._process_collectors.iteritems():
- log.debug('Collecting sample for process "%s" (pid %s) and children' %
- (process.process, process.pid))
- collector.sample()
-
- if now > next_disk_collection:
- next_disk_collection = now + self._disk_collection_interval
- log.debug('Collecting disk sample for %s' % self._sandbox)
- self._disk_collector.sample()
-
- try:
- aggregated_procs = sum(map(attrgetter('procs'), self._process_collectors.values()))
- aggregated_sample = sum(map(attrgetter('value'), self._process_collectors.values()),
- ProcessSample.empty())
- self._history.add(now, self.ResourceResult(aggregated_procs, aggregated_sample,
- self._disk_collector.value))
- log.debug("Recorded resource sample at %s" % now)
- except ValueError as err:
- log.warning("Error recording resource sample: %s" % err)
-
- # Sleep until any of the following conditions are met:
- # - it's time for the next disk collection
- # - it's time for the next process collection
- # - the result from the last disk collection is available via the DiskCollector
- # - the TaskResourceMonitor has been killed via self._kill_signal
- now = time.time()
- next_collection = min(next_process_collection - now, next_disk_collection - now)
- EventMuxer(self._kill_signal, self._disk_collector.completed_event
- ).wait(timeout=max(0, next_collection))
-
- log.debug('Stopping resource monitoring for task "%s"' % self._task_id)
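
Two details above are worth spelling out. First, the history sizing: with the defaults, process collection every 20 seconds is the finer-grained interval, so an hour of history needs 3600 / 20 = 180 slots, well under MAX_HISTORY. Second, ResourceHistory's ordering contract, sketched here in isolation (synthetic timestamps and empty samples):

import time

from twitter.thermos.monitoring.process import ProcessSample
from twitter.thermos.monitoring.resource import ResourceHistory, ResourceMonitorBase

history = ResourceHistory(180)   # seeded with one empty sample at "now"
base = time.time()
for offset in (1, 2, 3):
    history.add(base + offset,
                ResourceMonitorBase.ResourceResult(1, ProcessSample.empty(), 0))

# get() bisects to the first sample at or after the requested timestamp,
# clipped to the newest entry.
timestamp, result = history.get(base + 2.5)

# Timestamps must be non-decreasing; anything older is refused.
try:
    history.add(base - 10, ResourceMonitorBase.ResourceResult(1, ProcessSample.empty(), 0))
except ValueError:
    pass  # "Refusing to add timestamp in the past!"
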
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/BUILD b/src/main/python/twitter/thermos/observer/BUILD
deleted file mode 100644
index 00ea9cb..0000000
--- a/src/main/python/twitter/thermos/observer/BUILD
+++ /dev/null
@@ -1,51 +0,0 @@
-import os
-
-python_library(
- name = 'observed_task',
- sources = ['observed_task.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('src/main/python/twitter/thermos:pystachio'),
- pants('src/main/python/twitter/thermos/common:ckpt'),
- pants('src/main/python/twitter/thermos/config'),
- ]
-)
-
-python_library(
- name = 'task_observer',
- sources = ['task_observer.py'],
- dependencies = [
- pants(':observed_task'),
- pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- pants('src/main/python/twitter/thermos/common:path'),
- pants('src/main/python/twitter/thermos/monitoring:detector'),
- pants('src/main/python/twitter/thermos/monitoring:monitor'),
- pants('src/main/python/twitter/thermos/monitoring:process'),
- pants('src/main/python/twitter/thermos/monitoring:resource'),
- pants('src/main/thrift/com/twitter/thermos:py-thrift'),
- ]
-)
-
-python_library(
- name = 'observer',
- dependencies = [
- pants(':task_observer'),
- pants('src/main/python/twitter/thermos/observer/http:http_observer'),
-
- # covering libraries
- pants('src/main/python/twitter/thermos/common'),
- pants('src/main/python/twitter/thermos/config'),
- pants('src/main/python/twitter/thermos/monitoring'),
- ],
- provides = setup_py(
- name = 'twitter.thermos.observer',
- version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
- description = 'The Thermos observer web interface.',
- ).with_binaries(
- thermos_observer = pants('src/main/python/twitter/thermos/observer/bin:thermos_observer'),
- )
-)
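
For anyone tracking these targets through the rename: a downstream consumer would depend on the covering 'observer' library in the usual way, e.g. (hypothetical target, following the BUILD conventions above):

python_binary(
  name = 'my_observer_tool',
  source = 'my_observer_tool.py',
  dependencies = [
    pants('src/main/python/twitter/thermos/observer:observer'),
  ],
)
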
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/__init__.py b/src/main/python/twitter/thermos/observer/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/bin/BUILD b/src/main/python/twitter/thermos/observer/bin/BUILD
deleted file mode 100644
index 26eb148..0000000
--- a/src/main/python/twitter/thermos/observer/bin/BUILD
+++ /dev/null
@@ -1,14 +0,0 @@
-python_binary(
- name = 'thermos_observer',
- source = 'thermos_observer.py',
- entry_point = 'twitter.thermos.observer.bin.thermos_observer:proxy_main',
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/app'),
- pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
- pants('aurora/twitterdeps/src/python/twitter/common/http'),
- pants('src/main/python/twitter/thermos:cherrypy'),
- pants('src/main/python/twitter/thermos/common:path'),
- pants('src/main/python/twitter/thermos/observer/http:http_observer'),
- pants('src/main/python/twitter/thermos/observer:task_observer'),
- ],
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/bin/__init__.py b/src/main/python/twitter/thermos/observer/bin/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/bin/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/bin/thermos_observer.py b/src/main/python/twitter/thermos/observer/bin/thermos_observer.py
deleted file mode 100644
index 9e7f7a0..0000000
--- a/src/main/python/twitter/thermos/observer/bin/thermos_observer.py
+++ /dev/null
@@ -1,55 +0,0 @@
-from __future__ import print_function
-
-import socket
-import sys
-import time
-
-from twitter.common import app
-from twitter.common.exceptions import ExceptionalThread
-from twitter.common.http import HttpServer
-from twitter.common.http.diagnostics import DiagnosticsEndpoints
-from twitter.thermos.common.path import TaskPath
-from twitter.thermos.observer.task_observer import TaskObserver
-from twitter.thermos.observer.http.http_observer import BottleObserver
-
-
-app.add_option("--root",
- dest="root",
- metavar="DIR",
- default=TaskPath.DEFAULT_CHECKPOINT_ROOT,
- help="root checkpoint directory for thermos task runners")
-
-
-app.add_option("--port",
- dest="port",
- metavar="INT",
- default=1338,
- help="port number to listen on.")
-
-
-def proxy_main():
- def main(args, opts):
- if args:
- print("ERROR: unrecognized arguments: %s\n" % (" ".join(args)), file=sys.stderr)
- app.help()
- sys.exit(1)
-
- root_server = HttpServer()
- root_server.mount_routes(DiagnosticsEndpoints())
-
- task_observer = TaskObserver(opts.root)
- task_observer.start()
-
- bottle_wrapper = BottleObserver(task_observer)
-
- root_server.mount_routes(bottle_wrapper)
-
- def run():
- root_server.run('0.0.0.0', opts.port, 'cherrypy')
-
- et = ExceptionalThread(target=run)
- et.daemon = True
- et.start()
- et.join()
-
- app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/http/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/http/BUILD b/src/main/python/twitter/thermos/observer/http/BUILD
deleted file mode 100644
index 9f3d587..0000000
--- a/src/main/python/twitter/thermos/observer/http/BUILD
+++ /dev/null
@@ -1,47 +0,0 @@
-python_library(
- name = 'json',
- sources = ['json.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/http'),
- ]
-)
-
-python_library(
- name = 'static_assets',
- sources = ['static_assets.py'],
- resources = rglobs('assets/*'),
- dependencies = [
- pants('src/main/python/twitter/thermos:bottle'),
- ]
-)
-
-python_library(
- name = 'templating',
- sources = ['templating.py'],
- resources = globs('templates/*.tpl'),
-)
-
-python_library(
- name = 'file_browser',
- sources = ['file_browser.py'],
- dependencies = [
- pants(':templating'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/http'),
- pants('src/main/python/twitter/thermos:bottle'),
- pants('src/main/python/twitter/thermos:mako'),
- ]
-)
-
-python_library(
- name = 'http_observer',
- sources = ['__init__.py', 'http_observer.py'],
- dependencies = [
- pants(':file_browser'),
- pants(':json'),
- pants(':static_assets'),
- pants(':templating'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/http'),
- ]
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/observer/http/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/observer/http/__init__.py b/src/main/python/twitter/thermos/observer/http/__init__.py
deleted file mode 100644
index e69de29..0000000