You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2013/12/31 22:20:15 UTC

[22/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
diff --git a/src/main/python/apache/thermos/common/ b/src/main/python/apache/thermos/common/
new file mode 100644
index 0000000..28e07ac
--- /dev/null
+++ b/src/main/python/apache/thermos/common/
@@ -0,0 +1,101 @@
+import os
+class TaskPath(object):
+  """
+    Handle the resolution / detection of the path structure for thermos tasks.
+    This is used by the runner to determine where it should be dumping checkpoints and writing
+    stderr/stdout, and by the observer to determine how to detect the running tasks on the system.
+    __init__ selects DIR_TEMPLATE when a truthy log_dir is supplied, and the legacy templates
+    (process logs under %(root)s/logs) otherwise.
+    Examples:
+      pathspec = TaskPath(root = "/var/run/thermos")
+                           ^ substitution dictionary for DIR_TEMPLATE
+                                           which template to acquire
+                                                    v
+      pathspec.given(task_id = "12345-thermos-wickman-23", state='active').getpath("task_path")
+                         ^
+            further substitutions DIR_TEMPLATE
+    As a detection mechanism:
+      path_glob = pathspec.given(task_id = "*").getpath(task_type)
+      matching_paths = glob.glob(path_glob)
+      path_re = pathspec.given(task_id = "(\\S+)").getpath(task_type)
+      path_re = re.compile(path_re)
+      ids = []
+      for path in matching_paths:
+        matched_blobs = path_re.match(path).groups()
+        ids.append(int(matched_blobs[0]))
+      return ids
+  """
+  # Raised by getpath() for a template name that is not defined.
+  class UnknownPath(Exception): pass
+  # Raised by getpath() when '%(key)s' placeholders remain after interpolation.
+  class UnderspecifiedPath(Exception): pass
+  # Used by __init__ when root is missing or None.
+  DEFAULT_CHECKPOINT_ROOT = "/var/run/thermos"
+  # Placeholders __init__ seeds with self-interpolating values when DIR_TEMPLATE is used.
+  KNOWN_KEYS = [ 'root', 'task_id', 'state', 'process', 'run', 'log_dir' ]
+  # NOTE(review): the template definitions below look truncated in this copy: the
+  # "DIR_TEMPLATE = {" opener, LEGACY_KNOWN_KEYS and the start of LEGACY_DIR_TEMPLATE (all
+  # referenced by __init__) are not visible, so the class does not parse as shown.
+            'task_path': ['%(root)s',       'tasks',   '%(state)s', '%(task_id)s'],
+      'checkpoint_path': ['%(root)s', 'checkpoints', '%(task_id)s'],
+    'runner_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'runner'],
+   'process_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'coordinator.%(process)s'],
+      'process_logbase': ['%(log_dir)s'],
+       'process_logdir': ['%(log_dir)s', '%(process)s', '%(run)s']
+  }
+  # Legacy layout: process logs live under %(root)s/logs/%(task_id)s rather than %(log_dir)s.
+      process_logbase = ['%(root)s', 'logs', '%(task_id)s'],
+      process_logdir  = ['%(root)s', 'logs', '%(task_id)s', '%(process)s', '%(run)s']
+  )
+  def __init__(self, **kw):
+    """
+      Build a path specification from placeholder values given as keyword arguments.
+      A missing or None root falls back to DEFAULT_CHECKPOINT_ROOT.  A truthy log_dir selects
+      DIR_TEMPLATE; otherwise the legacy templates are used.  Placeholders without a value keep
+      their literal '%(key)s' text so that later given() calls can still fill them in.
+    """
+    self._filename = None
+    # initialize with self-interpolating values
+    if kw.get('root') is None:
+      kw['root'] = self.DEFAULT_CHECKPOINT_ROOT
+    # Before log_dir was added explicitly to RunnerHeader, it resolved to %(root)s/logs
+    if kw.get('log_dir'):
+      self._template, keys = self.DIR_TEMPLATE, self.KNOWN_KEYS
+    else:
+      self._template, keys = self.LEGACY_DIR_TEMPLATE, self.LEGACY_KNOWN_KEYS
+    # e.g. 'task_id' -> '%(task_id)s', so interpolating an unset key reproduces the placeholder.
+    self._data = dict((key, '%%(%s)s' % key) for key in keys)
+    self._data.update(kw)
+  def given(self, **kw):
+    """ Perform further interpolation of the templates given the kwargs.
+        Returns a new TaskPath (this one is not modified); any filename set via
+        with_filename() is carried over.
+    """
+    eval_dict = dict(self._data) # copy
+    eval_dict.update(kw)
+    tp = TaskPath(**eval_dict)
+    tp._filename = self._filename
+    return tp
+  def with_filename(self, filename):
+    """ Return a TaskPath with the specific filename appended to the end of the path """
+    # Rebuilt from self._data, so __init__ re-selects the same template family.
+    wp = TaskPath(**self._data)
+    wp._filename = filename
+    return wp
+  def getpath(self, pathname):
+    """
+      Return the path for template :pathname, interpolated with the known values (plus the
+      filename set via with_filename(), if any).
+      Raises UnknownPath if :pathname is not a template name, and UnderspecifiedPath if a
+      '%(key)s' placeholder is still unresolved after interpolation.
+    """
+    if pathname not in self._template:
+      raise self.UnknownPath("Internal error, unknown id: %s" % pathname)
+    # Copy so that appending the filename does not mutate the shared template list.
+    path = self._template[pathname][:]
+    if self._filename:
+      path += [self._filename]
+    path = os.path.join(*path)
+    interpolated_path = path % self._data
+    try:
+      # Interpolating against an empty mapping raises KeyError if a '%(key)s' placeholder remains.
+      _ = interpolated_path % {}
+    except KeyError:
+      raise TaskPath.UnderspecifiedPath(
+        "Tried to interpolate path with insufficient variables: %s as %s" % (
+        pathname, interpolated_path))
+    return interpolated_path
diff --git a/src/main/python/apache/thermos/common/ b/src/main/python/apache/thermos/common/
new file mode 100644
index 0000000..a886dad
--- /dev/null
+++ b/src/main/python/apache/thermos/common/
@@ -0,0 +1,304 @@
+"""Planners to schedule processes within Tasks.
+TaskPlanner applies the following dependency rules:
+  - a daemon process can depend upon a regular process
+  - a regular process cannot depend upon a daemon process
+  - a non-ephemeral process cannot depend upon an ephemeral process
+"""
+from collections import defaultdict, namedtuple
+import copy
+from functools import partial
+import sys
+import time
+class Planner(object):
+  """
+    Given a set of process names and a graph of dependencies between them, determine
+    what can run predicated upon process completions.
+  """
+  class InvalidSchedule(Exception): pass
+  @staticmethod
+  def filter_runnable(processes, dependencies):
+    return set(process for process in processes if not dependencies.get(process))
+  @staticmethod
+  def filter_dependencies(dependencies, given=frozenset()):
+    """
+      Provided a map of process => list of process :dependencies, and a set of satisfied
+      prior processes in :given, return the new map of dependencies with priors removed.
+    """
+    dependencies = copy.deepcopy(dependencies)
+    for process_set in dependencies.values():
+      process_set -= given
+    return dependencies
+  @staticmethod
+  def satisfiable(processes, dependencies):
+    """
+      Given a set of processes and a dependency map, determine if this is a consistent
+      schedule without cycles.
+    """
+    processes = copy.copy(processes)
+    dependencies = copy.deepcopy(dependencies)
+    scheduling = True
+    while scheduling:
+      scheduling = False
+      runnables = Planner.filter_runnable(processes, dependencies)
+      if runnables:
+        scheduling = True
+        processes -= runnables
+      dependencies = Planner.filter_dependencies(dependencies, given=runnables)
+    return len(processes) == 0
+  def __init__(self, processes, dependencies):
+    self._processes = set(processes)
+    self._dependencies = dict((process, set(dependencies.get(process, [])))
+        for process in self._processes)
+    if not Planner.satisfiable(self._processes, self._dependencies):
+      raise Planner.InvalidSchedule("Cycles detected in the task schedule!")
+    self._running = set()
+    self._finished = set()
+    self._failed = set()
+  @property
+  def runnable(self):
+    return Planner.filter_runnable(self._processes - self._running - self._finished - self._failed,
+      Planner.filter_dependencies(self._dependencies, given=self._finished))
+  @property
+  def processes(self):
+    return set(self._processes)
+  @property
+  def running(self):
+    return set(self._running)
+  @property
+  def finished(self):
+    return set(self._finished)
+  @property
+  def failed(self):
+    return set(self._failed)
+  def reset(self, process):
+    assert process in self._running
+    assert process not in self._finished
+    assert process not in self._failed
+    self._running.discard(process)
+  def set_running(self, process):
+    assert process not in self._failed
+    assert process not in self._finished
+    assert process in self._running or process in self.runnable
+    self._running.add(process)
+  def set_finished(self, process):
+    assert process in self._running
+    assert process not in self._failed
+    self._running.discard(process)
+    self._finished.add(process)
+  def set_failed(self, process):
+    assert process in self._running
+    assert process not in self._finished
+    self._running.discard(process)
+    self._failed.add(process)
+  def is_complete(self):
+    return self._finished.union(self._failed) == self._processes
+TaskAttributes = namedtuple('TaskAttributes', 'min_duration is_daemon max_failures is_ephemeral')
+class TaskPlanner(object):
+  """
+    A planner for the processes part of a Thermos task, taking into account ephemeral and daemon
+    bits, in addition to duration restrictions and failure limits.
+                               is_daemon
+         .----------------------------------------------------.
+         |                                                    |
+         |    clock gate      .----------------------.        |
+         |  .---------------> | runnable && !waiting |        |
+         v  |                 `----------------------'        |
+       .----------.                       |                   |
+       | runnable |                       | set_running       |
+       `----------'                       v                   |
+            ^        forget          .---------.              |  !is_daemon  .----------.
+            `------------------------| running |--------------+------------> | finished |
+            ^                        `---------' add_success                 `----------'
+            |                             |
+            |     under failure limit     | add_failure
+            `-----------------------------+
+                                          | past failure limit
+                                          v
+                                      .--------.
+                                      | failed |
+                                      `--------'
+    'forget' corresponds to lost().  An ephemeral process past its failure limit is marked
+    finished rather than failed, and ephemeral processes are not required by is_complete().
+  """
+  InvalidSchedule = Planner.InvalidSchedule
+  # Returned by min_wait() when no process is waiting.
+  INFINITY = sys.float_info.max
+  # Once successes + failures of a process reach this count, the process is marked failed.
+  TOTAL_RUN_LIMIT = sys.maxsize
+  @staticmethod
+  def extract_dependencies(task, process_filter=None):
+    """
+      Construct a set of processes and the process dependencies from a Thermos Task.
+      Only processes kept by filter(process_filter, ...) are considered.  Returns
+      (processes, dependencies): the keys of process_map and a defaultdict mapping each key to
+      the set of keys it must run after.
+      Raises InvalidSchedule if an order constraint is empty or mixes included and excluded
+      processes, orders a process after a daemon, or orders a non-ephemeral process after an
+      ephemeral one.
+    """
+    # NOTE(review): the key expression of the (key, process) pair is missing in this copy, so
+    # the line does not parse; presumably it is the process's name.  Confirm with the original.
+    process_map = dict((, process)
+                        for process in filter(process_filter, task.processes()))
+    processes = set(process_map)
+    dependencies = defaultdict(set)
+    if task.has_constraints():
+      for constraint in task.constraints():
+        # handle process orders
+        process_names = constraint.order().get()
+        process_name_set = set(process_names)
+        # either all process_names must be in processes or none should be
+        # (both tests agree only for an empty order or a partial overlap; both are rejected)
+        if process_name_set.issubset(processes) == process_name_set.isdisjoint(processes):
+          raise TaskPlanner.InvalidSchedule('Invalid process dependencies!')
+        # Constraints that only mention filtered-out processes are ignored.
+        if not process_name_set.issubset(processes):
+          continue
+        # Each process in the order depends on its immediate predecessor.
+        for k in range(1, len(process_names)):
+          pnk, pnk1 = process_names[k], process_names[k-1]
+          if process_map[pnk1].daemon().get():
+            raise TaskPlanner.InvalidSchedule(
+              'Process %s may not depend upon daemon process %s' % (pnk, pnk1))
+          if not process_map[pnk].ephemeral().get() and process_map[pnk1].ephemeral().get():
+            raise TaskPlanner.InvalidSchedule(
+              'Non-ephemeral process %s may not depend upon ephemeral process %s' % (pnk, pnk1))
+          dependencies[pnk].add(pnk1)
+    return (processes, dependencies)
+  def __init__(self, task, clock=time, process_filter=None):
+    """
+      :param task: the Thermos Task whose processes are planned.
+      :param clock: object providing time(); defaults to the time module.
+      :param process_filter: optional callable selecting which processes to plan.
+      :raises InvalidSchedule: if the task's ordering constraints are invalid or cyclic.
+    """
+    self._filter = process_filter
+    assert self._filter is None or callable(self._filter), (
+        'TaskPlanner must be given callable process filter.')
+    self._planner = Planner(*self.extract_dependencies(task, self._filter))
+    self._clock = clock
+    self._last_terminal = {} # process => timestamp of last terminal state
+    self._failures = defaultdict(int)
+    self._successes = defaultdict(int)
+    self._attributes = {}
+    # NOTE(review): the element expression of this set comprehension and the subscript of
+    # self._attributes below are missing in this copy (presumably the process's name).
+    self._ephemerals = set( for process in task.processes()
+        if (self._filter is None or self._filter(process)) and process.ephemeral().get())
+    for process in filter(self._filter, task.processes()):
+      self._attributes[] = TaskAttributes(
+        is_daemon=bool(process.daemon().get()),
+        is_ephemeral=bool(process.ephemeral().get()),
+        max_failures=process.max_failures().get(),
+        min_duration=process.min_duration().get())
+  def get_wait(self, process, timestamp=None):
+    """
+      Time left before :process may run again: 0 if it has never reached a terminal state,
+      otherwise min_duration minus the time elapsed since its last terminal state (negative once
+      elapsed).  :timestamp defaults to the clock's current time.
+    """
+    now = timestamp if timestamp is not None else self._clock.time()
+    if process not in self._last_terminal:
+      return 0
+    return self._attributes[process].min_duration - (now - self._last_terminal[process])
+  def is_ready(self, process, timestamp=None):
+    """True if :process has no remaining min_duration wait at :timestamp."""
+    return self.get_wait(process, timestamp) <= 0
+  def is_waiting(self, process, timestamp=None):
+    """True if :process is still within its min_duration wait at :timestamp."""
+    return not self.is_ready(process, timestamp)
+  @property
+  def runnable(self):
+    """The set of processes runnable now: dependencies satisfied and min_duration wait elapsed."""
+    return self.runnable_at(self._clock.time())
+  @property
+  def waiting(self):
+    """The set of processes whose dependencies are satisfied but whose min_duration wait is not over."""
+    return self.waiting_at(self._clock.time())
+  def runnable_at(self, timestamp):
+    """Dependency-runnable processes that are ready at :timestamp."""
+    return set(filter(partial(self.is_ready, timestamp=timestamp), self._planner.runnable))
+  def waiting_at(self, timestamp):
+    """Dependency-runnable processes that are still waiting at :timestamp."""
+    return set(filter(partial(self.is_waiting, timestamp=timestamp), self._planner.runnable))
+  def min_wait(self, timestamp=None):
+    """Return the current wait time for the next process to become runnable, 0 if something is ready
+       immediately, or INFINITY (sys.float_info.max) if there are no waiters."""
+    # NOTE(review): when timestamp is None, runnable_at() gets one clock reading while
+    # waiting_at()/get_wait() below read the clock again, so they may see slightly different times.
+    if self.runnable_at(timestamp if timestamp is not None else self._clock.time()):
+      return 0
+    waits = [self.get_wait(waiter, timestamp) for waiter in self.waiting_at(timestamp)]
+    return min(waits) if waits else self.INFINITY
+  def set_running(self, process):
+    """Mark :process as running in the underlying Planner."""
+    self._planner.set_running(process)
+  def add_failure(self, process, timestamp=None):
+    """Increment the failure count of a process, and reset it to runnable if maximum number of
+    failures has not been reached, or mark it as failed otherwise (ephemeral processes do not
+    count towards the success of a task, and are hence marked finished instead)"""
+    timestamp = timestamp if timestamp is not None else self._clock.time()
+    self._last_terminal[process] = timestamp
+    self._failures[process] += 1
+    self.failure_transition(process)
+  def has_reached_run_limit(self, process):
+    """True once successes + failures of :process reach TOTAL_RUN_LIMIT."""
+    return (self._successes[process] + self._failures[process]) >= self.TOTAL_RUN_LIMIT
+  def failure_transition(self, process):
+    """Apply the post-failure state change for :process (see add_failure)."""
+    if self.has_reached_run_limit(process):
+      self._planner.set_failed(process)
+      return
+    # max_failures == 0 means the process is retried without limit.
+    if self._attributes[process].max_failures == 0 or (
+        self._failures[process] < self._attributes[process].max_failures):
+      self._planner.reset(process)
+    elif self._attributes[process].is_ephemeral:
+      self._planner.set_finished(process)
+    else:
+      self._planner.set_failed(process)
+  def add_success(self, process, timestamp=None):
+    """Reset a process to runnable if it is a daemon, or mark it as finished otherwise."""
+    timestamp = timestamp if timestamp is not None else self._clock.time()
+    self._last_terminal[process] = timestamp
+    self._successes[process] += 1
+    self.success_transition(process)
+  def success_transition(self, process):
+    """Apply the post-success state change for :process (see add_success)."""
+    # Hitting the total run limit marks the process failed even after a success.
+    if self.has_reached_run_limit(process):
+      self._planner.set_failed(process)
+      return
+    if not self._attributes[process].is_daemon:
+      self._planner.set_finished(process)
+    else:
+      self._planner.reset(process)
+  def set_failed(self, process):
+    """Force a process to be in failed state.  E.g. kill -9 and you want it pinned failed."""
+    self._planner.set_failed(process)
+  def lost(self, process):
+    """Mark a process as lost.  This sets its runnable state back to the previous runnable
+       state and does not increment its failure count."""
+    self._planner.reset(process)
+  def is_complete(self):
+    """A task is complete if all ordinary processes are finished or failed (there may still be
+       running ephemeral processes)"""
+    terminals = self._planner.finished.union(self._planner.failed).union(self._ephemerals)
+    return self._planner.processes == terminals
+  # TODO(wickman) Should we consider subclassing again?
+  @property
+  def failed(self):
+    """Copy of the failed process set from the underlying Planner."""
+    return self._planner.failed
+  @property
+  def running(self):
+    """Copy of the running process set from the underlying Planner."""
+    return self._planner.running
+  @property
+  def finished(self):
+    """Copy of the finished process set from the underlying Planner."""
+    return self._planner.finished
diff --git a/src/main/python/apache/thermos/config/BUILD b/src/main/python/apache/thermos/config/BUILD
new file mode 100644
index 0000000..57e7f0d
--- /dev/null
+++ b/src/main/python/apache/thermos/config/BUILD
@@ -0,0 +1,24 @@
+# NOTE(review): this BUILD snippet looks truncated in this copy: the target-declaring calls that
+# should wrap the 'schema' and 'config' keyword arguments (and their closing parentheses) are
+# not visible, so it will not evaluate as shown.
+# NOTE(review): dependency paths still point at .../twitter/...; presumably a later commit of this
+# rename series updates them -- confirm.
+import os
+  name = 'schema',
+  sources = globs('*.py'),
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('src/main/python/twitter/thermos:pystachio'),
+    pants('src/main/python/twitter/thermos/common:planner')
+  ],
+  name = 'config',
+  dependencies = [
+    pants(':schema'),
+    pants('src/main/python/twitter/thermos/common'),  # cover common:planner
+  ],
+  provides = setup_py(
+    name = 'twitter.thermos.config',
+    # Package version: contents of .auroraversion at the build root, stripped and lower-cased.
+    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+    description = 'Thermos configuration schema and loader.',
+  )
diff --git a/src/main/python/apache/thermos/config/ b/src/main/python/apache/thermos/config/
new file mode 100644
index 0000000..e69de29
diff --git a/src/main/python/apache/thermos/config/bin/ b/src/main/python/apache/thermos/config/bin/
new file mode 100644
index 0000000..c0680ec
--- /dev/null
+++ b/src/main/python/apache/thermos/config/bin/
@@ -0,0 +1,47 @@
+import copy
+import sys
+import json
+import pprint
+from twitter.common import app
+from twitter.thermos.config.loader import ThermosConfigLoader
+def main(args):
+  """
+    Given .thermos configs, loads them and prints out information about them.
+    For each path in :args this prints, per exported task: its typecheck result, its processes,
+    its unbound ports and its raw JSON form.  Unnamed tasks are skipped.
+  """
+  # NOTE(review): the body of this if-statement is missing in this copy (the function does not
+  # parse as shown); confirm what should happen when no configs are given.
+  if len(args) == 0:
+  for arg in args:
+    print '\nparsing %s\n' % arg
+    tc = ThermosConfigLoader.load(arg)
+    for task_wrapper in tc.tasks():
+      task = task_wrapper.task
+      if not task.has_name():
+        print 'Found unnamed task!  Skipping...'
+        continue
+      # NOTE(review): the first format argument (presumably the task name) is missing here.
+      print 'Task: %s [check: %s]' % (, task.check())
+      if not task.processes():
+        print '  No processes.'
+      else:
+        print '  Processes:'
+        for proc in task.processes():
+          print '    %s' % proc
+      ports = task_wrapper.ports()
+      if not ports:
+        print '  No unbound ports.'
+      else:
+        print '  Ports:'
+        for port in ports:
+          print '    %s' % port
+      print 'raw:'
+      pprint.pprint(json.loads(task_wrapper.to_json()))
+# NOTE(review): this statement is cut off after the '%' operator in this copy; the usage string
+# presumably interpolates the program name.  Confirm against the original file.
+app.set_usage("%s config1 config2 ..." %
diff --git a/src/main/python/apache/thermos/config/bin/ b/src/main/python/apache/thermos/config/bin/
new file mode 100644
index 0000000..4a5c797
--- /dev/null
+++ b/src/main/python/apache/thermos/config/bin/
@@ -0,0 +1,3 @@
+# Interactive Thermos config REPL: every name exported by the config schema module is imported
+# into this namespace, which is then handed to code.interact() as the session's locals.
+from twitter.thermos.config.schema import *
+from code import interact
+interact('Thermos Config REPL', local=locals())
diff --git a/src/main/python/apache/thermos/config/ b/src/main/python/apache/thermos/config/
new file mode 100644
index 0000000..e69de29
diff --git a/src/main/python/apache/thermos/config/ b/src/main/python/apache/thermos/config/
new file mode 100644
index 0000000..54721c3
--- /dev/null
+++ b/src/main/python/apache/thermos/config/
@@ -0,0 +1,179 @@
+import copy
+import json
+import os
+import re
+import textwrap
+from twitter.common.dirutil import safe_open
+from twitter.common.lang import Compatibility
+from twitter.thermos.common.planner import TaskPlanner
+from twitter.thermos.config.schema import Task
+from pystachio import Ref
+from pystachio.config import Config
+class PortExtractor(object):
+  class InvalidPorts(Exception): pass
+  @staticmethod
+  def extract(obj):
+    port_scope = Ref.from_address('thermos.ports')
+    _, uninterp = obj.interpolate()
+    ports = []
+    for ref in uninterp:
+      subscope = port_scope.scoped_to(ref)
+      if subscope is not None:
+        if not subscope.is_index():
+          raise PortExtractor.InvalidPorts(
+            'Bad port specification "%s" (should be of form "thermos.ports[name]"' % ref.address())
+        ports.append(subscope.action().value)
+    return ports
+class ThermosProcessWrapper(object):
+  # >=1 characters && anything but NULL and '/'
+  VALID_PROCESS_NAME_RE = re.compile(r'^[^./][^/]*$')
+  class InvalidProcess(Exception): pass
+  def __init__(self, process):
+    self._process = process
+  def ports(self):
+    try:
+      return PortExtractor.extract(self._process)
+    except PortExtractor.InvalidPorts:
+      raise self.InvalidProcess('Process has invalid ports scoping!')
+  @staticmethod
+  def assert_valid_process_name(name):
+    if not ThermosProcessWrapper.VALID_PROCESS_NAME_RE.match(name):
+      raise ThermosProcessWrapper.InvalidProcess('Invalid process name: %s' % name)
+class ThermosTaskWrapper(object):
+  class InvalidTask(Exception): pass
+  def __init__(self, task, bindings=None, strict=True):
+    if bindings:
+      task = task.bind(*bindings)
+    if not task.check().ok() and strict:
+      raise ThermosTaskWrapper.InvalidTask(task.check().message())
+    self._task = task
+  @property
+  def task(self):
+    return self._task
+  def ports(self):
+    ti, _ = self._task.interpolate()
+    ports = set()
+    if ti.has_processes():
+      for process in ti.processes():
+        try:
+          ports.update(ThermosProcessWrapper(process).ports())
+        except ThermosProcessWrapper.InvalidProcess:
+          raise self.InvalidTask('Task has invalid process: %s' % process)
+    return ports
+  def to_json(self):
+    return json.dumps(self._task.get())
+  def to_file(self, filename):
+    ti, _ = self._task.interpolate()
+    with safe_open(filename, 'w') as fp:
+      json.dump(ti.get(), fp)
+  @staticmethod
+  def from_file(filename, **kw):
+    try:
+      with safe_open(filename) as fp:
+        task = Task.json_load(fp)
+      return ThermosTaskWrapper(task, **kw)
+    except Exception as e:
+      return None
+# TODO(wickman) These should be validators pushed onto ThermosConfigLoader.plugins
+class ThermosTaskValidator(object):
+  """Static checks on a Task; every failure is reported as InvalidTaskError."""
+  class InvalidTaskError(Exception): pass
+  @classmethod
+  def assert_valid_task(cls, task):
+    """Run the name, typecheck and plan checks on :task, in that order."""
+    cls.assert_valid_names(task)
+    cls.assert_typecheck(task)
+    cls.assert_valid_plan(task)
+  @classmethod
+  def assert_valid_plan(cls, task):
+    """Raise InvalidTaskError if either TaskPlanner below rejects the task's schedule."""
+    # NOTE(review): the left-hand side of both lambda comparisons is missing in this copy, so
+    # these lines do not parse; presumably they split processes on the Process 'final' flag
+    # (regular vs. finalizing processes).  Confirm against the original file.
+    try:
+      TaskPlanner(task, process_filter=lambda proc: == False)
+      TaskPlanner(task, process_filter=lambda proc: == True)
+    except TaskPlanner.InvalidSchedule as e:
+      raise cls.InvalidTaskError('Task has invalid plan: %s' % e)
+  @classmethod
+  def assert_valid_names(cls, task):
+    """Raise InvalidTaskError if any process name fails ThermosProcessWrapper's name check."""
+    for process in task.processes():
+      # NOTE(review): the right-hand side (presumably the process's name) is missing here.
+      name =
+      try:
+        ThermosProcessWrapper.assert_valid_process_name(name)
+      except ThermosProcessWrapper.InvalidProcess as e:
+        raise cls.InvalidTaskError('Task has invalid process: %s' % e)
+  @classmethod
+  def assert_typecheck(cls, task):
+    """Raise InvalidTaskError if task.check() does not report ok()."""
+    typecheck = task.check()
+    if not typecheck.ok():
+      raise cls.InvalidTaskError('Failed to fully evaluate task: %s' %
+        typecheck.message())
+  @classmethod
+  def assert_valid_ports(cls, task, portmap):
+    """Raise InvalidTaskError if the task references a port name missing from :portmap."""
+    for port in ThermosTaskWrapper(task).ports():
+      if port not in portmap:
+        raise cls.InvalidTaskError('Task requires unbound port %s!' % port)
+  @classmethod
+  def assert_same_task(cls, spec, task):
+    """
+      If a task file exists at the active task path derived from :spec (presumably a TaskPath),
+      raise InvalidTaskError when it cannot be loaded or differs from :task.
+    """
+    active_task = spec.given(state='active').getpath('task_path')
+    if os.path.exists(active_task):
+      task_on_disk = ThermosTaskWrapper.from_file(active_task)
+      if not task_on_disk or task_on_disk.task != task:
+        raise cls.InvalidTaskError('Task differs from on disk copy: %r vs %r' % (
+            task_on_disk.task if task_on_disk else None, task))
+class ThermosConfigLoader(object):
+  SCHEMA = textwrap.dedent("""
+    from pystachio import *
+    from twitter.thermos.config.schema import *
+    __TASKS = []
+    def export(task):
+      __TASKS.append(Task(task) if isinstance(task, dict) else task)
+  """)
+  @classmethod
+  def load(cls, loadable, **kw):
+    config = Config(loadable, schema=cls.SCHEMA)
+    return cls(ThermosTaskWrapper(task, **kw) for task in config.environment['__TASKS'])
+  @classmethod
+  def load_json(cls, filename, **kw):
+    tc = cls()
+    task = ThermosTaskWrapper.from_file(filename, **kw)
+    if task:
+      ThermosTaskValidator.assert_valid_task(task.task())
+      tc.add_task(task)
+    return tc
+  def __init__(self, exported_tasks=None):
+    self._exported_tasks = exported_tasks or []
+  def add_task(self, task):
+    self._exported_tasks.append(task)
+  def tasks(self):
+    return self._exported_tasks
diff --git a/src/main/python/apache/thermos/config/ b/src/main/python/apache/thermos/config/
new file mode 100644
index 0000000..2ee38a1
--- /dev/null
+++ b/src/main/python/apache/thermos/config/
@@ -0,0 +1,2 @@
+from .schema_base import *
+from .schema_helpers import *
diff --git a/src/main/python/apache/thermos/config/ b/src/main/python/apache/thermos/config/
new file mode 100644
index 0000000..a47cfdf
--- /dev/null
+++ b/src/main/python/apache/thermos/config/
@@ -0,0 +1,75 @@
+from pystachio import (
+  Boolean,
+  Default,
+  Empty,
+  Float,
+  Integer,
+  List,
+  Map,
+  Required,
+  String,
+  Struct
+# Define constants for resources
+BYTES = 1
+KB = 1024 * BYTES
+MB = 1024 * KB
+GB = 1024 * MB
+TB = 1024 * GB
+class ThermosContext(Struct):
+  # Runtime values for configs; presumably bound as the 'thermos' scope (cf. the
+  # 'thermos.ports[name]' references the loader extracts) -- confirm.
+  # TODO(wickman) Move the underlying replacement mechanism to %port% replacements
+  # Port name => port number.
+  ports   = Map(String, Integer)
+  # TODO(wickman) Move the underlying replacement mechanism to %task_id%
+  task_id = String
+  # TODO(wickman) Move underlying mechanism to %user%
+  user    = String
+class Resources(Struct):
+  # Resource request; all three fields are required.  ram/disk are presumably byte counts (the
+  # helpers express them with the GB constant) -- confirm.
+  cpu  = Required(Float)
+  ram  = Required(Integer)
+  disk = Required(Integer)
+class Constraint(Struct):
+  # Ordering constraint: each named process runs after the one listed before it.
+  order = List(String)
+class Process(Struct):
+  # A single command run as part of a Thermos task.
+  cmdline = Required(String)
+  name    = Required(String)
+  # This is currently unused but reserved for future use by Thermos.
+  resources     = Resources
+  # optionals
+  max_failures  = Default(Integer, 1)      # maximum number of failed process runs
+                                           # before process is failed.
+                                           # (TaskPlanner treats 0 as "retry without limit".)
+  # daemon: rescheduled after each success instead of being marked finished (TaskPlanner).
+  daemon        = Default(Boolean, False)
+  # ephemeral: not required for task completion; marked finished rather than failed once it
+  # exhausts max_failures (TaskPlanner).
+  ephemeral     = Default(Boolean, False)
+  min_duration  = Default(Integer, 5)      # integer seconds
+                                           # (minimum time between a terminal state and a rerun)
+  final         = Default(Boolean, False)  # if this process should be a finalizing process
+                                           # that should always be run after regular processes
+class Task(Struct):
+  # A set of processes plus ordering constraints; the name defaults to the first process's name.
+  name = Default(String, '{{processes[0].name}}')
+  processes = List(Process)
+  # optionals
+  constraints = Default(List(Constraint), [])
+  resources = Resources
+  max_failures = Default(Integer, 1)        # maximum number of failed processes before task is failed.
+  max_concurrency = Default(Integer, 0)     # 0 is infinite concurrency.
+                                            # > 0 is max concurrent processes.
+  finalization_wait = Default(Integer, 30)  # the amount of time in seconds we allocate to run the
+                                            # finalization schedule.
+  # TODO(jon): remove/replace with proper solution to MESOS-3546
+  user = String
diff --git a/src/main/python/apache/thermos/config/ b/src/main/python/apache/thermos/config/
new file mode 100644
index 0000000..502de5a
--- /dev/null
+++ b/src/main/python/apache/thermos/config/
@@ -0,0 +1,247 @@
+"""Helpers for composing Thermos workflows."""
+import itertools
+from twitter.common.lang import Compatibility
+from .schema_base import (
+   Constraint,
+   GB,
+   Process,
+   Resources,
+   Task,
+from pystachio import Empty, List
+__all__ = (
+  # shorthand for process ordering constraint
+  'order',
+  # task combinators
+  'combine_tasks',    # merge N tasks in parallel
+  'concat_tasks',     # serialize N tasks
+  # options helpers
+  'java_options',
+  'python_options',
+  # the automatically-sequential version of a task
+  'SequentialTask',
+  # create a simple task from a command line + name
+  'SimpleTask',
+  # helper classes
+  'Options',
+  'Processes',
+  'Tasks',
+  'Units',
+class Units(object):
+  """Helpers for base units of Tasks and Processes."""
+  @classmethod
+  def optional_resources(cls, resources):
+    return Resources() if resources is Empty else resources
+  @classmethod
+  def resources_sum(cls, *resources):
+    """Add two Resources objects together."""
+    def add_unit(f1, f2):
+      return (0 if f1 is Empty else f1.get()) + (0 if f2 is Empty else f2.get())
+    def add(r1, r2):
+      return Resources(cpu=add_unit(r1.cpu(), r2.cpu()),
+                       ram=add_unit(r1.ram(), r2.ram()),
+                       disk=add_unit(r1.disk(), r2.disk()))
+    return reduce(add, map(cls.optional_resources, resources), Resources(cpu=0, ram=0, disk=0))
+  @classmethod
+  def resources_max(cls, resources):
+    """Return a Resource object that is the maximum of the inputs along each
+      resource dimension."""
+    def max_unit(f1, f2):
+      return max(0 if f1 is Empty else f1.get(), 0 if f2 is Empty else f2.get())
+    def resource_max(r1, r2):
+      return Resources(cpu=max_unit(r1.cpu(), r2.cpu()),
+                       ram=max_unit(r1.ram(), r2.ram()),
+                       disk=max_unit(r1.disk(), r2.disk()))
+    return reduce(resource_max,
+        map(cls.optional_resources, resources), Resources(cpu=0, ram=0, disk=0))
+  @classmethod
+  def processes_merge(cls, tasks):
+    """Return a deduped list of the processes from all tasks."""
+    return list(set(itertools.chain.from_iterable(task.processes() for task in tasks)))
+  @classmethod
+  def constraints_merge(cls, tasks):
+    """Return a deduped list of the constraints from all tasks."""
+    return list(set(itertools.chain.from_iterable(task.constraints() for task in tasks)))
+class Processes(object):
+  """Helper class for Process objects."""
+  @classmethod
+  def _process_name(cls, process):
+    # Map a Process or a plain string to the name used in ordering constraints; anything else
+    # is rejected with ValueError.
+    if isinstance(process, Process):
+      # NOTE(review): the returned expression (presumably the Process's name) is missing in this
+      # copy, so this bare return yields None.  Confirm against the original file.
+      return
+    elif isinstance(process, Compatibility.string):
+      return process
+    raise ValueError("Unknown value for process order: %s" % repr(process))
+  @classmethod
+  def order(cls, *processes):
+    """Given a list of processes, return the list of constraints that keeps them in order, e.g.
+       order(p1, p2, p3) => [Constraint(order=[name(p1), name(p2), name(p3)])], where each name
+       is resolved by _process_name (Process objects or plain strings).
+       Similarly, concatenation operations are valid, i.e.
+          order(p1, p2) + order(p2, p3) <=> order(p1, p2, p3)
+    """
+    return [Constraint(order=[cls._process_name(p) for p in processes])]
+class Tasks(object):
+  """Helper class for Task objects."""
+  SIMPLE_CPU  = 1.0
+  SIMPLE_RAM  = 1 * GB
+  @classmethod
+  def _combine_processes(cls, *tasks):
+    """Given multiple tasks, merge their processes together, retaining the identity of the first
+       task."""
+    if len(tasks) == 0:
+      return Task()
+    head_task = tasks[-1]
+    return head_task(processes=Units.processes_merge(tasks))
+  @classmethod
+  def combine(cls, *tasks, **kw):
+    """Given multiple tasks, return a Task that runs all processes in parallel."""
+    if len(tasks) == 0:
+      return Task()
+    base = cls._combine_processes(*tasks)
+    return base(
+      resources=Units.resources_sum(*(task.resources() for task in tasks)),
+      constraints=Units.constraints_merge(tasks),
+      **kw
+    )
+  @classmethod
+  def concat(cls, *tasks, **kw):
+    """Given tasks T1...TN, return a single Task that runs all processes such that
+       all processes in Tk run before any process in Tk+1."""
+    if len(tasks) == 0:
+      return Task()
+    base = cls._combine_processes(*tasks)
+    base = base(resources=Units.resources_max(task.resources() for task in tasks))
+    base_constraints = Units.constraints_merge(tasks)
+    # TODO(wickman) be smarter about this in light of existing constraints
+    for (t1, t2) in zip(tasks[0:-1], tasks[1:]):
+      for p1 in t1.processes():
+        for p2 in t2.processes():
+          if p1 != p2:
+            base_constraints.extend(Processes.order(p1, p2))
+    return base(constraints=base_constraints, **kw)
+  @classmethod
+  def simple(cls, name, command):
+    """Create a usable Task from a provided name + command line and a default set of resources"""
+    return Task(
+      name=name,
+      processes=[Process(name=name, cmdline=command)],
+      resources=Resources(cpu=cls.SIMPLE_CPU,
+                          ram=cls.SIMPLE_RAM,
+                          disk=cls.SIMPLE_DISK))
+  @classmethod
+  def sequential(cls, task):
+    """Add a constraint that makes all processes within a task run sequentially."""
+    def maybe_constrain(task):
+      return {'constraints': order(*task.processes())} if task.processes() is not Empty else {}
+    if task.constraints() is Empty or task.constraints() == List(Constraint)([]):
+      return task(**maybe_constrain(task))
+    raise ValueError('Cannot turn a Task with existing constraints into a SequentialTask!')
+class Options(object):
+  """Helper class for constructing command-line arguments."""
+  @classmethod
+  def render_option(cls, short_prefix, long_prefix, option, value=None):
+    option = '%s%s' % (short_prefix if len(option) == 1 else long_prefix, option)
+    return '%s %s' % (option, value) if value else option
+  @classmethod
+  def render_options(cls, short_prefix, long_prefix, *options, **kw_options):
+    renders = []
+    for option in options:
+      if isinstance(option, Compatibility.string):
+        renders.append(cls.render_option(short_prefix, long_prefix, option))
+      elif isinstance(option, dict):
+        # preserve order in case option is an OrderedDict, rather than recursing with **option
+        for argument, value in option.items():
+          renders.append(cls.render_option(short_prefix, long_prefix, argument, value))
+      else:
+        raise ValueError('Got an unexpected argument to render_options: %s' % repr(option))
+    for argument, value in kw_options.items():
+      renders.append(cls.render_option(short_prefix, long_prefix, argument, value))
+    return renders
+  @classmethod
+  def java(cls, *options, **kw_options):
+    """
+      Given a set of arguments, keyword arguments or dictionaries, render
+      command-line parameters accordingly.  For example:
+        java_options('a', 'b') == '-a -b'
+        java_options({
+          'a': 23,
+          'b': 'foo'
+        }) == '-a 23 -b foo'
+        java_options(a=23, b='foo') == '-a 23 -b foo'
+    """
+    return ' '.join(cls.render_options('-', '-', *options, **kw_options))
+  @classmethod
+  def python(cls, *options, **kw_options):
+    """
+      Given a set of arguments, keyword arguments or dictionaries, render
+      command-line parameters accordingly.  Single letter parameters are
+      rendered with single '-'.  For example:
+        python_options('a', 'boo') == '-a --boo'
+        python_options({
+          'a': 23,
+          'boo': 'foo'
+        }) == '-a 23 --boo foo'
+        python_options(a=23, boo='foo') == '-a 23 --boo foo'
+    """
+    return ' '.join(cls.render_options('-', '--', *options, **kw_options))
+def SimpleTask(name, command):
+  """A simple command-line Task with default resources"""
+  return Tasks.simple(name, command)
+def SequentialTask(*args, **kw):
+  """A Task whose processes are always sequential."""
+  return Tasks.sequential(Task(*args, **kw))
+python_options = Options.python
+java_options =
+combine_tasks = Tasks.combine
+concat_tasks = Tasks.concat
+order = Processes.order
diff --git a/src/main/python/apache/thermos/core/BUILD b/src/main/python/apache/thermos/core/BUILD
new file mode 100644
index 0000000..d260112
--- /dev/null
+++ b/src/main/python/apache/thermos/core/BUILD
@@ -0,0 +1,88 @@
+import os
+# Pants BUILD definitions for the Thermos core libraries (helper, inspector, muxer,
+# process, runner) plus the 'core' aggregate target.
+# NOTE(review): in this copy each target's constructor call (e.g. python_library(...)),
+# its closing parenthesis, and the file names inside sources = [...] appear to have been
+# lost; restore them from upstream before relying on this file.
+  name = 'helper',
+  sources = [''],
+  dependencies = [
+    pants('src/main/python/twitter/thermos:psutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
+    pants('src/main/python/twitter/thermos/common:ckpt'),
+    pants('src/main/python/twitter/thermos/common:path'),
+    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+  ]
+  name = 'inspector',
+  sources = [''],
+  dependencies = [
+    pants(':muxer'),
+    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
+    pants('src/main/python/twitter/thermos/common:ckpt'),
+    pants('src/main/python/twitter/thermos/common:path'),
+    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+  ]
+  name = 'muxer',
+  sources = [''],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
+    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+  ]
+  name = 'process',
+  sources = [''],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
+    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+  ]
+  name = 'runner',
+  sources = ['', ''],
+  dependencies = [
+    pants(':helper'),
+    pants(':muxer'),
+    pants(':process'),
+    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
+    pants('src/main/python/twitter/thermos:psutil'),
+    pants('src/main/python/twitter/thermos/common:ckpt'),
+    pants('src/main/python/twitter/thermos/common:path'),
+    pants('src/main/python/twitter/thermos/common:planner'),
+    pants('src/main/python/twitter/thermos/config:schema'),
+    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+  ]
+# 'core' aggregates :inspector and :runner plus the covering common/config libraries,
+# and is published via setup_py as the 'twitter.thermos.core' distribution.
+  name = 'core',
+  dependencies = [
+    pants(':inspector'),
+    pants(':runner'),
+    # covering libs
+    pants('src/main/python/twitter/thermos/common'),
+    pants('src/main/python/twitter/thermos/config'),
+  ],
+  provides = setup_py(
+    name = 'twitter.thermos.core',
+    # Version string is read from .auroraversion at the build root, stripped and lower-cased.
+    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+    description = 'The Thermos core state machine.',
+  )
diff --git a/src/main/python/apache/thermos/core/ b/src/main/python/apache/thermos/core/
new file mode 100644
index 0000000..de40ea7
--- /dev/null
+++ b/src/main/python/apache/thermos/core/
@@ -0,0 +1 @@
diff --git a/src/main/python/apache/thermos/core/ b/src/main/python/apache/thermos/core/
new file mode 100644
index 0000000..3587a85
--- /dev/null
+++ b/src/main/python/apache/thermos/core/
@@ -0,0 +1,387 @@
+from contextlib import closing
+import errno
+import os
+import signal
+import time
+from twitter.common import log
+from twitter.common.dirutil import lock_file, safe_mkdir
+from twitter.common.quantity import Amount, Time
+from twitter.common.recordio import ThriftRecordWriter
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.common.path import TaskPath
+from gen.twitter.thermos.ttypes import (
+  ProcessState,
+  ProcessStatus,
+  RunnerCkpt,
+  TaskState,
+  TaskStatus)
+import psutil
+class TaskKiller(object):
+  """
+    Task killing interface.
+  """
+  def __init__(self, task_id, checkpoint_root):
+    self._task_id = task_id
+    self._checkpoint_root = checkpoint_root
+  def kill(self, force=True):
+    TaskRunnerHelper.kill(self._task_id, self._checkpoint_root, force=force,
+                          terminal_status=TaskState.KILLED)
+  def lose(self, force=True):
+    TaskRunnerHelper.kill(self._task_id, self._checkpoint_root, force=force,
+                          terminal_status=TaskState.LOST)
+class TaskRunnerHelper(object):
+  """
+    TaskRunner helper methods that can be operated directly upon checkpoint
+    state.  These operations do not require knowledge of the underlying
+    task.
+    TaskRunnerHelper is sort of a mishmash of "checkpoint-only" operations and
+    the "Process Platform" stuff that started to get pulled into
+    This really needs some hard design thought to see if it can be extracted out
+    even further.
+  """
+  class Error(Exception): pass
+  class PermissionError(Error): pass
+  # Maximum drift between when the system says a task was forked and when we checkpointed
+  # its fork_time (used as a heuristic to determine a forked task is really ours instead of
+  # a task with coincidentally the same PID but just wrapped around.)
+  @staticmethod
+  def get_actual_user():
+    import getpass, pwd
+    try:
+      pwd_entry = pwd.getpwuid(os.getuid())
+    except KeyError:
+      return getpass.getuser()
+    return pwd_entry[0]
+  @staticmethod
+  def process_from_name(task, process_name):
+    if task.has_processes():
+      for process in task.processes():
+        if == process_name:
+          return process
+    return None
+  @classmethod
+  def this_is_really_our_pid(cls, process, current_user, start_time):
+    """
+      A heuristic to make sure that this is likely the pid that we own/forked.  Necessary
+      because of pid-space wrapping.  We don't want to go and kill processes we don't own,
+      especially if the killer is running as root.
+      process: psutil.Process representing the process to check
+      current_user: user expected to own the process
+      start_time: time at which it's expected the process has started
+      Raises:
+        psutil.NoSuchProcess - if the Process supplied no longer exists
+    """
+    if process.username != current_user:
+"Expected pid %s to be ours but the pid user is %s and we're %s" % (
+, process.username, current_user))
+      return False
+    if abs(start_time - process.create_time) >= cls.MAX_START_TIME_DRIFT.as_(Time.SECONDS):
+"Expected pid %s start time to be %s but it's %s" % (
+, start_time, process.create_time))
+      return False
+    return True
+  @classmethod
+  def scan_process(cls, state, process_name):
+    """
+      Given a RunnerState and a process_name, return the following:
+        (coordinator pid, process pid, process tree)
+        (int or None, int or None, set)
+    """
+    process_run = state.processes[process_name][-1]
+    process_owner = state.header.user
+    coordinator_pid, pid, tree = None, None, set()
+    if process_run.coordinator_pid:
+      try:
+        coordinator_process = psutil.Process(process_run.coordinator_pid)
+        if cls.this_is_really_our_pid(coordinator_process, process_owner, process_run.fork_time):
+          coordinator_pid = process_run.coordinator_pid
+      except psutil.NoSuchProcess:
+'  Coordinator %s [pid: %s] completed.' % (process_run.process,
+            process_run.coordinator_pid))
+      except psutil.Error as err:
+        log.warning('  Error gathering information on pid %s: %s' % (process_run.coordinator_pid,
+            err))
+    if
+      try:
+        process = psutil.Process(
+        if cls.this_is_really_our_pid(process, process_owner, process_run.start_time):
+          pid =
+      except psutil.NoSuchProcess:
+'  Process %s [pid: %s] completed.' % (process_run.process,
+      except psutil.Error as err:
+        log.warning('  Error gathering information on pid %s: %s' % (, err))
+      else:
+        if pid:
+          try:
+            tree = set( for proc in process.get_children(recursive=True))
+          except psutil.Error:
+            log.warning('  Error gathering information on children of pid %s' % pid)
+    return (coordinator_pid, pid, tree)
+  @classmethod
+  def scantree(cls, state):
+    """
+      Scan the process tree associated with the provided task state.
+      Returns a dictionary of process name => (coordinator pid, pid, pid children)
+      If the coordinator is no longer active, coordinator pid will be None.  If the
+      forked process is no longer active, pid will be None and its children will be
+      an empty set.
+    """
+    return dict((process_name, cls.scan_process(state, process_name))
+                for process_name in state.processes)
+  @classmethod
+  def safe_signal(cls, pid, sig=signal.SIGTERM):
+    try:
+      os.kill(pid, sig)
+    except OSError as e:
+      if e.errno not in (errno.ESRCH, errno.EPERM):
+        log.error('Unexpected error in os.kill: %s' % e)
+    except Exception as e:
+      log.error('Unexpected error in os.kill: %s' % e)
+  @classmethod
+  def terminate_pid(cls, pid):
+    cls.safe_signal(pid, signal.SIGTERM)
+  @classmethod
+  def kill_pid(cls, pid):
+    cls.safe_signal(pid, signal.SIGKILL)
+  @classmethod
+  def kill_group(cls, pgrp):
+    cls.safe_signal(-pgrp, signal.SIGKILL)
+  @classmethod
+  def _get_process_tuple(cls, state, process_name):
+    assert process_name in state.processes and len(state.processes[process_name]) > 0
+    return cls.scan_process(state, process_name)
+  @classmethod
+  def _get_coordinator_group(cls, state, process_name):
+    assert process_name in state.processes and len(state.processes[process_name]) > 0
+    return state.processes[process_name][-1].coordinator_pid
+  @classmethod
+  def terminate_process(cls, state, process_name):
+    log.debug('TaskRunnerHelper.terminate_process(%s)' % process_name)
+    _, pid, _ = cls._get_process_tuple(state, process_name)
+    if pid:
+      log.debug('   => SIGTERM pid %s' % pid)
+      cls.terminate_pid(pid)
+    return bool(pid)
+  @classmethod
+  def kill_process(cls, state, process_name):
+    log.debug('TaskRunnerHelper.kill_process(%s)' % process_name)
+    coordinator_pgid = cls._get_coordinator_group(state, process_name)
+    coordinator_pid, pid, tree = cls._get_process_tuple(state, process_name)
+    # This is super dangerous.  TODO(wickman)  Add a heuristic that determines
+    # that 1) there are processes that currently belong to this process group
+    #  and 2) those processes have inherited the coordinator checkpoint filehandle
+    # This way we validate that it is in fact the process group we expect.
+    if coordinator_pgid:
+      log.debug('   => SIGKILL coordinator group %s' % coordinator_pgid)
+      cls.kill_group(coordinator_pgid)
+    if coordinator_pid:
+      log.debug('   => SIGKILL coordinator %s' % coordinator_pid)
+      cls.kill_pid(coordinator_pid)
+    if pid:
+      log.debug('   => SIGKILL pid %s' % pid)
+      cls.kill_pid(pid)
+    for child in tree:
+      log.debug('   => SIGKILL child %s' % child)
+      cls.kill_pid(child)
+    return bool(coordinator_pid or pid or tree)
+  @classmethod
+  def kill_runner(cls, state):
+    log.debug('TaskRunnerHelper.kill_runner()')
+    if not state or not state.statuses:
+      raise cls.Error('Could not read state!')
+    pid = state.statuses[-1].runner_pid
+    if pid == os.getpid():
+      raise cls.Error('Unwilling to commit seppuku.')
+    try:
+      os.kill(pid, signal.SIGKILL)
+      return True
+    except OSError as e:
+      if e.errno == errno.EPERM:
+        # Permission denied
+        return False
+      elif e.errno == errno.ESRCH:
+        # pid no longer exists
+        return True
+      raise
+  @classmethod
+  def open_checkpoint(cls, filename, force=False, state=None):
+    """
+      Acquire a locked checkpoint stream.
+    """
+    safe_mkdir(os.path.dirname(filename))
+    fp = lock_file(filename, "a+")
+    if fp in (None, False):
+      if force:
+'Found existing runner, forcing leadership forfeit.')
+        state = state or CheckpointDispatcher.from_file(filename)
+        if cls.kill_runner(state):
+'Successfully killed leader.')
+          # TODO(wickman)  Blocking may not be the best idea here.  Perhaps block up to
+          # a maximum timeout.  But blocking is necessary because os.kill does not immediately
+          # release the lock if we're in force mode.
+          fp = lock_file(filename, "a+", blocking=True)
+      else:
+        log.error('Found existing runner, cannot take control.')
+    if fp in (None, False):
+      raise cls.PermissionError('Could not open locked checkpoint: %s, lock_file = %s' %
+        (filename, fp))
+    ckpt = ThriftRecordWriter(fp)
+    ckpt.set_sync(True)
+    return ckpt
+  @classmethod
+  def kill(cls, task_id, checkpoint_root, force=False,
+           terminal_status=TaskState.KILLED, clock=time):
+    """
+      An implementation of Task killing that doesn't require a fully hydrated TaskRunner object.
+      Terminal status must be either KILLED or LOST state.
+    """
+    if terminal_status not in (TaskState.KILLED, TaskState.LOST):
+      raise cls.Error('terminal_status must be KILLED or LOST (got %s)' %
+                      TaskState._VALUES_TO_NAMES.get(terminal_status) or terminal_status)
+    pathspec = TaskPath(root=checkpoint_root, task_id=task_id)
+    checkpoint = pathspec.getpath('runner_checkpoint')
+    state = CheckpointDispatcher.from_file(checkpoint)
+    if state is None or state.header is None or state.statuses is None:
+      if force:
+        log.error('Task has uninitialized TaskState - forcibly finalizing')
+        cls.finalize_task(pathspec)
+        return
+      else:
+        log.error('Cannot update states in uninitialized TaskState!')
+        return
+    ckpt = cls.open_checkpoint(checkpoint, force=force, state=state)
+    def write_task_state(state):
+      update = TaskStatus(state=state, timestamp_ms=int(clock.time() * 1000),
+                          runner_pid=os.getpid(), runner_uid=os.getuid())
+      ckpt.write(RunnerCkpt(task_status=update))
+    def write_process_status(status):
+      ckpt.write(RunnerCkpt(process_status=status))
+    if cls.is_task_terminal(state.statuses[-1].state):
+'Task is already in terminal state!  Finalizing.')
+      cls.finalize_task(pathspec)
+      return
+    with closing(ckpt):
+      write_task_state(TaskState.ACTIVE)
+      for process, history in state.processes.items():
+        process_status = history[-1]
+        if not cls.is_process_terminal(process_status.state):
+          if cls.kill_process(state, process):
+            write_process_status(ProcessStatus(process=process,
+              state=ProcessState.KILLED, seq=process_status.seq + 1, return_code=-9,
+              stop_time=clock.time()))
+          else:
+            if process_status.state is not ProcessState.WAITING:
+              write_process_status(ProcessStatus(process=process,
+                state=ProcessState.LOST, seq=process_status.seq + 1))
+      write_task_state(terminal_status)
+    cls.finalize_task(pathspec)
+  @classmethod
+  def reap_children(cls):
+    pids = set()
+    while True:
+      try:
+        pid, status, rusage = os.wait3(os.WNOHANG)
+        if pid == 0:
+          break
+        pids.add(pid)
+        log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s' % (
+          pid, status, rusage))
+      except OSError as e:
+        if e.errno != errno.ECHILD:
+          log.warning('Unexpected error when calling waitpid: %s' % e)
+        break
+    return pids
+    ProcessState.SUCCESS,
+    ProcessState.KILLED,
+    ProcessState.FAILED,
+    ProcessState.LOST])
+  TERMINAL_TASK_STATES = frozenset([
+    TaskState.SUCCESS,
+    TaskState.FAILED,
+    TaskState.KILLED,
+    TaskState.LOST])
+  @classmethod
+  def is_process_terminal(cls, process_status):
+    return process_status in cls.TERMINAL_PROCESS_STATES
+  @classmethod
+  def is_task_terminal(cls, task_status):
+    return task_status in cls.TERMINAL_TASK_STATES
+  @classmethod
+  def initialize_task(cls, spec, task):
+    active_task = spec.given(state='active').getpath('task_path')
+    finished_task = spec.given(state='finished').getpath('task_path')
+    is_active, is_finished = os.path.exists(active_task), os.path.exists(finished_task)
+    if is_finished:
+      raise cls.Error('Cannot initialize task with "finished" record!')
+    if not is_active:
+      safe_mkdir(os.path.dirname(active_task))
+      with open(active_task, 'w') as fp:
+        fp.write(task)
+  @classmethod
+  def finalize_task(cls, spec):
+    active_task = spec.given(state='active').getpath('task_path')
+    finished_task = spec.given(state='finished').getpath('task_path')
+    is_active, is_finished = os.path.exists(active_task), os.path.exists(finished_task)
+    if not is_active:
+      raise cls.Error('Cannot finalize task with no "active" record!')
+    elif is_finished:
+      raise cls.Error('Cannot finalize task with "finished" record!')
+    safe_mkdir(os.path.dirname(finished_task))
+    os.rename(active_task, finished_task)
+    os.utime(finished_task, None)
diff --git a/src/main/python/apache/thermos/core/ b/src/main/python/apache/thermos/core/
new file mode 100644
index 0000000..8c646a1
--- /dev/null
+++ b/src/main/python/apache/thermos/core/
@@ -0,0 +1,105 @@
+from collections import namedtuple
+from contextlib import closing
+import pwd
+from twitter.common import log
+from twitter.common.recordio import RecordIO, ThriftRecordReader
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.common.path import TaskPath
+from gen.twitter.thermos.ttypes import (
+  ProcessState,
+  RunnerCkpt,
+  RunnerState)
+from .muxer import ProcessMuxer
+CheckpointInspection = namedtuple('CheckpointInspection',
+    ['runner_latest_update',
+     'process_latest_update',
+     'runner_processes',
+     'coordinator_processes',
+     'processes'])
+class CheckpointInspector(object):
+  def __init__(self, checkpoint_root):
+    self._path = TaskPath(root=checkpoint_root)
+  @staticmethod
+  def get_timestamp(process_record):
+    if process_record :
+      for timestamp in ('fork_time', 'start_time', 'stop_time'):
+        stamp = getattr(process_record, timestamp, None)
+        if stamp:
+          return stamp
+    return 0
+  def inspect(self, task_id):
+    """
+      Reconstructs the checkpoint stream and returns a CheckpointInspection.
+    """
+    dispatcher = CheckpointDispatcher()
+    state = RunnerState(processes = {})
+    muxer = ProcessMuxer(self._path.given(task_id=task_id))
+    runner_processes = []
+    coordinator_processes = set()
+    processes = set()
+    def consume_process_record(record):
+      if not record.process_status:
+        return
+      try:
+        user_uid = pwd.getpwnam(state.header.user).pw_uid
+      except KeyError:
+        log.error('Could not find user: %s' % state.header.user)
+        return
+      if record.process_status.state == ProcessState.FORKED:
+        coordinator_processes.add((record.process_status.coordinator_pid, user_uid,
+                                   record.process_status.fork_time))
+      elif record.process_status.state == ProcessState.RUNNING:
+        processes.add((, user_uid,
+                       record.process_status.start_time))
+    # replay runner checkpoint
+    runner_pid = None
+    runner_latest_update = 0
+    try:
+      with open(self._path.given(task_id=task_id).getpath('runner_checkpoint')) as fp:
+        with closing(ThriftRecordReader(fp, RunnerCkpt)) as ckpt:
+          for record in ckpt:
+            dispatcher.dispatch(state, record)
+            runner_latest_update = max(runner_latest_update,
+                self.get_timestamp(record.process_status))
+            # collect all bound runners
+            if record.task_status:
+              if record.task_status.runner_pid != runner_pid:
+                runner_processes.append((record.task_status.runner_pid,
+                                         record.task_status.runner_uid or 0,
+                                         record.task_status.timestamp_ms))
+                runner_pid = record.task_status.runner_pid
+            elif record.process_status:
+              consume_process_record(record)
+    except (IOError, OSError, RecordIO.Error) as err:
+      log.debug('Error inspecting task runner checkpoint: %s' % err)
+      return
+    # register existing processes in muxer
+    for process_name in state.processes:
+      muxer.register(process_name)
+    # read process checkpoints
+    process_latest_update = runner_latest_update
+    for record in
+      process_latest_update = max(process_latest_update, self.get_timestamp(record.process_status))
+      consume_process_record(record)
+    return CheckpointInspection(
+      runner_latest_update=runner_latest_update,
+      process_latest_update=process_latest_update,
+      runner_processes=runner_processes,
+      coordinator_processes=coordinator_processes,
+      processes=processes)
diff --git a/src/main/python/apache/thermos/core/ b/src/main/python/apache/thermos/core/
new file mode 100644
index 0000000..9c0b389
--- /dev/null
+++ b/src/main/python/apache/thermos/core/
@@ -0,0 +1,142 @@
+import os
+import errno
+from twitter.common import log
+from twitter.common.recordio import ThriftRecordReader
+from gen.twitter.thermos.ttypes import RunnerCkpt
+class ProcessMuxer(object):
+  class ProcessExists(Exception): pass
+  class ProcessNotFound(Exception): pass
+  class CorruptCheckpoint(Exception): pass
+  def __init__(self, pathspec):
+    self._processes = {} # process_name => fp
+    self._watermarks = {} # process_name => sequence high watermark
+    self._pathspec = pathspec
+  def __del__(self):
+    for fp in filter(None, self._processes.values()):
+      fp.close()
+  def register(self, process_name, watermark=0):
+    log.debug('registering %s' % process_name)
+    if process_name in self._processes:
+      raise ProcessMuxer.ProcessExists("Process %s is already registered" % process_name)
+    self._processes[process_name] = None
+    self._watermarks[process_name] = watermark
+  def _bind_processes(self):
+    for process_name, fp in self._processes.items():
+      if fp is None:
+        process_ckpt = self._pathspec.given(process=process_name).getpath('process_checkpoint')
+        log.debug('ProcessMuxer binding %s => %s' % (process_name, process_ckpt))
+        try:
+          self._processes[process_name] = open(process_ckpt, 'r')
+        except IOError as e:
+          if e.errno == errno.ENOENT:
+            log.debug('  => bind failed, checkpoint not available yet.')
+            continue
+          else:
+            log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
+        except Exception as e:
+          log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
+        self._fast_forward_stream(process_name)
+  def _fast_forward_stream(self, process_name):
+    log.debug('Fast forwarding %s stream to seq=%s' % (process_name,
+      self._watermarks[process_name]))
+    assert self._processes.get(process_name) is not None
+    fp = self._processes[process_name]
+    rr = ThriftRecordReader(fp, RunnerCkpt)
+    current_watermark = -1
+    records = 0
+    while current_watermark < self._watermarks[process_name]:
+      last_pos = fp.tell()
+      record = rr.try_read()
+      if record is None:
+        break
+      new_watermark = record.process_status.seq
+      if new_watermark > self._watermarks[process_name]:
+        log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.' % (
+          process_name, new_watermark, self._watermarks[process_name]))
+        break
+      current_watermark = new_watermark
+      records += 1
+    if current_watermark < self._watermarks[process_name]:
+      log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s' % (
+         process_name, current_watermark, self._watermarks[process_name]))
+    if records:
+      log.debug('Fast forwarded %s %s record(s) to seq=%s.' % (process_name, records,
+        current_watermark))
+  def unregister(self, process_name):
+    log.debug('unregistering %s' % process_name)
+    if process_name not in self._processes:
+      raise ProcessMuxer.ProcessNotFound("No trace of process: %s" % process_name)
+    else:
+      self._watermarks.pop(process_name)
+      fp = self._processes.pop(process_name)
+      if fp is not None:
+        fp.close()
+  def has_data(self, process):
+    """
+      Return true if we think that there are updates available from the supplied process.
+    """
+    self._bind_processes()
+    # TODO(wickman) Should this raise ProcessNotFound?
+    if process not in self._processes:
+      return False
+    fp = self._processes[process]
+    rr = ThriftRecordReader(fp, RunnerCkpt)
+    old_pos = fp.tell()
+    try:
+      expected_new_pos = os.fstat(fp.fileno()).st_size
+    except OSError as e:
+      log.debug('ProcessMuxer could not fstat for process %s' % process)
+      return False
+    update = rr.try_read()
+    if update:
+      return True
+    return False
+  def select(self):
+    """
+      Read and multiplex checkpoint records from all the forked off process coordinators.
+      Checkpoint records can come from one of two places:
+        in-process: checkpoint records synthesized for FORKED and LOST events
+        out-of-process: checkpoint records from from file descriptors of forked coordinators
+      Returns a list of RunnerCkpt objects that were successfully read, or an empty
+      list if none were read.
+    """
+    self._bind_processes()
+    updates = []
+    for handle in filter(None, self._processes.values()):
+      try:
+        fstat = os.fstat(handle.fileno())
+      except OSError as e:
+        log.error('Unable to fstat %s!' %
+        continue
+      if handle.tell() > fstat.st_size:
+        log.error('Truncated checkpoint record detected on %s!' %
+      elif handle.tell() < fstat.st_size:
+        rr = ThriftRecordReader(handle, RunnerCkpt)
+        while True:
+          process_update = rr.try_read()
+          if process_update:
+            updates.append(process_update)
+          else:
+            break
+    if len(updates) > 0:
+      log.debug('select() returning %s updates:' % len(updates))
+      for update in updates:
+        log.debug('  = %s' % update)
+    return updates
diff --git a/src/main/python/apache/thermos/core/ b/src/main/python/apache/thermos/core/
new file mode 100644
index 0000000..dd1d62a
--- /dev/null
+++ b/src/main/python/apache/thermos/core/
@@ -0,0 +1,371 @@
+"""Run processes of a Thermos task.
+This module contains the Process class, used to manage the execution of the constituent processes of
+a Thermos task. Each process is represented by a "coordinator" process, which fires off the actual
+commandline in a subprocess of its own.
+from abc import abstractmethod
+import getpass
+import grp
+import os
+import pwd
+import signal
+import subprocess
+import sys
+import time
+from twitter.common import log
+from twitter.common.dirutil import (
+    lock_file,
+    safe_mkdir,
+    safe_open,
+from twitter.common.lang import Interface
+from twitter.common.quantity import Amount, Time
+from twitter.common.recordio import ThriftRecordReader, ThriftRecordWriter
+from gen.twitter.thermos.ttypes import (
+    ProcessState,
+    ProcessStatus,
+    RunnerCkpt,
+class Platform(Interface):
+  """Abstract representation of a platform encapsulating system-level functions"""
+  @abstractmethod
+  def clock(self):
+    pass
+  @abstractmethod
+  def fork(self):
+    pass
+  @abstractmethod
+  def getpid(self):
+    pass
+class ProcessBase(object):
+  """
+    Encapsulate a running process for a task.
+  """
+  class Error(Exception): pass
+  class UnknownUserError(Error): pass
+  class CheckpointError(Error): pass
+  class UnspecifiedSandbox(Error): pass
+  class PermissionError(Error): pass
+  def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None):
+    """
+      required:
+        name        = name of the process
+        cmdline     = cmdline of the process
+        sequence    = the next available sequence number for state updates
+        pathspec    = TaskPath object for synthesizing path names
+        sandbox_dir = the sandbox in which to run the process
+        platform    = Platform providing fork, clock, getpid
+      optional:
+        user        = the user to run as (if unspecified, will default to current user.)
+                      if specified to a user that is not the current user, you must have root access
+    """
+    self._name = name
+    self._cmdline = cmdline
+    self._pathspec = pathspec
+    self._seq = sequence
+    self._sandbox = sandbox_dir
+    if self._sandbox:
+      safe_mkdir(self._sandbox)
+    self._pid = None
+    self._fork_time = None
+    self._stdout = None
+    self._stderr = None
+    self._user = user
+    if self._user:
+      user, current_user = self._getpwuid() # may raise self.UnknownUserError
+      if user != current_user and os.geteuid() != 0:
+        raise self.PermissionError('Must be root to run processes as other users!')
+    self._ckpt = None
+    self._ckpt_head = -1
+    if platform is None:
+      raise ValueError("Platform must be specified")
+    self._platform = platform
+  def _log(self, msg):
+    log.debug('[process:%5s=%s]: %s' % (self._pid,, msg))
+  def _ckpt_write(self, msg):
+    self._init_ckpt_if_necessary()
+    self._log("child state transition [%s] <= %s" % (self.ckpt_file(), msg))
+    self._ckpt.write(msg)
+  def _write_process_update(self, **kw):
+    """Write a process update to the coordinator's checkpoint stream."""
+    process_status = ProcessStatus(**kw)
+    process_status.seq = self._seq
+    process_status.process =
+    self._ckpt_write(RunnerCkpt(process_status=process_status))
+    self._seq += 1
+  def _write_initial_update(self):
+    self._write_process_update(state=ProcessState.FORKED,
+                               fork_time=self._fork_time,
+                               coordinator_pid=self._pid)
+  def cmdline(self):
+    return self._cmdline
+  def name(self):
+    return self._name
+  def pid(self):
+    """pid of the coordinator"""
+    return self._pid
+  def rebind(self, pid, fork_time):
+    """rebind Process to an existing coordinator pid without forking"""
+    self._pid = pid
+    self._fork_time = fork_time
+  def ckpt_file(self):
+    return self._pathspec.getpath('process_checkpoint')
+  def _setup_ckpt(self):
+    """Set up the checkpoint: must be run on the parent."""
+    self._log('initializing checkpoint file: %s' % self.ckpt_file())
+    ckpt_fp = lock_file(self.ckpt_file(), "a+")
+    if ckpt_fp in (None, False):
+      raise self.CheckpointError('Could not acquire checkpoint permission or lock for %s!' %
+        self.ckpt_file())
+    self._ckpt_head = os.path.getsize(self.ckpt_file())
+    self._ckpt = ThriftRecordWriter(ckpt_fp)
+    self._ckpt.set_sync(True)
+  def _init_ckpt_if_necessary(self):
+    if self._ckpt is None:
+      self._setup_ckpt()
+  def _wait_for_control(self):
+    """Wait for control of the checkpoint stream: must be run in the child."""
+    total_wait_time = Amount(0, Time.SECONDS)
+    with open(self.ckpt_file(), 'r') as fp:
+      rr = ThriftRecordReader(fp, RunnerCkpt)
+      while total_wait_time < self.MAXIMUM_CONTROL_WAIT:
+        ckpt_tail = os.path.getsize(self.ckpt_file())
+        if ckpt_tail == self._ckpt_head:
+          self._platform.clock().sleep(self.CONTROL_WAIT_CHECK_INTERVAL.as_(Time.SECONDS))
+          total_wait_time += self.CONTROL_WAIT_CHECK_INTERVAL
+          continue
+        checkpoint = rr.try_read()
+        if checkpoint:
+          if not checkpoint.process_status:
+            raise self.CheckpointError('No process status in checkpoint!')
+          if (checkpoint.process_status.process != or
+              checkpoint.process_status.state != ProcessState.FORKED or
+              checkpoint.process_status.fork_time != self._fork_time or
+              checkpoint.process_status.coordinator_pid != self._pid):
+            self._log('Losing control of the checkpoint stream:')
+            self._log('   fork_time [%s] vs self._fork_time [%s]' % (
+                checkpoint.process_status.fork_time, self._fork_time))
+            self._log('   coordinator_pid [%s] vs self._pid [%s]' % (
+                checkpoint.process_status.coordinator_pid, self._pid))
+            raise self.CheckpointError('Lost control of the checkpoint stream!')
+          self._log('Taking control of the checkpoint stream at record: %s' %
+            checkpoint.process_status)
+          self._seq = checkpoint.process_status.seq + 1
+          return True
+    raise self.CheckpointError('Timed out waiting for checkpoint stream!')
+  def _prepare_fork(self):
+    user, current_user = self._getpwuid()
+    uid, gid = user.pw_uid, user.pw_gid
+    self._fork_time = self._platform.clock().time()
+    self._setup_ckpt()
+    self._stdout = safe_open(self._pathspec.with_filename('stdout').getpath('process_logdir'), "w")
+    self._stderr = safe_open(self._pathspec.with_filename('stderr').getpath('process_logdir'), "w")
+    os.chown(, user.pw_uid, user.pw_gid)
+    os.chown(, user.pw_uid, user.pw_gid)
+  def _finalize_fork(self):
+    self._write_initial_update()
+    self._ckpt.close()
+    self._ckpt = None
+  def _getpwuid(self):
+    """Returns a tuple of the user (i.e. --user) and current user."""
+    try:
+      current_user = pwd.getpwuid(os.getuid())
+    except KeyError:
+      raise self.UnknownUserError('Unknown user %s!' % self._user)
+    try:
+      user = pwd.getpwnam(self._user) if self._user else current_user
+    except KeyError:
+      raise self.UnknownUserError('Unable to get pwent information!')
+    return user, current_user
+  def start(self):
+    """
+      This is the main call point from the runner, and forks a co-ordinator process to run the
+      target process (i.e. self.cmdline())
+      The parent returns immediately and populates information about the pid of the co-ordinator.
+      The child (co-ordinator) will launch the target process in a subprocess.
+    """
+    self._prepare_fork()
+    self._pid = self._platform.fork()
+    if self._pid == 0:
+      self._pid = self._platform.getpid()
+      self._wait_for_control()
+      try:
+        self.execute()
+      finally:
+        self._ckpt.close()
+        self.finish()
+    else:
+      self._finalize_fork()
+  def execute(self):
+    raise NotImplementedError
+  def finish(self):
+    pass
+class RealPlatform(Platform):
+  def __init__(self, fork=os.fork):
+    self._fork = fork
+  def fork(self):
+    pid = self._fork()
+    if pid == 0:
+      self._sanitize()
+    return pid
+  def _sanitize(self):
+    for sig in self.IGNORE_SIGNALS:
+      signal.signal(sig, signal.SIG_IGN)
+  def getpid(self):
+    return os.getpid()
+  def clock(self):
+    return time
+class Process(ProcessBase):
+  """
+    Encapsulate a running process for a task.
+  """
+  RCFILE = '.thermos_profile'
+  FD_CLOEXEC = True
+  def __init__(self, *args, **kw):
+    """
+      See ProcessBase.__init__
+      Takes additional arguments:
+        fork: the fork function to use [default: os.fork]
+        chroot: whether or not to chroot into the sandbox [default: False]
+    """
+    fork = kw.pop('fork', os.fork)
+    self._use_chroot = bool(kw.pop('chroot', False))
+    self._rc = None
+    kw['platform'] = RealPlatform(fork=fork)
+    ProcessBase.__init__(self, *args, **kw)
+    if self._use_chroot and self._sandbox is None:
+      raise self.UnspecifiedSandbox('If using chroot, must specify sandbox!')
+  def _chroot(self):
+    """chdir and chroot to the sandbox directory."""
+    os.chdir(self._sandbox)
+    os.chroot(self._sandbox)
+  def _setuid(self):
+    """Drop privileges to the user supplied in Process creation (if necessary.)"""
+    user, current_user = self._getpwuid()
+    if user.pw_uid == current_user.pw_uid:
+      return
+    uid, gid = user.pw_uid, user.pw_gid
+    username = user.pw_name
+    group_ids = [group.gr_gid for group in grp.getgrall() if username in group.gr_mem]
+    os.setgroups(group_ids)
+    os.setgid(gid)
+    os.setuid(uid)
+  def execute(self):
+    """Perform final initialization and launch target process commandline in a subprocess."""
+    if not self._stderr:
+      raise RuntimeError('self._stderr not set up!')
+    if not self._stdout:
+      raise RuntimeError('self._stdout not set up!')
+    user, _ = self._getpwuid()
+    username, homedir = user.pw_name, user.pw_dir
+    # TODO(wickman) reconsider setsid now that we're invoking in a subshell
+    os.setsid()
+    if self._use_chroot:
+      self._chroot()
+    self._setuid()
+    # start process
+    start_time = self._platform.clock().time()
+    if not self._sandbox:
+      sandbox = os.getcwd()
+    else:
+      sandbox = self._sandbox if not self._use_chroot else '/'
+    thermos_profile = os.path.join(sandbox, self.RCFILE)
+    env = {
+      'HOME': homedir if self._use_chroot else sandbox,
+      'LOGNAME': username,
+      'USER': username,
+      'PATH': os.environ['PATH']
+    }
+    if os.path.exists(thermos_profile):
+      env.update(BASH_ENV=thermos_profile)
+    self._popen = subprocess.Popen(["/bin/bash", "-c", self.cmdline()],
+                                   stderr=self._stderr,
+                                   stdout=self._stdout,
+                                   close_fds=self.FD_CLOEXEC,
+                                   cwd=sandbox,
+                                   env=env)
+    self._write_process_update(state=ProcessState.RUNNING,
+                     ,
+                               start_time=start_time)
+    # wait for job to finish
+    rc = self._popen.wait()
+    # indicate that we have finished/failed
+    if rc < 0:
+      state = ProcessState.KILLED
+    elif rc == 0:
+      state = ProcessState.SUCCESS
+    else:
+      state = ProcessState.FAILED
+    self._write_process_update(state=state,
+                               return_code=rc,
+                               stop_time=self._platform.clock().time())
+    self._rc = rc
+  def finish(self):
+    self._log('Coordinator exiting.')
+    sys.exit(0)