You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:09 UTC

[16/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/assets/observer.js
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/assets/observer.js b/src/main/python/apache/thermos/observer/http/assets/observer.js
new file mode 100644
index 0000000..84aec5d
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/assets/observer.js
@@ -0,0 +1,202 @@
+var Task = new Class({
+   data: {},
+
+   initialize: function(taskId, taskType) {
+     this.taskType = taskType
+     this.taskId  = taskId
+     this.visible  = false
+     this.setElement()
+   },
+
+   // task[active.task_id.processes.waiting]
+   taskStr: function(components) {
+     return 'task[' + ([this.taskType, this.taskId].append(components).join()) + ']'
+   },
+
+   transitionElement: function(dom_id, newval) {
+     if (!$(dom_id)) {
+       // alert('could not find dom_id' + dom_id)
+       return
+     }
+
+     curval = $(dom_id).innerHTML
+
+     // handle specialcases
+     newval = this.translateElement(dom_id, newval)
+
+     if (newval != curval) {
+       $(dom_id).innerHTML = newval
+       var morphElement = new Fx.Morph(dom_id, {
+           duration: 'long',
+           transition: Fx.Transitions.Sine.easeOut
+       });
+       morphElement.start({ 'color': ["#00FF00", "#222222"]})
+     }
+   },
+
+   applyUpdate: function(update) {
+     this.data = update
+     this.updateElement()
+   },
+
+   /*
+     TODO(wickman)  Instead do the 2-3 necessary transition functions, then make
+     a schema that maps attributes to the appropriate transition function.
+   */
+   translateElement: function(dom_id, value) {
+     if (dom_id === this.taskStr(['processes', 'running']) ||
+         dom_id === this.taskStr(['processes', 'waiting']) ||
+         dom_id === this.taskStr(['processes', 'success']) ||
+         dom_id === this.taskStr(['processes', 'failed'])) {
+       return value.length
+     }
+
+     if (dom_id == this.taskStr(['resource_consumption', 'cpu'])) {
+       if (value) {
+         return value.toFixed(2)
+       }
+     }
+
+     return value
+   },
+
+   /*
+     TODO(wickman)  Ditto to above.
+   */
+   updateElement: function() {
+     for (prefix_attr in this.data) {
+       if (this.data.hasOwnProperty(prefix_attr)) {
+         if (prefix_attr == "task_id") {
+           this.transitionElement(this.taskStr([prefix_attr]), this.translateUid(this.data[prefix_attr]))
+         } else if (instanceOf(this.data[prefix_attr], String) || instanceOf(this.data[prefix_attr], Number)) {
+           this.transitionElement(this.taskStr([prefix_attr]), this.data[prefix_attr])
+         } else if (instanceOf(this.data[prefix_attr], Object)) {
+           for (suffix_attr in this.data[prefix_attr])
+             if (this.data[prefix_attr].hasOwnProperty(suffix_attr))
+               this.transitionElement(this.taskStr([prefix_attr, suffix_attr]),
+                                      this.data[prefix_attr][suffix_attr])
+         }
+       }
+     }
+   },
+
+   translateUid: function(taskId) {
+     return "<a href='/task/" + taskId + "'>" + taskId + "</a>"
+   },
+
+   setElement: function() {
+     this.element = new Element('tr', {
+      'id': 'task[' + this.taskType + '][' + this.taskId + ']'}).adopt(
+         new Element('td', { 'id': this.taskStr(['task_id']), 'html': this.translateUid(this.taskId)}),
+         new Element('td', { 'id': this.taskStr(['name']) }),
+         new Element('td', { 'id': this.taskStr(['resource_consumption', 'cpu']) }),
+         new Element('td', { 'id': this.taskStr(['resource_consumption', 'ram']) }),
+         new Element('td', { 'id': this.taskStr(['resource_consumption', 'disk']) }),
+         new Element('td', { 'id': this.taskStr(['processes', 'waiting']) }),
+         new Element('td', { 'id': this.taskStr(['processes', 'running']) }),
+         new Element('td', { 'id': this.taskStr(['processes', 'success']) }),
+         new Element('td', { 'id': this.taskStr(['processes', 'failed']) }))
+   }
+})
+
+var TableManager = new Class({
+  activeTasks: {},
+  visibleChildren: [],
+
+  initialize: function(tableType) {
+    this.tableType = tableType
+    this.setElement()
+    this.getChildren()
+    this.startPolling()
+  },
+
+  setElement: function() {
+    this.element = new Element('table', { 'id': 'table[' + this.tableType + ']', 'class': 'common-table zebra-striped', 'cellpadding': '0' })
+    this.element.adopt(
+      new Element('thead').
+        adopt(new Element('tr', { 'id': 'task[' + this.tableType + '][superheader]', 'class': 'meta-headers' })
+          .adopt(new Element('th', { 'html': "", 'colspan': 2 }),
+                 new Element('th', { 'html': "consumed", 'colspan': 3 }),
+                 new Element('th', { 'html': "processes", 'colspan': 4 })
+                )
+              )
+        .adopt(new Element('tr', { 'id': 'task[' + this.tableType + '][header]' })
+          .adopt(new Element('th', { 'id': 'task[' + this.tableType + '][header][task_id]',   'html': "task_id" }),
+                 new Element('th', { 'id': 'task[' + this.tableType + '][header][name]',      'html': "name" }),
+                 new Element('th', { 'id': 'task[' + this.tableType + '][header][resource_consumption][cpu]', 'html': "cpu" }),
+                 new Element('th', { 'id': 'task[' + this.tableType + '][header][resource_consumption][ram]', 'html': "ram" }),
+                 new Element('th', { 'id': 'task[' + this.tableType + '][header][resource_consumption][disk]','html': "disk" }),
+                 new Element('th', { 'id': 'task[' + this.tableType + '][header][processes][waiting]', 'html': "waiting" }),
+                 new Element('th', { 'id': 'task[' + this.tableType + '][header][processes][running]', 'html': "running"}),
+                 new Element('th', { 'id': 'task[' + this.tableType + '][header][processes][success]', 'html': "success" }),
+                 new Element('th', { 'id': 'task[' + this.tableType + '][header][processes][failed]',  'html': "failed" })
+                )
+              )
+            )
+    this.tbody = new Element('tbody', { 'id': this.tableType + '_tbody' })
+    this.element.adopt(this.tbody)
+  },
+
+  toElement: function() {
+    return this.element
+  },
+
+  startPolling: function() {
+    this.taskIdPoller = setInterval(this.tableType + 'TableManager.getChildren();', 2500)
+    this.dataPoller = setInterval(this.tableType + 'TableManager.refreshVisibleChildren();', 5300)
+  },
+
+  getChildren: function() {
+    new Request.JSON({
+      'url': '/j/task_ids/' + this.tableType + '/-20',
+      'method': 'get',
+      'onComplete': function(response) {
+        if (response) {
+          var newChildren = Array.from(response.task_ids)
+          this.visibleChildren = newChildren
+
+          // first set all children to invisible
+          for (taskId in this.activeTasks) {
+            this.activeTasks[taskId].visible = false
+          }
+
+          // set new children visible
+          for (var k = 0; k < newChildren.length; k++) {
+            if (!(newChildren[k] in this.activeTasks)) {
+              this.activeTasks[newChildren[k]] = new Task(newChildren[k], this.tableType)
+            }
+            this.activeTasks[newChildren[k]].visible = true
+          }
+
+          // clear then adopt them
+          while (this.tbody.childNodes.length > 0)
+            this.tbody.removeChild(this.tbody.firstChild);
+          for (var k = 0; k < newChildren.length; k++)
+            this.tbody.adopt(this.activeTasks[newChildren[k]].element)
+        } else {
+          clearInterval(this.taskIdPoller)
+        }
+      }.bind(this)
+    }).send()
+  },
+
+  refreshVisibleChildren: function() {
+     // first get visible children
+     new Request.JSON({
+       'url': '/j/task',
+       'method': 'get',
+       'data': { 'task_id': this.visibleChildren.join() },
+       'onComplete': function(response) {
+         if (response) {
+           if (!response) return;
+           for (taskId in response) {
+             if (response.hasOwnProperty(taskId))
+               this.activeTasks[taskId].applyUpdate(response[taskId])
+           }
+         } else {
+           clearInterval(this.dataPoller)
+         }
+       }.bind(this)
+     }).send()
+   },
+})

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/file_browser.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/file_browser.py b/src/main/python/apache/thermos/observer/http/file_browser.py
new file mode 100644
index 0000000..45c5708
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/file_browser.py
@@ -0,0 +1,124 @@
+import os
+import pprint
+from xml.sax.saxutils import escape
+
+from twitter.common import log
+from twitter.common.http import HttpServer
+
+import bottle
+from mako.template import Template
+
+from .templating import HttpTemplate
+
+
+MB = 1024 * 1024
+DEFAULT_CHUNK_LENGTH = MB
+MAX_CHUNK_LENGTH = 16 * MB
+
+
+def _read_chunk(filename, offset=None, length=None):
+  offset = offset or -1
+  length = length or -1
+
+  try:
+    length = long(length)
+    offset = long(offset)
+  except ValueError:
+    return {}
+
+  if not os.path.isfile(filename):
+    return {}
+
+  try:
+    fstat = os.stat(filename)
+  except Exception as e:
+    log.error('Could not read from %s: %s' % (filename, e))
+    return {}
+
+  if offset == -1:
+    offset = fstat.st_size
+
+  if length == -1:
+    length = fstat.st_size - offset
+
+  with open(filename, "r") as fp:
+    fp.seek(offset)
+    try:
+      data = fp.read(length)
+    except IOError as e:
+      log.error('Failed to read %s: %s' % (filename, e), exc_info=True)
+      return {}
+
+  if data:
+    return dict(offset=offset, length=len(data), data=escape(data.decode('utf8', 'replace')))
+
+  return dict(offset=offset, length=0)
+
+
+class TaskObserverFileBrowser(object):
+  """
+    Mixin for Thermos observer File browser.
+  """
+
+  @HttpServer.route("/logs/:task_id/:process/:run/:logtype")
+  @HttpServer.mako_view(HttpTemplate.load('logbrowse'))
+  def handle_logs(self, task_id, process, run, logtype):
+    types = self._observer.logs(task_id, process, int(run))
+    if logtype not in types:
+      bottle.abort(404, "No such log type: %s" % logtype)
+    base, path = types[logtype]
+    filename = os.path.join(base, path)
+    return {
+      'task_id': task_id,
+      'filename': filename,
+      'process': process,
+      'run': run,
+      'logtype': logtype
+    }
+
+  @HttpServer.route("/logdata/:task_id/:process/:run/:logtype")
+  def handle_logdata(self, task_id, process, run, logtype):
+    offset = self.Request.GET.get('offset', -1)
+    length = self.Request.GET.get('length', -1)
+    types = self._observer.logs(task_id, process, int(run))
+    if logtype not in types:
+      return {}
+    chroot, path = types[logtype]
+    return _read_chunk(os.path.join(chroot, path), offset, length)
+
+  @HttpServer.route("/file/:task_id/:path#.+#")
+  @HttpServer.mako_view(HttpTemplate.load('filebrowse'))
+  def handle_file(self, task_id, path):
+    if path is None:
+      bottle.abort(404, "No such file")
+    return {
+      'task_id': task_id,
+      'filename': path,
+    }
+
+  @HttpServer.route("/filedata/:task_id/:path#.+#")
+  def handle_filedata(self, task_id, path):
+    if path is None:
+      return {}
+    offset = self.Request.GET.get('offset', -1)
+    length = self.Request.GET.get('length', -1)
+    chroot, path = self._observer.valid_file(task_id, path)
+    if chroot is None or path is None:
+      return {}
+    return _read_chunk(os.path.join(chroot, path), offset, length)
+
+  @HttpServer.route("/browse/:task_id")
+  @HttpServer.route("/browse/:task_id/:path#.*#")
+  @HttpServer.mako_view(HttpTemplate.load('filelist'))
+  def handle_dir(self, task_id, path=None):
+    if path == "":
+      path = None
+    chroot, path = self._observer.valid_path(task_id, path)
+    return dict(task_id=task_id, chroot=chroot, path=path)
+
+  @HttpServer.route("/download/:task_id/:path#.+#")
+  def handle_download(self, task_id, path=None):
+    chroot, path = self._observer.valid_path(task_id, path)
+    if path is None:
+      bottle.abort(404, "No such file")
+    return bottle.static_file(path, root=chroot, download=True)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/http_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/http_observer.py b/src/main/python/apache/thermos/observer/http/http_observer.py
new file mode 100644
index 0000000..b73f17f
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/http_observer.py
@@ -0,0 +1,133 @@
+"""HTTP interface to the Thermos TaskObserver
+
+This modules provides an HTTP server which exposes information about Thermos tasks running on a
+system. To do this, it relies heavily on the Thermos TaskObserver.
+
+"""
+
+import os
+import socket
+
+from twitter.common import log
+from twitter.common.http import HttpServer
+
+from .file_browser import TaskObserverFileBrowser
+from .json import TaskObserverJSONBindings
+from .static_assets import StaticAssets
+from .templating import HttpTemplate
+
+
+class BottleObserver(HttpServer, StaticAssets, TaskObserverFileBrowser, TaskObserverJSONBindings):
+  """
+    A bottle wrapper around a Thermos TaskObserver.
+  """
+
+  def __init__(self, observer):
+    self._observer = observer
+    StaticAssets.__init__(self)
+    TaskObserverFileBrowser.__init__(self)
+    TaskObserverJSONBindings.__init__(self)
+    HttpServer.__init__(self)
+
+  @HttpServer.route("/")
+  @HttpServer.view(HttpTemplate.load('index'))
+  def handle_index(self):
+    return dict(hostname=socket.gethostname())
+
+  @HttpServer.route("/main")
+  @HttpServer.route("/main/:type")
+  @HttpServer.route("/main/:type/:offset")
+  @HttpServer.route("/main/:type/:offset/:num")
+  @HttpServer.mako_view(HttpTemplate.load('main'))
+  def handle_main(self, type=None, offset=None, num=None):
+    if type not in (None, 'all', 'finished', 'active'):
+      HttpServer.abort(404, 'Invalid task type: %s' % type)
+    if offset is not None:
+      try:
+        offset = int(offset)
+      except ValueError:
+        HttpServer.abort(404, 'Invalid offset: %s' % offset)
+    if num is not None:
+      try:
+        num = int(num)
+      except ValueError:
+        HttpServer.abort(404, 'Invalid count: %s' % num)
+    return self._observer.main(type, offset, num)
+
+  @HttpServer.route("/task/:task_id")
+  @HttpServer.mako_view(HttpTemplate.load('task'))
+  def handle_task(self, task_id):
+    task = self.get_task(task_id)
+    processes = self._observer.processes([task_id])
+    if not processes.get(task_id, None):
+      HttpServer.abort(404, 'Unknown task_id: %s' % task_id)
+    processes = processes[task_id]
+    state = self._observer.state(task_id)
+
+    return dict(
+      task_id = task_id,
+      task = task,
+      statuses = self._observer.task_statuses(task_id),
+      user = task['user'],
+      ports = task['ports'],
+      processes = processes,
+      chroot = state.get('sandbox', ''),
+      launch_time = state.get('launch_time', 0),
+      hostname = state.get('hostname', 'localhost'),
+    )
+
+  def get_task(self, task_id):
+    task = self._observer._task(task_id)
+    if not task:
+      HttpServer.abort(404, "Failed to find task %s.  Try again shortly." % task_id)
+    return task
+
+  @HttpServer.route("/rawtask/:task_id")
+  @HttpServer.mako_view(HttpTemplate.load('rawtask'))
+  def handle_rawtask(self, task_id):
+    task = self.get_task(task_id)
+    state = self._observer.state(task_id)
+    return dict(
+      hostname = state.get('hostname', 'localhost'),
+      task_id = task_id,
+      task_struct = task['task_struct']
+    )
+
+  @HttpServer.route("/process/:task_id/:process_id")
+  @HttpServer.mako_view(HttpTemplate.load('process'))
+  def handle_process(self, task_id, process_id):
+    all_processes = {}
+    current_run = self._observer.process(task_id, process_id)
+    if not current_run:
+      HttpServer.abort(404, 'Invalid task/process combination: %s/%s' % (task_id, process_id))
+    process = self._observer.process_from_name(task_id, process_id)
+    if process is None:
+      msg = 'Could not recover process: %s/%s' % (task_id, process_id)
+      log.error(msg)
+      HttpServer.abort(404, msg)
+
+    current_run_number = current_run['process_run']
+    all_processes[current_run_number] = current_run
+    for run in range(current_run_number):
+      all_processes[run] = self._observer.process(task_id, process_id, run)
+    def convert_process_tuple(run_tuple):
+      process_tuple = dict(state = run_tuple['state'])
+      if 'start_time' in run_tuple:
+        process_tuple.update(start_time = run_tuple['start_time'])
+      if 'stop_time' in run_tuple:
+        process_tuple.update(stop_time = run_tuple['stop_time'])
+      return process_tuple
+
+    template = {
+      'task_id': task_id,
+      'process': {
+         'name': process_id,
+         'status': all_processes[current_run_number]["state"],
+         'cmdline': process.cmdline().get()
+      },
+    }
+    template['process'].update(**all_processes[current_run_number].get('used', {}))
+    template['runs'] = dict((run, convert_process_tuple(run_tuple))
+        for run, run_tuple in all_processes.items())
+    log.info('Rendering template is: %s' % template)
+    return template

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/json.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/json.py b/src/main/python/apache/thermos/observer/http/json.py
new file mode 100644
index 0000000..9212f0e
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/json.py
@@ -0,0 +1,56 @@
+import urllib
+
+from twitter.common.http import HttpServer
+
+
+class TaskObserverJSONBindings(object):
+  """
+    Mixin for Thermos observer JSON endpoints.
+  """
+
+  @HttpServer.route("/j/task_ids")
+  @HttpServer.route("/j/task_ids/:which")
+  @HttpServer.route("/j/task_ids/:which/:offset")
+  @HttpServer.route("/j/task_ids/:which/:offset/:num")
+  def handle_task_ids(self, which=None, offset=None, num=None):
+    return self._observer.task_ids(
+      which,
+      int(offset) if offset is not None else 0,
+      int(num) if num is not None else 20)
+
+  @HttpServer.route("/j/task_id_count")
+  def handle_task_id_count(self):
+    return self._observer.task_id_count()
+
+  @HttpServer.route("/j/task")
+  def handle_tasks(self):
+    """
+      Additional parameters:
+        task_id = comma separated list of task_ids.
+    """
+    task_ids = HttpServer.Request.GET.get('task_id', [])
+    if task_ids:
+      task_ids = urllib.unquote(task_ids).split(',')
+    return self._observer.task(task_ids)
+
+  @HttpServer.route("/j/task/:task_id")
+  def handle_task(self, task_id):
+    return self._observer.task([task_id])
+
+  @HttpServer.route("/j/process/:task_id")
+  @HttpServer.route("/j/process/:task_id/:process")
+  @HttpServer.route("/j/process/:task_id/:process/:run")
+  def handle_process(self, task_id, process=None, run=None):
+    return self._observer.process(task_id, process, run)
+
+  @HttpServer.route("/j/processes")
+  def handle_processes(self):
+    """
+      Additional parameters:
+        task_ids = comma separated list of task_ids.
+    """
+    task_ids = HttpServer.Request.GET.get('task_id', [])
+    if task_ids:
+      task_ids = urllib.unquote(task_ids).split(',')
+    return self._observer.processes(task_ids)
+

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/static_assets.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/static_assets.py b/src/main/python/apache/thermos/observer/http/static_assets.py
new file mode 100644
index 0000000..cf3ff08
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/static_assets.py
@@ -0,0 +1,43 @@
+import mimetypes
+import os
+
+from twitter.common import log
+from twitter.common.http.server import HttpServer
+
+from bottle import HTTPResponse
+import pkg_resources
+
+
+class StaticAssets(object):
+  """
+    Serve the /assets directory.
+  """
+  def __init__(self):
+    self._assets = {}
+    self._detect_assets()
+
+  def _detect_assets(self):
+    log.info('detecting assets...')
+    assets = pkg_resources.resource_listdir(__name__, 'assets')
+    cached_assets = {}
+    for asset in assets:
+      log.info('  detected asset: %s' % asset)
+      cached_assets[asset] = pkg_resources.resource_string(
+        __name__, os.path.join('assets', asset))
+    self._assets = cached_assets
+
+  @HttpServer.route("/favicon.ico")
+  def handle_favicon(self):
+    HttpServer.redirect("/assets/favicon.ico")
+
+  @HttpServer.route("/assets/:filename")
+  def handle_asset(self, filename):
+    # TODO(wickman)  Add static_content to bottle.
+    if filename in self._assets:
+      mimetype, encoding = mimetypes.guess_type(filename)
+      headers = {}
+      if mimetype: headers['Content-Type'] = mimetype
+      if encoding: headers['Content-Encoding'] = encoding
+      return HTTPResponse(self._assets[filename], header=headers)
+    else:
+      HttpServer.abort(404, 'Unknown asset: %s' % filename)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/filebrowse.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/filebrowse.tpl b/src/main/python/apache/thermos/observer/http/templates/filebrowse.tpl
new file mode 100644
index 0000000..871e31e
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/filebrowse.tpl
@@ -0,0 +1,81 @@
+<%def name="download_link()">
+  <a href='/download/${task_id}/${filename}'><font size=1>download</font></a>
+</%def>
+
+<html>
+
+<head>
+  <meta charset="utf-8">
+  <title></title>
+
+  <style type="text/css">
+    .log {
+      font-family: "Inconsolata", "Monaco", "Courier New", "Courier";
+      line-height:14px;
+      font-size: 12px;
+    }
+
+    .invert {
+      color: #FFFFFF;
+      text-decoration: none;
+      background: #000000;
+    }
+  </style>
+</head>
+
+<link rel="stylesheet"
+      type="text/css"
+      href="/assets/bootstrap.css"/>
+<style type="text/css">
+div.tight
+{
+  height:100%;
+  overflow:scroll;
+}
+</style>
+
+<title>file browser ${task_id}</title>
+<body>
+  <div> <strong> filename </strong> ${filename} </div>
+  <div> <strong> dl </strong> ${download_link()} </div>
+  <div style="position: absolute; left: 5px; top: 0px;">
+    <p id="indicator" class="log invert"></p>
+  </div>
+
+  <div id="data" class="log" style="white-space:pre-wrap; background-color:#EEEEEE;"></div>
+</body>
+
+<script src="/assets/jquery.js"></script>
+<script src="/assets/jquery.pailer.js"></script>
+
+<script>
+  function resize() {
+    var margin_left = parseInt($('body').css('margin-left'));
+    var margin_top = parseInt($('body').css('margin-top'));
+    var margin_bottom = parseInt($('body').css('margin-bottom'));
+    $('#data').width($(window).width() - margin_left);
+    $('#data').height($(window).height() - margin_top - margin_bottom);
+  }
+
+  $(window).resize(resize);
+
+  $(document).ready(function() {
+    resize();
+
+    $('#data').pailer({
+      'read': function(options) {
+        var settings = $.extend({
+          'offset': -1,
+          'length': -1
+        }, options);
+
+        var url = "/filedata/${task_id}/${filename}"
+          + '?offset=' + settings.offset
+          + '&length=' + settings.length;
+        return $.getJSON(url);
+      },
+      'indicator': $('#indicator')
+    });
+  });
+</script>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/filelist.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/filelist.tpl b/src/main/python/apache/thermos/observer/http/templates/filelist.tpl
new file mode 100644
index 0000000..12a257e
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/filelist.tpl
@@ -0,0 +1,141 @@
+<%doc>
+ Template arguments:
+   task_id
+   chroot
+   path
+   dirs
+   files
+</%doc>
+
+
+<%!
+  import os
+
+  from datetime import datetime
+  import grp
+  import os
+  import pwd
+  import stat
+  import sys
+
+  NOW = datetime.now()
+
+  def format_mode(sres):
+    mode = sres.st_mode
+
+    root = (mode & 0700) >> 6
+    group = (mode & 0070) >> 3
+    user = (mode & 07)
+
+    def stat_type(md):
+      if stat.S_ISDIR(md):
+        return 'd'
+      elif stat.S_ISSOCK(md):
+        return 's'
+      else:
+        return '-'
+
+    def triple(md):
+      return '%c%c%c' % (
+        'r' if md & 0b100 else '-',
+        'w' if md & 0b010 else '-',
+        'x' if md & 0b001 else '-')
+
+    return ''.join([stat_type(mode), triple(root), triple(group), triple(user)])
+
+  def format_mtime(mtime):
+    dt = datetime.fromtimestamp(mtime)
+    return '%s %2d %5s' % (dt.strftime('%b'), dt.day,
+      dt.year if dt.year != NOW.year else dt.strftime('%H:%M'))
+
+  def format_prefix(filename, sres):
+    try:
+      pwent = pwd.getpwuid(sres.st_uid)
+      user = pwent.pw_name
+    except KeyError:
+      user = sres.st_uid
+
+    try:
+      grent = grp.getgrgid(sres.st_gid)
+      group = grent.gr_name
+    except KeyError:
+      group = sres.st_gid
+
+    return '%s %3d %10s %10s %10d %s' % (
+      format_mode(sres),
+      sres.st_nlink,
+      user,
+      group,
+      sres.st_size,
+      format_mtime(sres.st_mtime),
+    )
+%>
+
+<%def name="download_link(filename)"><a href='/download/${task_id}/${os.path.join(path, filename)}'><font size=1>dl</font></a></%def>
+<%def name="directory_link(dirname)"><a href='/browse/${task_id}/${os.path.join(path, dirname)}'>${dirname}</a></%def>
+<%def name="file_link(filename)"><a href='/file/${task_id}/${os.path.join(path, filename)}'>${filename}</a></%def>
+
+<html>
+
+<link rel="stylesheet"
+      type="text/css"
+      href="/assets/bootstrap.css"/>
+<style type="text/css">
+div.tight
+{
+  height:85%;
+  overflow:auto;
+}
+</style>
+
+<title>path browser for ${task_id}</title>
+
+
+% if chroot is not None:
+<body>
+  <div class="container">
+  <div class="span6">
+    <strong> task id </strong> ${task_id}
+  </div>
+  <div class="span6">
+    <strong> path </strong> ${path}
+  </div>
+  <div class="span12 tight">
+    <pre>
+
+% if path != ".":
+  <%
+     listing = ['..'] + os.listdir(os.path.join(chroot, path))
+  %>\
+% else:
+  <%
+     listing = os.listdir(os.path.join(chroot, path))
+  %>\
+% endif
+
+<% listing.sort() %>
+
+% for fn in listing:
+<%
+  try:
+    sres = os.stat(os.path.join(chroot, path, fn))
+  except OSError:
+    continue
+%>\
+  % if not stat.S_ISDIR(sres.st_mode):
+${format_prefix(fn, sres)} ${file_link(fn)} ${download_link(fn)}
+  % else:
+${format_prefix(fn, sres)} ${directory_link(fn)}
+  % endif
+% endfor
+    </pre>
+  </div>
+  </div>
+</body>
+% else:
+<body>
+  This task is running without a chroot.
+</body>
+% endif
+
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/home.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/home.tpl b/src/main/python/apache/thermos/observer/http/templates/home.tpl
new file mode 100644
index 0000000..4e7426b
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/home.tpl
@@ -0,0 +1,58 @@
+<html>
+<title>thermos(${hostname})</title>
+
+<link rel="stylesheet"
+      type="text/css"
+      href="/assets/bootstrap.css"/>
+
+<body>
+
+<%!
+ import socket
+ import time
+
+ def pretty_time(seconds):
+   return time.asctime(time.localtime(seconds))
+%>
+
+<div class="container">
+  <h3> host ${socket.gethostname()} </h3>
+
+  <div class="content" id="defaultLayout">
+     <table class="zebra-striped">
+     <thead>
+       <tr>
+         <th colspan=3> task </th>
+         <th colspan=4> resources </th>
+         <th colspan=3> links </th>
+       </tr>
+
+       <tr>
+         <th> name </th> <th> role </th> <th> status </th>
+         <th> procs </th> <th> cpu </th> <th> ram </th> <th> disk </th>
+         <th> task </th> <th> chroot </th> <th> ports </th>
+       </tr>
+      </thead>
+      <tbody>
+
+      % for proc_name, proc in sorted(processes.items()):
+       <tr>
+         <td> ${proc["process_name"]} </td>
+         <td> ${proc["process_run"]} </td>
+         <td> ${proc["state"]} </td>
+         <td> ${pretty_time(float(proc["start_time"])/1000.0) if "start_time" in proc else ""} </td>
+         <td> ${pretty_time(float(proc["stop_time"])/1000.0) if "stop_time" in proc else ""} </td>
+         <td> ${'%.3f' % proc["used"]["cpu"] if "used" in proc else ""} </td>
+         <td> ${'%dMB' % (proc["used"]["ram"] / 1024 / 1024) if "used" in proc else ""} </td>
+         <td> <a href="/logs/${task_id}/${proc["process_name"]}/${proc["process_run"]}/stdout">stdout</a> </td>
+         <td> <a href="/logs/${task_id}/${proc["process_name"]}/${proc["process_run"]}/stderr">stderr</a> </td>
+       </tr>
+      % endfor
+     </tbody>
+     </table>
+  </div>
+
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/index.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/index.tpl b/src/main/python/apache/thermos/observer/http/templates/index.tpl
new file mode 100644
index 0000000..d662c60
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/index.tpl
@@ -0,0 +1,51 @@
+<html>
+<title>thermos({{hostname}})</title>
+
+<link rel="stylesheet"
+      type="text/css"
+      href="assets/bootstrap.css"/>
+
+<script src="assets/jquery.js"></script>
+
+<body>
+
+<div class="container" id="defaultLayout">
+  <div id="activeTaskContainer" class='uber-container'>
+    <div class="active-container" data-url="main/active">
+    </div>
+  </div>
+  <br><br>
+  <div id="finishedTaskContainer" class='uber-container'>
+    <div class="finished-container" data-url="main/finished">
+    </div>
+  </div>
+</div>
+
+<script type="text/javascript">
+
+$(document).on('click', 'a.refresh-container', function(e) {
+   e.preventDefault()
+   topLevelDivContainer = $(this).closest('.uber-container')
+   divDataUrl = $(this).attr('data-url')
+   $.ajax({
+      'type': 'GET',
+      'dataType': 'html',
+      'url': divDataUrl,
+      success: function(data, xhr, err) {
+        $(topLevelDivContainer).html(data)
+      }
+   })
+ })
+
+refreshDivs = function() {
+  $('#activeTaskContainer').load($('.uber-container .active-container').attr('data-url'))
+  $('#finishedTaskContainer').load($('.uber-container .finished-container').attr('data-url'))
+}
+
+$(document).bind('ready', refreshDivs)
+setInterval(refreshDivs, 10000)
+
+</script>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/logbrowse.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/logbrowse.tpl b/src/main/python/apache/thermos/observer/http/templates/logbrowse.tpl
new file mode 100644
index 0000000..059979c
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/logbrowse.tpl
@@ -0,0 +1,80 @@
+<%def name="download_link()">
+  <a href='/download/${task_id}/${filename}'><font size=1>download</font></a>
+</%def>
+
+<html>
+
+<head>
+  <meta charset="utf-8">
+  <title></title>
+
+  <style type="text/css">
+    .log {
+      font-family: "Inconsolata", "Monaco", "Courier New", "Courier";
+      line-height:14px;
+      font-size: 12px;
+    }
+
+    .invert {
+      color: #FFFFFF;
+      text-decoration: none;
+      background: #000000;
+    }
+  </style>
+</head>
+
+<link rel="stylesheet"
+      type="text/css"
+      href="/assets/bootstrap.css"/>
+<style type="text/css">
+div.tight
+{
+  height:100%;
+  overflow:scroll;
+}
+</style>
+
+<title>log browser ${task_id}</title>
+<body>
+  <div> <strong> log </strong> ${logtype} <strong> ${download_link()} </strong> </div>
+  <div style="position: absolute; left: 5px; top: 0px;">
+    <p id="indicator" class="log invert"></p>
+  </div>
+
+  <div id="data" class="log" style="white-space:pre-wrap; background-color:#EEEEEE;"></div>
+</body>
+
+<script src="/assets/jquery.js"></script>
+<script src="/assets/jquery.pailer.js"></script>
+
+<script>
+  function resize() {
+    var margin_left = parseInt($('body').css('margin-left'));
+    var margin_top = parseInt($('body').css('margin-top'));
+    var margin_bottom = parseInt($('body').css('margin-bottom'));
+    $('#data').width($(window).width() - margin_left);
+    $('#data').height($(window).height() - margin_top - margin_bottom);
+  }
+
+  $(window).resize(resize);
+
+  $(document).ready(function() {
+    resize();
+
+    $('#data').pailer({
+      'read': function(options) {
+        var settings = $.extend({
+          'offset': -1,
+          'length': -1
+        }, options);
+
+        var url = "/logdata/${task_id}/${process}/${run}/${logtype}"
+          + '?offset=' + settings.offset
+          + '&length=' + settings.length;
+        return $.getJSON(url);
+      },
+      'indicator': $('#indicator')
+    });
+  });
+</script>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/main.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/main.tpl b/src/main/python/apache/thermos/observer/http/templates/main.tpl
new file mode 100644
index 0000000..d3419ab
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/main.tpl
@@ -0,0 +1,101 @@
+<%doc>
+ Template arguments:
+  type
+  offset
+  num
+  tasks
+  task_count
+</%doc>
+
+<%!
+import socket
+import time
+
+try:
+  from twitter.common import app
+  observer_port = app.get_options().twitter_common_http_root_server_port
+except (ImportError, AttributeError) as e:
+  observer_port = 1338
+
+host = socket.gethostname()
+num_tasks = 20
+
+def pretty_time(seconds=time.time()):
+  return time.strftime('%m/%d %H:%M:%S', time.gmtime(seconds))
+
+%>
+
+<div class="${type}-container"
+     data-url="/main/${type}/${offset}/${num}">
+  <div class="row-fluid">
+    <div class="span2">
+    % if offset > 0:
+      <a class="refresh-container" href="#"
+         data-url="/main/${type}/${offset-num_tasks}/${num}">
+         &larr; newer
+      </a>
+    % endif
+    </div>
+    % if offset + num_tasks < task_count:
+    <div class="span2">
+      <a class="refresh-container" href="#"
+         data-url="/main/${type}/${offset+num_tasks}/${num}">
+         older &rarr;
+      </a>
+    </div>
+    % endif
+  </div>
+  <div class="content" id="defaultLayout">
+     <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+     <thead>
+       <tr>
+         <th colspan=4> ${type} tasks ${offset}...${min(task_count, offset+num_tasks) - 1} of ${task_count} </th>
+         <th colspan=3> resources </th>
+         <th colspan=3> links </th>
+       </tr>
+
+       <tr>
+         <th> name </th> <th> role </th> <th> launched </th> <th> status </th>
+         <th> cpu </th> <th> ram </th> <th> disk </th>
+         <th> task </th> <th> chroot </th> <th> ports </th>
+       </tr>
+      </thead>
+      <tbody>
+
+      % for row in tasks:
+       <tr>
+         <td> ${row["name"]} </td>
+         <td> ${row["role"]} </td>
+         <td> ${pretty_time(row["launch_timestamp"])} </td>
+         <td> ${row["state"]} @
+              ${pretty_time(row["state_timestamp"]) if row["state_timestamp"] else ""}</td>
+
+         <td> ${'%.3f' % row["cpu"] if row["cpu"] > 0 else ""} </td>
+         <td> ${'%.1fMB' % (row["ram"] / 1024. / 1024.) if row["ram"] > 0 else ""} </td>
+         <td> ${'%dGB' % (row["disk"] / 1024 / 1024 / 1024) if row["disk"] > 0 else ""} </td>
+
+         <td> <a href="/task/${row['task_id']}">info</a> </td>
+         <td> <a href="/browse/${row['task_id']}">browse</a> </td>
+         <td>
+         % if type == 'active':
+           % if 'http' in row["ports"]:
+             <a href="http://${host}:${row['ports']['http']}">http</a>
+           % else:
+             <span class="muted">http</span>
+           % endif
+           % if 'admin' in row["ports"]:
+             <a href="http://${host}:${row['ports']['admin']}">admin</a>
+           % else:
+             <span class="muted">admin</span>
+           % endif
+           % if set(row["ports"]) - set(['http', 'admin']):
+             <a href="/task/${row['task_id']}">...</a>
+           % endif
+         % endif
+         </td>
+       </tr>
+      % endfor
+     </tbody>
+     </table>
+  </div>
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/process.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/process.tpl b/src/main/python/apache/thermos/observer/http/templates/process.tpl
new file mode 100644
index 0000000..a7fb90d
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/process.tpl
@@ -0,0 +1,109 @@
+<%doc>
+Template arguments:
+  task_id
+  process {
+    cpu:/ram: (optional)
+    cmdline:
+    name:
+  }
+
+  runs = {
+   number: {
+     start_time: (optional)
+     stop_time: (optional)
+     state:
+   }
+  }
+
+  --
+  for each run:
+     run | state | started | finished | stdout | stderr
+</%doc>
+
+<%!
+import socket
+import time
+from xml.sax.saxutils import escape
+
+def pretty_time(seconds=time.time()):
+  return time.strftime('%m/%d %H:%M:%S', time.gmtime(seconds))
+%>
+
+<html>
+<title>thermos(${socket.gethostname()})</title>
+
+<link rel="stylesheet"
+      type="text/css"
+      href="/assets/bootstrap.css"/>
+<body>
+
+
+<div class="container">
+  <div class="row">
+    <div class="span6" id="leftBar">
+      <dl>
+        <dt> process </dt>
+          <dd> <strong> parent task </strong> <a href="/task/${task_id}">${task_id}</a> </dd>
+          <dd> <strong> process name </strong> ${process["name"]} </dd>
+          <dd> <strong> status </strong> ${process["status"]} </dd>
+      </dl>
+    </div>
+
+    <div class="span6" id="rightBar">
+      <dl>
+        <dt> resources </dt>
+          <dd> <strong> cpu </strong> ${'%.3f' % process["cpu"] if "cpu" in process else "N/A"} </dd>
+          <dd> <strong> ram </strong> ${'%.1fMB' % (process["ram"] / 1024. / 1024.) if "ram" in process else "N/A"} </dd>
+          <dd> <strong> total user </strong> ${'%.1fs' % process["user"] if "user" in process else "N/A"} </dd>
+          <dd> <strong> total sys </strong> ${'%.1fs' % process["system"] if "system" in process else "N/A"} </dd>
+          <dd> <strong> threads </strong> ${process["threads"] if "threads" in process else "N/A"} </dd>
+      </dl>
+    </div>
+  </div>
+
+
+  <strong> cmdline </strong><br>
+  <div class="container">
+<pre>
+${escape(process["cmdline"])}
+</pre>
+  </div><br><br>
+
+
+  <strong> runs </strong>
+  <div class="container">
+     <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+     <thead>
+       <tr>
+         <th colspan=2> </th>
+         <th colspan=2> time </th>
+         <th colspan=2> logs </th>
+       </tr>
+
+       <tr>
+         <th> run </th>
+         <th> status </th>
+         <th> started </th> <th> finished </th>
+         <th> stdout </th> <th> stderr </th>
+       </tr>
+      </thead>
+      <tbody>
+
+      % for run, process_dict in sorted(runs.items(), reverse=True):
+       <tr>
+         <td> ${run} </td>
+         <td> ${process_dict["state"]} </td>
+         <td> ${pretty_time(process_dict["start_time"]) if "start_time" in process_dict else ""} </td>
+         <td> ${pretty_time(process_dict["stop_time"]) if "stop_time" in process_dict else ""} </td>
+         <td> <a href="/logs/${task_id}/${process["name"]}/${run}/stdout">stdout</a> </td>
+         <td> <a href="/logs/${task_id}/${process["name"]}/${run}/stderr">stderr</a> </td>
+       </tr>
+      % endfor
+     </tbody>
+     </table>
+  </div>
+
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/rawtask.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/rawtask.tpl b/src/main/python/apache/thermos/observer/http/templates/rawtask.tpl
new file mode 100644
index 0000000..c4b7f52
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/rawtask.tpl
@@ -0,0 +1,28 @@
+<%doc>
+ Template arguments:
+  hostname
+  task_id
+  task_struct
+</%doc>
+
+<html>
+<title>thermos(${hostname})</title>
+
+<link rel="stylesheet"
+      type="text/css"
+      href="/assets/bootstrap.css"/>
+<%!
+  from json import dumps
+  def print_task(task):
+    return dumps(task.get(), indent=4)
+%>
+
+<body>
+<div class="container">
+  <h3> task ${task_id} </h3>
+  <div class="content" id="rawTask">
+    <pre>${print_task(task_struct)}</pre>
+  </div>
+</div>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/task.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/task.tpl b/src/main/python/apache/thermos/observer/http/templates/task.tpl
new file mode 100644
index 0000000..b71c80d
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/task.tpl
@@ -0,0 +1,140 @@
+<%doc>
+ Template arguments:
+  task_id
+  task
+  statuses
+  user
+  ports
+  processes
+  chroot
+  launch_time
+  hostname
+</%doc>
+
+<html>
+<title>thermos(${hostname})</title>
+
+<link rel="stylesheet"
+      type="text/css"
+      href="/assets/bootstrap.css"/>
+
+<body>
+
+<%!
+ import time
+
+ def pretty_time(seconds=time.time()):
+   return time.strftime('%m/%d %H:%M:%S', time.gmtime(seconds))
+
+ def get(task, typ):
+   return task['resource_consumption'][typ]
+
+%>
+
+<div class="container">
+
+  <h3> task ${task_id} </h3>
+
+  <div class="row">
+    <div class="span6" id="leftBar">
+      <dl>
+        <dt> task </dt>
+          <dd> <strong> status </strong> ${statuses[-1][0] if statuses else 'UNKNOWN'} </dd>
+          <dd> <strong> user </strong> ${user} </dd>
+        <dt> ports </dt>
+        % for port_name, port_number in ports.items():
+          <dd> <strong> ${port_name} </strong> <a href="http://${hostname}:${port_number}">${port_number}</a> </dd>
+        %endfor
+      </dl>
+    </div>
+
+    <div class="span6" id="rightBar">
+      <dl>
+        <dt> header </dt>
+          <dd> <strong> chroot </strong> <a href="/browse/${task_id}">browse</a> </dd>
+          <dd> <strong> hostname </strong> <a href="/">${hostname}</a> </dd>
+          <dd> <strong> launch time </strong> ${pretty_time(launch_time)} </dd>
+          <dd> <strong> task config </strong> <a href="/rawtask/${task_id}">view</a> </dd>
+      </dl>
+    </div>
+  </div>
+
+  <div class="row-fluid">
+    <div class="span8" id="taskLayout">
+       <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+       <thead>
+         <tr>
+           <th colspan=1> task status </th>
+           <th colspan=1> time </th>
+         </tr>
+        </thead>
+
+        <tbody>
+        % for status, timestamp in sorted(statuses, key=lambda status: status[1]):
+         <tr>
+           <td> ${status} </td> <td> ${pretty_time(timestamp)} </td>
+         </tr>
+        % endfor
+       </tbody>
+       </table>
+    </div>
+
+    <div class="span4" id="taskResources">
+       <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+       <thead>
+         <tr>
+           <th> cpu </th>
+           <th> ram </th>
+           <th> disk </th>
+         </tr>
+        </thead>
+
+        <tbody>
+         <tr>
+           <td> ${'%.3f' % get(task, 'cpu')} </td>
+           <td> ${'%.1fMB' % (get(task, 'ram') / 1024. / 1024.)} </td>
+           <td> ${'%.1fGB' % (get(task, 'disk') / 1024. / 1024. / 1024.)} </td>
+         </tr>
+       </tbody>
+       </table>
+    </div>
+  </div>
+
+  <div class="content" id="processesLayout">
+     <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+     <thead>
+       <tr>
+         <th colspan=3> process </th>
+         <th colspan=2> time </th>
+         <th colspan=2> used </th>
+         <th colspan=2> logs </th>
+       </tr>
+
+       <tr>
+         <th> name </th> <th> run </th> <th> status </th> <th> started </th> <th> finished </th>
+         <th> cpu </th> <th> ram </th>
+         <th> stdout </th> <th> stderr </th>
+       </tr>
+      </thead>
+
+      <tbody>
+      % for proc_name, proc in sorted(processes.items(), key=lambda item: item[1].get('start_time')):
+       <tr>
+         <td> <a href="/process/${task_id}/${proc["process_name"]}">${proc["process_name"]}</a> </td>
+         <td> ${proc["process_run"]} </td>
+         <td> ${proc["state"]} </td>
+         <td> ${pretty_time(proc["start_time"]) if "start_time" in proc else ""} </td>
+         <td> ${pretty_time(proc["stop_time"]) if "stop_time" in proc else ""} </td>
+         <td> ${'%.3f' % proc["used"]["cpu"] if "used" in proc else ""} </td>
+         <td> ${'%dMB' % (proc["used"]["ram"] / 1024 / 1024) if "used" in proc else ""} </td>
+         <td> <a href="/logs/${task_id}/${proc["process_name"]}/${proc["process_run"]}/stdout">stdout</a> </td>
+         <td> <a href="/logs/${task_id}/${proc["process_name"]}/${proc["process_run"]}/stderr">stderr</a> </td>
+       </tr>
+      % endfor
+     </tbody>
+     </table>
+  </div>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templating.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templating.py b/src/main/python/apache/thermos/observer/http/templating.py
new file mode 100644
index 0000000..783308b
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templating.py
@@ -0,0 +1,9 @@
+import os
+import pkg_resources
+
+
+class HttpTemplate(object):
+  @staticmethod
+  def load(name):
+    return pkg_resources.resource_string(
+        __name__, os.path.join('templates', '%s.tpl' % name))

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/observed_task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/observed_task.py b/src/main/python/apache/thermos/observer/observed_task.py
new file mode 100644
index 0000000..f995998
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/observed_task.py
@@ -0,0 +1,136 @@
+from abc import abstractproperty
+import os
+
+from twitter.common import log
+from twitter.common.lang import AbstractClass
+from twitter.thermos.config.loader import ThermosTaskWrapper
+from twitter.thermos.config.schema import ThermosContext
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+
+from pystachio import Environment
+
+
+class ObservedTask(AbstractClass):
+  """ Represents a Task being observed """
+
+  @classmethod
+  def safe_mtime(cls, path):
+    try:
+      return os.path.getmtime(path)
+    except OSError:
+      return None
+
+  def __init__(self, task_id, pathspec):
+    self._task_id = task_id
+    self._pathspec = pathspec
+    self._mtime = self._get_mtime()
+
+  @abstractproperty
+  def type(self):
+    """Indicates the type of task (active or finished)"""
+
+  def _read_task(self, memoized={}):
+    """Read the corresponding task from disk and return a ThermosTask.  Memoizes already-read tasks.
+    """
+    if self._task_id not in memoized:
+      path = self._pathspec.given(task_id=self._task_id, state=self.type).getpath('task_path')
+      if os.path.exists(path):
+        task = ThermosTaskWrapper.from_file(path)
+        if task is None:
+          log.error('Error reading ThermosTask from %s in observer.' % path)
+        else:
+          context = self.context(self._task_id)
+          if not context:
+            log.warning('Task not yet available: %s' % self._task_id)
+          task = task.task() % Environment(thermos=context)
+          memoized[self._task_id] = task
+
+    return memoized.get(self._task_id, None)
+
+  def _get_mtime(self):
+    """Retrieve the mtime of the task's state directory"""
+    get_path = lambda state: self._pathspec.given(
+      task_id=self._task_id, state=state).getpath('task_path')
+    mtime = self.safe_mtime(get_path('active'))
+    if mtime is None:
+      mtime = self.safe_mtime(get_path('finished'))
+    if mtime is None:
+      log.error("Couldn't get mtime for task %s!" % self._task_id)
+    return mtime
+
+  def context(self, task_id):
+    state = self.state
+    if state.header is None:
+      return None
+    return ThermosContext(
+      ports=state.header.ports if state.header.ports else {},
+      task_id=state.header.task_id,
+      user=state.header.user,
+    )
+
+  @property
+  def task(self):
+    """Return a ThermosTask representing this task"""
+    return self._read_task()
+
+  @property
+  def task_id(self):
+    """Return the task's task_id"""
+    return self._task_id
+
+  @property
+  def mtime(self):
+    """Return mtime of task file"""
+    return self._mtime
+
+  @abstractproperty
+  def state(self):
+    """Return state of task (gen.twitter.thermos.ttypes.RunnerState)"""
+
+
+class ActiveObservedTask(ObservedTask):
+  """An active Task known by the TaskObserver"""
+
+  def __init__(self, task_id, pathspec, task_monitor, resource_monitor):
+    super(ActiveObservedTask, self).__init__(task_id, pathspec)
+    self._task_monitor = task_monitor
+    self._resource_monitor = resource_monitor
+
+  @property
+  def type(self):
+    return 'active'
+
+  @property
+  def state(self):
+    """Return a RunnerState representing the current state of task, retrieved from TaskMonitor"""
+    return self.task_monitor.get_state()
+
+  @property
+  def task_monitor(self):
+    """Return a TaskMonitor monitoring this task"""
+    return self._task_monitor
+
+  @property
+  def resource_monitor(self):
+    """Return a ResourceMonitor implementation monitoring this task's resources"""
+    return self._resource_monitor
+
+
+class FinishedObservedTask(ObservedTask):
+  """A finished Task known by the TaskObserver"""
+
+  def __init__(self, task_id, pathspec):
+    super(FinishedObservedTask, self).__init__(task_id, pathspec)
+    self._state = None
+
+  @property
+  def type(self):
+    return 'finished'
+
+  @property
+  def state(self):
+    """Return final state of Task (RunnerState, read from disk and cached for future access)"""
+    if self._state is None:
+      path = self._pathspec.given(task_id=self._task_id).getpath('runner_checkpoint')
+      self._state = CheckpointDispatcher.from_file(path)
+    return self._state

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/task_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
new file mode 100644
index 0000000..b6ebcf7
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -0,0 +1,640 @@
+"""Observe Thermos tasks on a system
+
+This module provides a number of classes for exposing information about running (active) and
+finished Thermos tasks on a system. The primary entry point is the TaskObserver, a thread which
+polls a designated Thermos checkpoint root and collates information about all tasks it discovers.
+
+"""
+from operator import attrgetter
+import os
+import threading
+import time
+
+from twitter.common import log
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.lang import Lockable
+from twitter.common.quantity import Amount, Time
+
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.monitoring.detector import TaskDetector
+from twitter.thermos.monitoring.monitor import TaskMonitor
+from twitter.thermos.monitoring.process import ProcessSample
+from twitter.thermos.monitoring.resource import ResourceMonitorBase, TaskResourceMonitor
+
+from gen.twitter.thermos.ttypes import ProcessState, TaskState
+
+from .observed_task import ActiveObservedTask, FinishedObservedTask
+
+
+class TaskObserver(ExceptionalThread, Lockable):
+  """
+    The TaskObserver monitors the thermos checkpoint root for active/finished
+    tasks.  It is used to be the oracle of the state of all thermos tasks on
+    a machine.
+
+    It currently returns JSON, but really should just return objects.  We should
+    then build an object->json translator.
+  """
+  class UnexpectedError(Exception): pass
+  class UnexpectedState(Exception): pass
+
+  POLLING_INTERVAL = Amount(1, Time.SECONDS)
+
+  def __init__(self, root, resource_monitor_class=TaskResourceMonitor):
+    self._pathspec = TaskPath(root=root)
+    self._detector = TaskDetector(root)
+    if not issubclass(resource_monitor_class, ResourceMonitorBase):
+      raise ValueError("resource monitor class must implement ResourceMonitorBase!")
+    self._resource_monitor = resource_monitor_class
+    self._active_tasks = {}    # task_id => ActiveObservedTask
+    self._finished_tasks = {}  # task_id => FinishedObservedTask
+    self._stop_event = threading.Event()
+    ExceptionalThread.__init__(self)
+    Lockable.__init__(self)
+    self.daemon = True
+
+  @property
+  def active_tasks(self):
+    """Return a dictionary of active Tasks"""
+    return self._active_tasks
+
+  @property
+  def finished_tasks(self):
+    """Return a dictionary of finished Tasks"""
+    return self._finished_tasks
+
+  @property
+  def all_tasks(self):
+    """Return a dictionary of all Tasks known by the TaskObserver"""
+    return dict(self.active_tasks.items() + self.finished_tasks.items())
+
+  def stop(self):
+    self._stop_event.set()
+
+  def start(self):
+    ExceptionalThread.start(self)
+
+  @Lockable.sync
+  def add_active_task(self, task_id):
+    if task_id in self.finished_tasks:
+      log.error('Found an active task (%s) in finished tasks?' % task_id)
+      return
+    task_monitor = TaskMonitor(self._pathspec, task_id)
+    if not task_monitor.get_state().header:
+      log.info('Unable to load task "%s"' % task_id)
+      return
+    sandbox = task_monitor.get_state().header.sandbox
+    resource_monitor = self._resource_monitor(task_monitor, sandbox)
+    resource_monitor.start()
+    self._active_tasks[task_id] = ActiveObservedTask(
+      task_id=task_id, pathspec=self._pathspec,
+      task_monitor=task_monitor, resource_monitor=resource_monitor
+    )
+
+  @Lockable.sync
+  def add_finished_task(self, task_id):
+    self._finished_tasks[task_id] = FinishedObservedTask(
+      task_id=task_id, pathspec=self._pathspec
+    )
+
+  @Lockable.sync
+  def active_to_finished(self, task_id):
+    self.remove_active_task(task_id)
+    self.add_finished_task(task_id)
+
+  @Lockable.sync
+  def remove_active_task(self, task_id):
+    task = self.active_tasks.pop(task_id)
+    task.resource_monitor.kill()
+
+  @Lockable.sync
+  def remove_finished_task(self, task_id):
+    self.finished_tasks.pop(task_id)
+
+  def run(self):
+    """
+      The internal thread for the observer.  This periodically polls the
+      checkpoint root for new tasks, or transitions of tasks from active to
+      finished state.
+    """
+    while not self._stop_event.is_set():
+      time.sleep(self.POLLING_INTERVAL.as_(Time.SECONDS))
+
+      active_tasks = [task_id for _, task_id in self._detector.get_task_ids(state='active')]
+      finished_tasks = [task_id for _, task_id in self._detector.get_task_ids(state='finished')]
+
+      with self.lock:
+
+        # Ensure all tasks currently detected on the system are observed appropriately
+        for active in active_tasks:
+          if active not in self.active_tasks:
+            log.debug('task_id %s (unknown) -> active' % active)
+            self.add_active_task(active)
+        for finished in finished_tasks:
+          if finished in self.active_tasks:
+            log.debug('task_id %s active -> finished' % finished)
+            self.active_to_finished(finished)
+          elif finished not in self.finished_tasks:
+            log.debug('task_id %s (unknown) -> finished' % finished)
+            self.add_finished_task(finished)
+
+        # Remove ObservedTasks for tasks no longer detected on the system
+        for unknown in set(self.active_tasks) - set(active_tasks + finished_tasks):
+          log.debug('task_id %s active -> (unknown)' % unknown)
+          self.remove_active_task(unknown)
+        for unknown in set(self.finished_tasks) - set(active_tasks + finished_tasks):
+          log.debug('task_id %s finished -> (unknown)' % unknown)
+          self.remove_finished_task(unknown)
+
+  @Lockable.sync
+  def process_from_name(self, task_id, process_id):
+    if task_id in self.all_tasks:
+      task = self.all_tasks[task_id].task
+      if task:
+        for process in task.processes():
+          if process.name().get() == process_id:
+            return process
+
+  @Lockable.sync
+  def task_count(self):
+    """
+      Return the count of tasks that could be ready properly from disk.
+      This may be <= self.task_id_count()
+    """
+    return dict(
+      active=len(self.active_tasks),
+      finished=len(self.finished_tasks),
+      all=len(self.all_tasks),
+    )
+
+  @Lockable.sync
+  def task_id_count(self):
+    """
+      Return the raw count of active and finished task_ids from the TaskDetector.
+    """
+    num_active = len(list(self._detector.get_task_ids(state='active')))
+    num_finished = len(list(self._detector.get_task_ids(state='finished')))
+    return dict(active=num_active, finished=num_finished, all=num_active + num_finished)
+
+  def _get_tasks_of_type(self, type):
+    """Convenience function to return all tasks of a given type"""
+    tasks = {
+      'active': self.active_tasks,
+      'finished': self.finished_tasks,
+      'all': self.all_tasks,
+    }.get(type, None)
+
+    if tasks is None:
+      log.error('Unknown task type %s' % type)
+      return {}
+
+    return tasks
+
+  @Lockable.sync
+  def state(self, task_id):
+    """Return a dict containing mapped information about a task's state"""
+    real_state = self.raw_state(task_id)
+    if real_state is None or real_state.header is None:
+      return {}
+    else:
+      return dict(
+        task_id=real_state.header.task_id,
+        launch_time=real_state.header.launch_time_ms / 1000.0,
+        sandbox=real_state.header.sandbox,
+        hostname=real_state.header.hostname,
+        user=real_state.header.user
+      )
+
+  @Lockable.sync
+  def raw_state(self, task_id):
+    """
+      Return the current runner state (thrift blob: gen.twitter.thermos.ttypes.RunnerState)
+      of a given task id
+    """
+    if task_id not in self.all_tasks:
+      return None
+    return self.all_tasks[task_id].state
+
+  @Lockable.sync
+  def _task_processes(self, task_id):
+    """
+      Return the processes of a task given its task_id.
+
+      Returns a map from state to processes in that state, where possible
+      states are: waiting, running, success, failed.
+    """
+    if task_id not in self.all_tasks:
+      return {}
+    state = self.raw_state(task_id)
+    if state is None or state.header is None:
+      return {}
+
+    waiting, running, success, failed, killed = [], [], [], [], []
+    for process, runs in state.processes.items():
+      # No runs ==> nothing started.
+      if len(runs) == 0:
+        waiting.append(process)
+      else:
+        if runs[-1].state in (None, ProcessState.WAITING, ProcessState.LOST):
+          waiting.append(process)
+        elif runs[-1].state in (ProcessState.FORKED, ProcessState.RUNNING):
+          running.append(process)
+        elif runs[-1].state == ProcessState.SUCCESS:
+          success.append(process)
+        elif runs[-1].state == ProcessState.FAILED:
+          failed.append(process)
+        elif runs[-1].state == ProcessState.KILLED:
+          killed.append(process)
+        else:
+          # TODO(wickman)  Consider log.error instead of raising.
+          raise self.UnexpectedState(
+            "Unexpected ProcessHistoryState: %s" % state.processes[process].state)
+
+    return dict(waiting=waiting, running=running, success=success, failed=failed, killed=killed)
+
+  @Lockable.sync
+  def main(self, type=None, offset=None, num=None):
+    """Return a set of information about tasks, optionally filtered
+
+      Args:
+        type = (all|active|finished|None) [default: all]
+        offset = offset into the list of task_ids [default: 0]
+        num = number of results to return [default: 20]
+
+      Tasks are sorted by interest:
+        - active tasks are sorted by start time
+        - finished tasks are sorted by completion time
+
+      Returns:
+        {
+          tasks: [task_id_1, ..., task_id_N],
+          type: query type,
+          offset: next offset,
+          num: next num
+        }
+
+    """
+    type = type or 'all'
+    offset = offset or 0
+    num = num or 20
+
+    # Get a list of all ObservedTasks of requested type
+    tasks = sorted((task for task in self._get_tasks_of_type(type).values()),
+                   key=attrgetter('mtime'), reverse=True)
+
+    # Filter by requested offset + number of results
+    end = num
+    if offset < 0:
+      offset = offset % len(tasks) if len(tasks) > abs(offset) else 0
+    end += offset
+    tasks = tasks[offset:end]
+
+    def task_row(observed_task):
+      """Generate an output row for a Task"""
+      task = self._task(observed_task.task_id)
+      # tasks include those which could not be found properly and are hence empty {}
+      if task:
+        return dict(
+            task_id=observed_task.task_id,
+            name=task['name'],
+            role=task['user'],
+            launch_timestamp=task['launch_timestamp'],
+            state=task['state'],
+            state_timestamp=task['state_timestamp'],
+            ports=task['ports'],
+            **task['resource_consumption'])
+
+    return dict(
+      tasks=filter(None, map(task_row, tasks)),
+      type=type,
+      offset=offset,
+      num=num,
+      task_count=self.task_count()[type],
+    )
+
+  def _sample(self, task_id):
+    if task_id not in self.active_tasks:
+      log.debug("Task %s not found in active tasks" % task_id)
+      sample = ProcessSample.empty().to_dict()
+      sample['disk'] = 0
+    else:
+      resource_sample = self.active_tasks[task_id].resource_monitor.sample()[1]
+      sample = resource_sample.process_sample.to_dict()
+      sample['disk'] = resource_sample.disk_usage
+      log.debug("Got sample for task %s: %s" % (task_id, sample))
+    return sample
+
+  @Lockable.sync
+  def task_statuses(self, task_id):
+    """
+      Return the sequence of task states.
+
+      [(task_state [string], timestamp), ...]
+    """
+
+    # Unknown task_id.
+    if task_id not in self.all_tasks:
+      return []
+
+    task = self.all_tasks[task_id]
+    if task is None:
+      return []
+
+    state = self.raw_state(task_id)
+    if state is None or state.header is None:
+      return []
+
+    # Get the timestamp of the transition into the current state.
+    return [
+      (TaskState._VALUES_TO_NAMES.get(st.state, 'UNKNOWN'), st.timestamp_ms / 1000)
+      for st in state.statuses]
+
+  @Lockable.sync
+  def _task(self, task_id):
+    """
+      Return composite information about a particular task task_id, given the below
+      schema.
+
+      {
+         task_id: string,
+         name: string,
+         user: string,
+         launch_timestamp: seconds,
+         state: string [ACTIVE, SUCCESS, FAILED]
+         ports: { name1: 'url', name2: 'url2' }
+         resource_consumption: { cpu:, ram:, disk: }
+         processes: { -> names only
+            waiting: [],
+            running: [],
+            success: [],
+            failed:  []
+         }
+      }
+    """
+    # Unknown task_id.
+    if task_id not in self.all_tasks:
+      return {}
+
+    task = self.all_tasks[task_id].task
+    if task is None:
+      # TODO(wickman)  Can this happen?
+      log.error('Could not find task: %s' % task_id)
+      return {}
+
+    state = self.raw_state(task_id)
+    if state is None or state.header is None:
+      # TODO(wickman)  Can this happen?
+      return {}
+
+    # Get the timestamp of the transition into the current state.
+    current_state = state.statuses[-1].state
+    last_state = state.statuses[0]
+    state_timestamp = 0
+    for status in state.statuses:
+      if status.state == current_state and last_state != current_state:
+        state_timestamp = status.timestamp_ms / 1000
+      last_state = status.state
+
+    return dict(
+       task_id=task_id,
+       name=task.name().get(),
+       launch_timestamp=state.statuses[0].timestamp_ms / 1000,
+       state=TaskState._VALUES_TO_NAMES[state.statuses[-1].state],
+       state_timestamp=state_timestamp,
+       user=state.header.user,
+       resource_consumption=self._sample(task_id),
+       ports=state.header.ports,
+       processes=self._task_processes(task_id),
+       task_struct=task,
+    )
+
+  @Lockable.sync
+  def _get_process_resource_consumption(self, task_id, process_name):
+    if task_id not in self.active_tasks:
+      log.debug("Task %s not found in active tasks" % task_id)
+      return ProcessSample.empty().to_dict()
+    sample = self.active_tasks[task_id].resource_monitor.sample_by_process(process_name).to_dict()
+    log.debug('Resource consumption (%s, %s) => %s' % (task_id, process_name, sample))
+    return sample
+
+  @Lockable.sync
+  def _get_process_tuple(self, history, run):
+    """
+      Return the basic description of a process run if it exists, otherwise
+      an empty dictionary.
+
+      {
+        process_name: string
+        process_run: int
+        state: string [WAITING, FORKED, RUNNING, SUCCESS, KILLED, FAILED, LOST]
+        (optional) start_time: seconds from epoch
+        (optional) stop_time: seconds from epoch
+      }
+    """
+    if len(history) == 0:
+      return {}
+    if run >= len(history):
+      return {}
+    else:
+      process_run = history[run]
+      run = run % len(history)
+      d = dict(
+        process_name=process_run.process,
+        process_run=run,
+        state=ProcessState._VALUES_TO_NAMES[process_run.state],
+      )
+      if process_run.start_time:
+        d.update(start_time=process_run.start_time)
+      if process_run.stop_time:
+        d.update(stop_time=process_run.stop_time)
+      return d
+
+  @Lockable.sync
+  def process(self, task_id, process, run=None):
+    """
+      Returns a process run, where the schema is given below:
+
+      {
+        process_name: string
+        process_run: int
+        used: { cpu: float, ram: int bytes, disk: int bytes }
+        start_time: (time since epoch in millis (utc))
+        stop_time: (time since epoch in millis (utc))
+        state: string [WAITING, FORKED, RUNNING, SUCCESS, KILLED, FAILED, LOST]
+      }
+
+      If run is None, return the latest run.
+    """
+    state = self.raw_state(task_id)
+    if state is None or state.header is None:
+      return {}
+    if process not in state.processes:
+      return {}
+    history = state.processes[process]
+    run = int(run) if run is not None else -1
+    tup = self._get_process_tuple(history, run)
+    if not tup:
+      return {}
+    if tup.get('state') == 'RUNNING':
+      tup.update(used=self._get_process_resource_consumption(task_id, process))
+    return tup
+
+  @Lockable.sync
+  def _processes(self, task_id):
+    """
+      Return
+        {
+          process1: { ... }
+          process2: { ... }
+          ...
+          processN: { ... }
+        }
+
+      where processK is the latest run of processK and in the schema as
+      defined by process().
+    """
+
+    if task_id not in self.all_tasks:
+      return {}
+    state = self.raw_state(task_id)
+    if state is None or state.header is None:
+      return {}
+
+    processes = self._task_processes(task_id)
+    d = dict()
+    for process_type in processes:
+      for process_name in processes[process_type]:
+        d[process_name] = self.process(task_id, process_name)
+    return d
+
+  @Lockable.sync
+  def processes(self, task_ids):
+    """
+      Given a list of task_ids, returns a map of task_id => processes, where processes
+      is defined by the schema in _processes.
+    """
+    if not isinstance(task_ids, (list, tuple)):
+      return {}
+    return dict((task_id, self._processes(task_id)) for task_id in task_ids)
+
+  @Lockable.sync
+  def get_run_number(self, runner_state, process, run=None):
+    if runner_state is not None and runner_state.processes is not None:
+      run = run if run is not None else -1
+      if run < len(runner_state.processes[process]):
+        if len(runner_state.processes[process]) > 0:
+          return run % len(runner_state.processes[process])
+
+  @Lockable.sync
+  def logs(self, task_id, process, run=None):
+    """
+      Given a task_id and a process and (optional) run number, return a dict:
+      {
+        stderr: [dir, filename]
+        stdout: [dir, filename]
+      }
+
+      If the run number is unspecified, uses the latest run.
+
+      TODO(wickman)  Just return the filenames directly?
+    """
+    runner_state = self.raw_state(task_id)
+    if runner_state is None or runner_state.header is None:
+      return {}
+    run = self.get_run_number(runner_state, process, run)
+    if run is None:
+      return {}
+    log_path = self._pathspec.given(task_id=task_id, process=process, run=run,
+                                    log_dir=runner_state.header.log_dir).getpath('process_logdir')
+    return dict(
+      stdout=[log_path, 'stdout'],
+      stderr=[log_path, 'stderr']
+    )
+
+  @staticmethod
+  def _sanitize_path(base_path, relpath):
+    """
+      Attempts to sanitize a path through path normalization, also making sure
+      that the relative path is contained inside of base_path.
+    """
+    if relpath is None:
+      relpath = "."
+    normalized_base = os.path.realpath(base_path)
+    normalized = os.path.realpath(os.path.join(base_path, relpath))
+    if normalized.startswith(normalized_base):
+      return (normalized_base, os.path.relpath(normalized, normalized_base))
+    return (None, None)
+
+  @Lockable.sync
+  def valid_file(self, task_id, path):
+    """
+      Like valid_path, but also verify the given path is a file
+    """
+    chroot, path = self.valid_path(task_id, path)
+    if chroot and path and os.path.isfile(os.path.join(chroot, path)):
+      return chroot, path
+    return None, None
+
+  @Lockable.sync
+  def valid_path(self, task_id, path):
+    """
+      Given a task_id and a path within that task_id's sandbox, verify:
+        (1) it's actually in the sandbox and not outside
+        (2) it's a valid, existing path
+      Returns chroot and the pathname relative to that chroot.
+    """
+    runner_state = self.raw_state(task_id)
+    if runner_state is None or runner_state.header is None:
+      return None, None
+    try:
+      chroot = runner_state.header.sandbox
+    except AttributeError:
+      return None, None
+    chroot, path = self._sanitize_path(chroot, path)
+    if chroot and path:
+      return chroot, path
+    return None, None
+
+  @Lockable.sync
+  def files(self, task_id, path=None):
+    """
+      Returns dictionary
+      {
+        task_id: task_id
+        chroot: absolute directory on machine
+        path: sanitized relative path w.r.t. chroot
+        dirs: list of directories
+        files: list of files
+      }
+    """
+    # TODO(jon): DEPRECATED: most of the necessary logic is handled directly in the templates.
+    # Also, global s/chroot/sandbox/?
+    empty = dict(task_id=task_id, chroot=None, path=None, dirs=None, files=None)
+    path = path if path is not None else '.'
+    runner_state = self.raw_state(task_id)
+    if runner_state is None:
+      return empty
+    try:
+      chroot = runner_state.header.sandbox
+    except AttributeError:
+      return empty
+    if chroot is None:  # chroot-less job
+      return empty
+    chroot, path = self._sanitize_path(chroot, path)
+    if (chroot is None or path is None
+        or not os.path.isdir(os.path.join(chroot, path))):
+      return empty
+    names = os.listdir(os.path.join(chroot, path))
+    dirs, files = [], []
+    for name in names:
+      if os.path.isdir(os.path.join(chroot, path, name)):
+        dirs.append(name)
+      else:
+        files.append(name)
+    return dict(
+      task_id=task_id,
+      chroot=chroot,
+      path=path,
+      dirs=dirs,
+      files=files
+    )

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/testing/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/BUILD b/src/main/python/apache/thermos/testing/BUILD
new file mode 100644
index 0000000..b5485d0
--- /dev/null
+++ b/src/main/python/apache/thermos/testing/BUILD
@@ -0,0 +1,14 @@
+python_library(
+  name = 'runner',
+  sources = ['runner.py'],
+  dependencies = [
+    pants('aurora/twitterdeps/src/python/twitter/common/contextutil'),
+    pants('aurora/twitterdeps/src/python/twitter/common/log'),
+    pants('src/main/python/twitter/thermos/common:ckpt'),
+    pants('src/main/python/twitter/thermos/common:path'),
+    pants('src/main/python/twitter/thermos/config'),
+    pants('src/main/python/twitter/thermos/core'),
+    pants('src/main/python/twitter/thermos:thrift'),
+    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/testing/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/__init__.py b/src/main/python/apache/thermos/testing/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/testing/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/runner.py b/src/main/python/apache/thermos/testing/runner.py
new file mode 100644
index 0000000..930f41b
--- /dev/null
+++ b/src/main/python/apache/thermos/testing/runner.py
@@ -0,0 +1,191 @@
+from __future__ import print_function
+
+import atexit
+import errno
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+
+from twitter.common import log
+from twitter.common.contextutil import temporary_file, environment_as
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.config.loader import ThermosTaskWrapper
+from thrift.TSerialization import deserialize as thrift_deserialize
+
+from gen.twitter.thermos.ttypes import (
+  TaskState,
+  RunnerCkpt,
+  RunnerState,
+)
+
+
+class Runner(object):
+  RUN_JOB_SCRIPT = """
+import os
+import random
+import sys
+from twitter.common import log
+from twitter.common.log.options import LogOptions
+from twitter.thermos.config.loader import ThermosConfigLoader
+from twitter.thermos.core.helper import TaskRunnerHelper
+from twitter.thermos.core.runner import TaskRunner, TaskRunnerUniversalHandler
+from thrift.TSerialization import serialize as thrift_serialize
+
+random.seed(%(random_seed)d)
+
+log.init('runner_base')
+LogOptions.set_disk_log_level('DEBUG')
+
+task = ThermosConfigLoader.load_json('%(filename)s')
+task = task.tasks()[0].task
+
+success_rate=%(success_rate)d
+
+class AngryHandler(TaskRunnerUniversalHandler):
+  def checkpoint(self, record):
+    if not self._runner._recovery:
+      if random.randint(0, 100) <= success_rate:
+        super(AngryHandler, self).checkpoint(record)
+      else:
+        sys.exit(1)
+
+sandbox = os.path.join('%(sandbox)s', '%(task_id)s')
+args = {}
+args['task_id'] = '%(task_id)s'
+if %(portmap)s:
+  args['portmap'] = %(portmap)s
+args['universal_handler'] = AngryHandler
+
+runner = TaskRunner(task, '%(root)s', sandbox, **args)
+runner.run()
+
+with open('%(state_filename)s', 'w') as fp:
+  fp.write(thrift_serialize(runner.state))
+"""
+
+  def __init__(self, task, portmap={}, success_rate=100, random_seed=31337):
+    """
+      task = Thermos task
+      portmap = port map
+      success_rate = success rate of writing checkpoint to disk
+    """
+    self.task = task
+
+    with temporary_file(cleanup=False) as fp:
+      self.job_filename = fp.name
+      fp.write(ThermosTaskWrapper(task).to_json())
+
+    self.state_filename = tempfile.mktemp()
+    self.tempdir = tempfile.mkdtemp()
+    self.task_id = '%s-runner-base' % int(time.time()*1000000)
+    self.sandbox = os.path.join(self.tempdir, 'sandbox')
+    self.portmap = portmap
+    self.cleaned = False
+    self.pathspec = TaskPath(root = self.tempdir, task_id = self.task_id)
+    self.script_filename = None
+    self.success_rate = success_rate
+    self.random_seed = random_seed
+    self._run_count = 0
+
+  @property
+  def pid(self):
+    return self.po.pid
+
+  @property
+  def root(self):
+    return self.tempdir
+
+  def run(self):
+    self._run_count += 1
+    atexit.register(self.cleanup)
+
+    if self.script_filename:
+      os.unlink(self.script_filename)
+
+    with temporary_file(cleanup=False) as fp:
+      self.script_filename = fp.name
+      fp.write(self.RUN_JOB_SCRIPT % {
+        'filename': self.job_filename,
+        'sandbox': self.sandbox,
+        'root': self.tempdir,
+        'task_id': self.task_id,
+        'state_filename': self.state_filename,
+        'portmap': repr(self.portmap),
+        'success_rate': self.success_rate,
+        'random_seed': self.random_seed + self._run_count,
+      })
+
+    with environment_as(PYTHONPATH=os.pathsep.join(sys.path)):
+      self.po = subprocess.Popen([sys.executable, self.script_filename],
+        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+      try:
+        so, se = self.po.communicate()
+      except OSError as e:
+        if e.errno == errno.ECHILD:
+          so = se = 'Killed'
+        else:
+          raise
+
+    rc = self.po.returncode
+    if rc != 0:
+      if os.path.exists(self.job_filename):
+        config = open(self.job_filename).read()
+      else:
+        config = 'Nonexistent!'
+      if 'THERMOS_DEBUG' in os.environ:
+        print("Runner failed!\n\n\nconfig:%s\n\n\nstdout:%s\n\n\nstderr:%s\n\n\n" % (
+            config, so, se))
+
+    try:
+      with open(self.state_filename, 'r') as fp:
+        self.state = thrift_deserialize(RunnerState(), fp.read())
+    except Exception as e:
+      if 'THERMOS_DEBUG' in os.environ:
+        print('Failed to load Runner state: %s' % e, file=sys.stderr)
+      self.state = RunnerState()
+
+    try:
+      self.reconstructed_state = CheckpointDispatcher.from_file(
+        self.pathspec.getpath('runner_checkpoint'))
+    except:
+      self.reconstructed_state = None
+    self.initialized = True
+    return rc
+
+  def cleanup(self):
+    if not self.cleaned:
+      if hasattr(self, 'po'):
+        try:
+          self.po.kill()
+        except:
+          pass
+      os.unlink(self.job_filename)
+      os.unlink(self.script_filename)
+      if 'THERMOS_DEBUG' not in os.environ:
+        shutil.rmtree(self.tempdir, ignore_errors=True)
+      else:
+        print('Logs saved in %s' % self.tempdir)
+      self.cleaned = True
+
+
+class RunnerTestBase(object):
+  @classmethod
+  def task(cls):
+    raise NotImplementedError
+
+  @classmethod
+  def setup_class(cls):
+    cls.runner = Runner(cls.task(), portmap=getattr(cls, 'portmap', {}))
+    cls.runner.run()
+    cls.state = cls.runner.state
+
+  @classmethod
+  def teardown_class(cls):
+    cls.runner.cleanup()
+
+  def test_runner_state_reconstruction(self):
+    assert self.state == self.runner.reconstructed_state

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/__init__.py b/src/main/python/twitter/__init__.py
deleted file mode 100644
index de40ea7..0000000
--- a/src/main/python/twitter/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/BUILD b/src/main/python/twitter/aurora/BUILD
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/BUILD.thirdparty
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/BUILD.thirdparty b/src/main/python/twitter/aurora/BUILD.thirdparty
deleted file mode 100644
index 7a43aca..0000000
--- a/src/main/python/twitter/aurora/BUILD.thirdparty
+++ /dev/null
@@ -1,19 +0,0 @@
-def make_dep(name, version, dependency_name=None):
-  """Build a target from a specified dependency tuple.
-
-    name is the target name, specified in other BUILD files.
-    version is a hardcoded version string
-    dependency_name is used to identify the specific binary to resolve
-  """
-  dependency_name = dependency_name or name
-  versioned_name = "%s==%s" % (dependency_name, version)
-  python_requirement(requirement=versioned_name, name=name)
-
-make_dep('argparse', '1.2.1')
-make_dep('mesos-core', '0.15.0-rc4', 'mesos')
-make_dep('mock', '1.0.1')
-make_dep('mox', '0.5.3')
-make_dep('psutil', '1.1.2')
-make_dep('pystachio', '0.7.2')
-make_dep('pyyaml', '3.10', 'PyYAML')
-make_dep('thrift', '0.9.1')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/__init__.py b/src/main/python/twitter/aurora/__init__.py
deleted file mode 100644
index b0d6433..0000000
--- a/src/main/python/twitter/aurora/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/admin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/admin/BUILD b/src/main/python/twitter/aurora/admin/BUILD
deleted file mode 100644
index c8089b4..0000000
--- a/src/main/python/twitter/aurora/admin/BUILD
+++ /dev/null
@@ -1,11 +0,0 @@
-python_library(
-  name = 'mesos_maintenance',
-  sources = 'mesos_maintenance.py',
-  dependencies = [
-    pants('aurora/twitterdeps/src/python/twitter/common/log'),
-    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
-    pants('src/main/python/twitter/aurora/client:api'),
-    pants('src/main/python/twitter/aurora/client:base'),
-    pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
-  ]
-)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/admin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/admin/__init__.py b/src/main/python/twitter/aurora/admin/__init__.py
deleted file mode 100644
index e69de29..0000000