You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:09 UTC
[16/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/assets/observer.js
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/assets/observer.js b/src/main/python/apache/thermos/observer/http/assets/observer.js
new file mode 100644
index 0000000..84aec5d
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/assets/observer.js
@@ -0,0 +1,202 @@
+var Task = new Class({
+ data: {},
+
+ initialize: function(taskId, taskType) {
+ this.taskType = taskType
+ this.taskId = taskId
+ this.visible = false
+ this.setElement()
+ },
+
+ // task[active.task_id.processes.waiting]
+ taskStr: function(components) {
+ return 'task[' + ([this.taskType, this.taskId].append(components).join()) + ']'
+ },
+
+ transitionElement: function(dom_id, newval) {
+ if (!$(dom_id)) {
+ // alert('could not find dom_id' + dom_id)
+ return
+ }
+
+ curval = $(dom_id).innerHTML
+
+ // handle specialcases
+ newval = this.translateElement(dom_id, newval)
+
+ if (newval != curval) {
+ $(dom_id).innerHTML = newval
+ var morphElement = new Fx.Morph(dom_id, {
+ duration: 'long',
+ transition: Fx.Transitions.Sine.easeOut
+ });
+ morphElement.start({ 'color': ["#00FF00", "#222222"]})
+ }
+ },
+
+ applyUpdate: function(update) {
+ this.data = update
+ this.updateElement()
+ },
+
+ /*
+ TODO(wickman) Instead do the 2-3 necessary transition functions, then make
+ a schema that maps attributes to the appropriate transition function.
+ */
+ translateElement: function(dom_id, value) {
+ if (dom_id === this.taskStr(['processes', 'running']) ||
+ dom_id === this.taskStr(['processes', 'waiting']) ||
+ dom_id === this.taskStr(['processes', 'success']) ||
+ dom_id === this.taskStr(['processes', 'failed'])) {
+ return value.length
+ }
+
+ if (dom_id == this.taskStr(['resource_consumption', 'cpu'])) {
+ if (value) {
+ return value.toFixed(2)
+ }
+ }
+
+ return value
+ },
+
+ /*
+ TODO(wickman) Ditto to above.
+ */
+ updateElement: function() {
+ for (prefix_attr in this.data) {
+ if (this.data.hasOwnProperty(prefix_attr)) {
+ if (prefix_attr == "task_id") {
+ this.transitionElement(this.taskStr([prefix_attr]), this.translateUid(this.data[prefix_attr]))
+ } else if (instanceOf(this.data[prefix_attr], String) || instanceOf(this.data[prefix_attr], Number)) {
+ this.transitionElement(this.taskStr([prefix_attr]), this.data[prefix_attr])
+ } else if (instanceOf(this.data[prefix_attr], Object)) {
+ for (suffix_attr in this.data[prefix_attr])
+ if (this.data[prefix_attr].hasOwnProperty(suffix_attr))
+ this.transitionElement(this.taskStr([prefix_attr, suffix_attr]),
+ this.data[prefix_attr][suffix_attr])
+ }
+ }
+ }
+ },
+
+ translateUid: function(taskId) {
+ return "<a href='/task/" + taskId + "'>" + taskId + "</a>"
+ },
+
+ setElement: function() {
+ this.element = new Element('tr', {
+ 'id': 'task[' + this.taskType + '][' + this.taskId + ']'}).adopt(
+ new Element('td', { 'id': this.taskStr(['task_id']), 'html': this.translateUid(this.taskId)}),
+ new Element('td', { 'id': this.taskStr(['name']) }),
+ new Element('td', { 'id': this.taskStr(['resource_consumption', 'cpu']) }),
+ new Element('td', { 'id': this.taskStr(['resource_consumption', 'ram']) }),
+ new Element('td', { 'id': this.taskStr(['resource_consumption', 'disk']) }),
+ new Element('td', { 'id': this.taskStr(['processes', 'waiting']) }),
+ new Element('td', { 'id': this.taskStr(['processes', 'running']) }),
+ new Element('td', { 'id': this.taskStr(['processes', 'success']) }),
+ new Element('td', { 'id': this.taskStr(['processes', 'failed']) }))
+ }
+})
+
+var TableManager = new Class({
+ activeTasks: {},
+ visibleChildren: [],
+
+ initialize: function(tableType) {
+ this.tableType = tableType
+ this.setElement()
+ this.getChildren()
+ this.startPolling()
+ },
+
+ setElement: function() {
+ this.element = new Element('table', { 'id': 'table[' + this.tableType + ']', 'class': 'common-table zebra-striped', 'cellpadding': '0' })
+ this.element.adopt(
+ new Element('thead').
+ adopt(new Element('tr', { 'id': 'task[' + this.tableType + '][superheader]', 'class': 'meta-headers' })
+ .adopt(new Element('th', { 'html': "", 'colspan': 2 }),
+ new Element('th', { 'html': "consumed", 'colspan': 3 }),
+ new Element('th', { 'html': "processes", 'colspan': 4 })
+ )
+ )
+ .adopt(new Element('tr', { 'id': 'task[' + this.tableType + '][header]' })
+ .adopt(new Element('th', { 'id': 'task[' + this.tableType + '][header][task_id]', 'html': "task_id" }),
+ new Element('th', { 'id': 'task[' + this.tableType + '][header][name]', 'html': "name" }),
+ new Element('th', { 'id': 'task[' + this.tableType + '][header][resource_consumption][cpu]', 'html': "cpu" }),
+ new Element('th', { 'id': 'task[' + this.tableType + '][header][resource_consumption][ram]', 'html': "ram" }),
+ new Element('th', { 'id': 'task[' + this.tableType + '][header][resource_consumption][disk]','html': "disk" }),
+ new Element('th', { 'id': 'task[' + this.tableType + '][header][processes][waiting]', 'html': "waiting" }),
+ new Element('th', { 'id': 'task[' + this.tableType + '][header][processes][running]', 'html': "running"}),
+ new Element('th', { 'id': 'task[' + this.tableType + '][header][processes][success]', 'html': "success" }),
+ new Element('th', { 'id': 'task[' + this.tableType + '][header][processes][failed]', 'html': "failed" })
+ )
+ )
+ )
+ this.tbody = new Element('tbody', { 'id': this.tableType + '_tbody' })
+ this.element.adopt(this.tbody)
+ },
+
+ toElement: function() {
+ return this.element
+ },
+
+ startPolling: function() {
+ this.taskIdPoller = setInterval(this.tableType + 'TableManager.getChildren();', 2500)
+ this.dataPoller = setInterval(this.tableType + 'TableManager.refreshVisibleChildren();', 5300)
+ },
+
+ getChildren: function() {
+ new Request.JSON({
+ 'url': '/j/task_ids/' + this.tableType + '/-20',
+ 'method': 'get',
+ 'onComplete': function(response) {
+ if (response) {
+ var newChildren = Array.from(response.task_ids)
+ this.visibleChildren = newChildren
+
+ // first set all children to invisible
+ for (taskId in this.activeTasks) {
+ this.activeTasks[taskId].visible = false
+ }
+
+ // set new children visible
+ for (var k = 0; k < newChildren.length; k++) {
+ if (!(newChildren[k] in this.activeTasks)) {
+ this.activeTasks[newChildren[k]] = new Task(newChildren[k], this.tableType)
+ }
+ this.activeTasks[newChildren[k]].visible = true
+ }
+
+ // clear then adopt them
+ while (this.tbody.childNodes.length > 0)
+ this.tbody.removeChild(this.tbody.firstChild);
+ for (var k = 0; k < newChildren.length; k++)
+ this.tbody.adopt(this.activeTasks[newChildren[k]].element)
+ } else {
+ clearInterval(this.taskIdPoller)
+ }
+ }.bind(this)
+ }).send()
+ },
+
+ refreshVisibleChildren: function() {
+ // first get visible children
+ new Request.JSON({
+ 'url': '/j/task',
+ 'method': 'get',
+ 'data': { 'task_id': this.visibleChildren.join() },
+ 'onComplete': function(response) {
+ if (response) {
+ if (!response) return;
+ for (taskId in response) {
+ if (response.hasOwnProperty(taskId))
+ this.activeTasks[taskId].applyUpdate(response[taskId])
+ }
+ } else {
+ clearInterval(this.dataPoller)
+ }
+ }.bind(this)
+ }).send()
+ },
+})
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/file_browser.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/file_browser.py b/src/main/python/apache/thermos/observer/http/file_browser.py
new file mode 100644
index 0000000..45c5708
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/file_browser.py
@@ -0,0 +1,124 @@
+import os
+import pprint
+from xml.sax.saxutils import escape
+
+from twitter.common import log
+from twitter.common.http import HttpServer
+
+import bottle
+from mako.template import Template
+
+from .templating import HttpTemplate
+
+
+MB = 1024 * 1024
+DEFAULT_CHUNK_LENGTH = MB
+MAX_CHUNK_LENGTH = 16 * MB
+
+
+def _read_chunk(filename, offset=None, length=None):
+ offset = offset or -1
+ length = length or -1
+
+ try:
+ length = long(length)
+ offset = long(offset)
+ except ValueError:
+ return {}
+
+ if not os.path.isfile(filename):
+ return {}
+
+ try:
+ fstat = os.stat(filename)
+ except Exception as e:
+ log.error('Could not read from %s: %s' % (filename, e))
+ return {}
+
+ if offset == -1:
+ offset = fstat.st_size
+
+ if length == -1:
+ length = fstat.st_size - offset
+
+ with open(filename, "r") as fp:
+ fp.seek(offset)
+ try:
+ data = fp.read(length)
+ except IOError as e:
+ log.error('Failed to read %s: %s' % (filename, e), exc_info=True)
+ return {}
+
+ if data:
+ return dict(offset=offset, length=len(data), data=escape(data.decode('utf8', 'replace')))
+
+ return dict(offset=offset, length=0)
+
+
+class TaskObserverFileBrowser(object):
+ """
+ Mixin for Thermos observer File browser.
+ """
+
+ @HttpServer.route("/logs/:task_id/:process/:run/:logtype")
+ @HttpServer.mako_view(HttpTemplate.load('logbrowse'))
+ def handle_logs(self, task_id, process, run, logtype):
+ types = self._observer.logs(task_id, process, int(run))
+ if logtype not in types:
+ bottle.abort(404, "No such log type: %s" % logtype)
+ base, path = types[logtype]
+ filename = os.path.join(base, path)
+ return {
+ 'task_id': task_id,
+ 'filename': filename,
+ 'process': process,
+ 'run': run,
+ 'logtype': logtype
+ }
+
+ @HttpServer.route("/logdata/:task_id/:process/:run/:logtype")
+ def handle_logdata(self, task_id, process, run, logtype):
+ offset = self.Request.GET.get('offset', -1)
+ length = self.Request.GET.get('length', -1)
+ types = self._observer.logs(task_id, process, int(run))
+ if logtype not in types:
+ return {}
+ chroot, path = types[logtype]
+ return _read_chunk(os.path.join(chroot, path), offset, length)
+
+ @HttpServer.route("/file/:task_id/:path#.+#")
+ @HttpServer.mako_view(HttpTemplate.load('filebrowse'))
+ def handle_file(self, task_id, path):
+ if path is None:
+ bottle.abort(404, "No such file")
+ return {
+ 'task_id': task_id,
+ 'filename': path,
+ }
+
+ @HttpServer.route("/filedata/:task_id/:path#.+#")
+ def handle_filedata(self, task_id, path):
+ if path is None:
+ return {}
+ offset = self.Request.GET.get('offset', -1)
+ length = self.Request.GET.get('length', -1)
+ chroot, path = self._observer.valid_file(task_id, path)
+ if chroot is None or path is None:
+ return {}
+ return _read_chunk(os.path.join(chroot, path), offset, length)
+
+ @HttpServer.route("/browse/:task_id")
+ @HttpServer.route("/browse/:task_id/:path#.*#")
+ @HttpServer.mako_view(HttpTemplate.load('filelist'))
+ def handle_dir(self, task_id, path=None):
+ if path == "":
+ path = None
+ chroot, path = self._observer.valid_path(task_id, path)
+ return dict(task_id=task_id, chroot=chroot, path=path)
+
+ @HttpServer.route("/download/:task_id/:path#.+#")
+ def handle_download(self, task_id, path=None):
+ chroot, path = self._observer.valid_path(task_id, path)
+ if path is None:
+ bottle.abort(404, "No such file")
+ return bottle.static_file(path, root=chroot, download=True)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/http_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/http_observer.py b/src/main/python/apache/thermos/observer/http/http_observer.py
new file mode 100644
index 0000000..b73f17f
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/http_observer.py
@@ -0,0 +1,133 @@
+"""HTTP interface to the Thermos TaskObserver
+
+This module provides an HTTP server which exposes information about Thermos tasks running on a
+system. To do this, it relies heavily on the Thermos TaskObserver.
+
+"""
+
+import os
+import socket
+
+from twitter.common import log
+from twitter.common.http import HttpServer
+
+from .file_browser import TaskObserverFileBrowser
+from .json import TaskObserverJSONBindings
+from .static_assets import StaticAssets
+from .templating import HttpTemplate
+
+
+class BottleObserver(HttpServer, StaticAssets, TaskObserverFileBrowser, TaskObserverJSONBindings):
+ """
+ A bottle wrapper around a Thermos TaskObserver.
+ """
+
+ def __init__(self, observer):
+ """Keep a reference to `observer` and initialise every base class in turn."""
+ self._observer = observer
+ StaticAssets.__init__(self)
+ TaskObserverFileBrowser.__init__(self)
+ TaskObserverJSONBindings.__init__(self)
+ HttpServer.__init__(self)
+
+ @HttpServer.route("/")
+ @HttpServer.view(HttpTemplate.load('index'))
+ def handle_index(self):
+ """Return the 'index' template arguments: the local hostname."""
+ return dict(hostname=socket.gethostname())
+
+ @HttpServer.route("/main")
+ @HttpServer.route("/main/:type")
+ @HttpServer.route("/main/:type/:offset")
+ @HttpServer.route("/main/:type/:offset/:num")
+ @HttpServer.mako_view(HttpTemplate.load('main'))
+ def handle_main(self, type=None, offset=None, num=None):
+ """Validate the URL parts and return self._observer.main(type, offset, num).
+
+ Aborts with a 404 when `type` is not None/'all'/'finished'/'active', or when
+ `offset` / `num` are given but are not integers.
+ """
+ if type not in (None, 'all', 'finished', 'active'):
+ HttpServer.abort(404, 'Invalid task type: %s' % type)
+ if offset is not None:
+ try:
+ offset = int(offset)
+ except ValueError:
+ HttpServer.abort(404, 'Invalid offset: %s' % offset)
+ if num is not None:
+ try:
+ num = int(num)
+ except ValueError:
+ HttpServer.abort(404, 'Invalid count: %s' % num)
+ return self._observer.main(type, offset, num)
+
+ # NOTE(review): TaskObserverJSONBindings, a base of this class, also defines a
+ # method named handle_task (routed at /j/task/:task_id). This definition takes
+ # precedence in attribute lookup -- confirm route registration does not depend
+ # on looking methods up by name.
+ @HttpServer.route("/task/:task_id")
+ @HttpServer.mako_view(HttpTemplate.load('task'))
+ def handle_task(self, task_id):
+ """Return the 'task' template arguments for `task_id`.
+
+ Aborts with a 404 when the task cannot be found (via get_task) or when the
+ observer reports no processes for it.
+ """
+ task = self.get_task(task_id)
+ processes = self._observer.processes([task_id])
+ if not processes.get(task_id, None):
+ HttpServer.abort(404, 'Unknown task_id: %s' % task_id)
+ processes = processes[task_id]
+ state = self._observer.state(task_id)
+
+ return dict(
+ task_id = task_id,
+ task = task,
+ statuses = self._observer.task_statuses(task_id),
+ user = task['user'],
+ ports = task['ports'],
+ processes = processes,
+ chroot = state.get('sandbox', ''),
+ launch_time = state.get('launch_time', 0),
+ hostname = state.get('hostname', 'localhost'),
+ )
+
+ def get_task(self, task_id):
+ """Return the observer's record for `task_id`; abort with a 404 when it is falsy."""
+ task = self._observer._task(task_id)
+ if not task:
+ HttpServer.abort(404, "Failed to find task %s. Try again shortly." % task_id)
+ return task
+
+ @HttpServer.route("/rawtask/:task_id")
+ @HttpServer.mako_view(HttpTemplate.load('rawtask'))
+ def handle_rawtask(self, task_id):
+ """Return the 'rawtask' template arguments: hostname, task id and the task's 'task_struct'."""
+ task = self.get_task(task_id)
+ state = self._observer.state(task_id)
+ return dict(
+ hostname = state.get('hostname', 'localhost'),
+ task_id = task_id,
+ task_struct = task['task_struct']
+ )
+
+ # NOTE(review): TaskObserverJSONBindings also defines handle_process (routed at
+ # /j/process/...); the same name-collision caveat as handle_task applies.
+ @HttpServer.route("/process/:task_id/:process_id")
+ @HttpServer.mako_view(HttpTemplate.load('process'))
+ def handle_process(self, task_id, process_id):
+ """Return the 'process' template arguments for one process of a task.
+
+ Aborts with a 404 when the observer has no current run for the pair, or when
+ process_from_name() returns None.
+ """
+ all_processes = {}
+ current_run = self._observer.process(task_id, process_id)
+ if not current_run:
+ HttpServer.abort(404, 'Invalid task/process combination: %s/%s' % (task_id, process_id))
+ process = self._observer.process_from_name(task_id, process_id)
+ if process is None:
+ msg = 'Could not recover process: %s/%s' % (task_id, process_id)
+ log.error(msg)
+ HttpServer.abort(404, msg)
+
+ # Index the current run, then fetch each run numbered below it.
+ current_run_number = current_run['process_run']
+ all_processes[current_run_number] = current_run
+ for run in range(current_run_number):
+ all_processes[run] = self._observer.process(task_id, process_id, run)
+ def convert_process_tuple(run_tuple):
+ # Keep only state plus start/stop times (when present) for the runs table.
+ process_tuple = dict(state = run_tuple['state'])
+ if 'start_time' in run_tuple:
+ process_tuple.update(start_time = run_tuple['start_time'])
+ if 'stop_time' in run_tuple:
+ process_tuple.update(stop_time = run_tuple['stop_time'])
+ return process_tuple
+
+ template = {
+ 'task_id': task_id,
+ 'process': {
+ 'name': process_id,
+ 'status': all_processes[current_run_number]["state"],
+ 'cmdline': process.cmdline().get()
+ },
+ }
+ # Merge the current run's 'used' mapping (if any) into the process entry.
+ template['process'].update(**all_processes[current_run_number].get('used', {}))
+ template['runs'] = dict((run, convert_process_tuple(run_tuple))
+ for run, run_tuple in all_processes.items())
+ log.info('Rendering template is: %s' % template)
+ return template
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/json.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/json.py b/src/main/python/apache/thermos/observer/http/json.py
new file mode 100644
index 0000000..9212f0e
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/json.py
@@ -0,0 +1,56 @@
+import urllib
+
+from twitter.common.http import HttpServer
+
+
+class TaskObserverJSONBindings(object):
+ """
+ Mixin for Thermos observer JSON endpoints.
+ """
+
+ @HttpServer.route("/j/task_ids")
+ @HttpServer.route("/j/task_ids/:which")
+ @HttpServer.route("/j/task_ids/:which/:offset")
+ @HttpServer.route("/j/task_ids/:which/:offset/:num")
+ def handle_task_ids(self, which=None, offset=None, num=None):
+ return self._observer.task_ids(
+ which,
+ int(offset) if offset is not None else 0,
+ int(num) if num is not None else 20)
+
+ @HttpServer.route("/j/task_id_count")
+ def handle_task_id_count(self):
+ return self._observer.task_id_count()
+
+ @HttpServer.route("/j/task")
+ def handle_tasks(self):
+ """
+ Additional parameters:
+ task_id = comma separated list of task_ids.
+ """
+ task_ids = HttpServer.Request.GET.get('task_id', [])
+ if task_ids:
+ task_ids = urllib.unquote(task_ids).split(',')
+ return self._observer.task(task_ids)
+
+ @HttpServer.route("/j/task/:task_id")
+ def handle_task(self, task_id):
+ return self._observer.task([task_id])
+
+ @HttpServer.route("/j/process/:task_id")
+ @HttpServer.route("/j/process/:task_id/:process")
+ @HttpServer.route("/j/process/:task_id/:process/:run")
+ def handle_process(self, task_id, process=None, run=None):
+ return self._observer.process(task_id, process, run)
+
+ @HttpServer.route("/j/processes")
+ def handle_processes(self):
+ """
+ Additional parameters:
+ task_ids = comma separated list of task_ids.
+ """
+ task_ids = HttpServer.Request.GET.get('task_id', [])
+ if task_ids:
+ task_ids = urllib.unquote(task_ids).split(',')
+ return self._observer.processes(task_ids)
+
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/static_assets.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/static_assets.py b/src/main/python/apache/thermos/observer/http/static_assets.py
new file mode 100644
index 0000000..cf3ff08
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/static_assets.py
@@ -0,0 +1,43 @@
+import mimetypes
+import os
+
+from twitter.common import log
+from twitter.common.http.server import HttpServer
+
+from bottle import HTTPResponse
+import pkg_resources
+
+
+class StaticAssets(object):
+ """
+ Serve the /assets directory.
+ """
+ def __init__(self):
+ self._assets = {}
+ self._detect_assets()
+
+ def _detect_assets(self):
+ log.info('detecting assets...')
+ assets = pkg_resources.resource_listdir(__name__, 'assets')
+ cached_assets = {}
+ for asset in assets:
+ log.info(' detected asset: %s' % asset)
+ cached_assets[asset] = pkg_resources.resource_string(
+ __name__, os.path.join('assets', asset))
+ self._assets = cached_assets
+
+ @HttpServer.route("/favicon.ico")
+ def handle_favicon(self):
+ HttpServer.redirect("/assets/favicon.ico")
+
+ @HttpServer.route("/assets/:filename")
+ def handle_asset(self, filename):
+ # TODO(wickman) Add static_content to bottle.
+ if filename in self._assets:
+ mimetype, encoding = mimetypes.guess_type(filename)
+ headers = {}
+ if mimetype: headers['Content-Type'] = mimetype
+ if encoding: headers['Content-Encoding'] = encoding
+ return HTTPResponse(self._assets[filename], header=headers)
+ else:
+ HttpServer.abort(404, 'Unknown asset: %s' % filename)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/filebrowse.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/filebrowse.tpl b/src/main/python/apache/thermos/observer/http/templates/filebrowse.tpl
new file mode 100644
index 0000000..871e31e
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/filebrowse.tpl
@@ -0,0 +1,81 @@
+<%def name="download_link()">
+## task_id and filename originate from the request URL, so they are HTML-escaped.
+ <a href='/download/${task_id | h}/${filename | h}'><font size=1>download</font></a>
+</%def>
+
+<html>
+
+<head>
+ <meta charset="utf-8">
+ <title></title>
+
+ <style type="text/css">
+ .log {
+ font-family: "Inconsolata", "Monaco", "Courier New", "Courier";
+ line-height:14px;
+ font-size: 12px;
+ }
+
+ .invert {
+ color: #FFFFFF;
+ text-decoration: none;
+ background: #000000;
+ }
+ </style>
+</head>
+
+<link rel="stylesheet"
+ type="text/css"
+ href="/assets/bootstrap.css"/>
+<style type="text/css">
+div.tight
+{
+ height:100%;
+ overflow:scroll;
+}
+</style>
+
+<title>file browser ${task_id}</title>
+<body>
+ <div> <strong> filename </strong> ${filename} </div>
+ <div> <strong> dl </strong> ${download_link()} </div>
+ <div style="position: absolute; left: 5px; top: 0px;">
+ <p id="indicator" class="log invert"></p>
+ </div>
+
+ <div id="data" class="log" style="white-space:pre-wrap; background-color:#EEEEEE;"></div>
+</body>
+
+<script src="/assets/jquery.js"></script>
+<script src="/assets/jquery.pailer.js"></script>
+
+<script>
+ function resize() {
+ var margin_left = parseInt($('body').css('margin-left'));
+ var margin_top = parseInt($('body').css('margin-top'));
+ var margin_bottom = parseInt($('body').css('margin-bottom'));
+ $('#data').width($(window).width() - margin_left);
+ $('#data').height($(window).height() - margin_top - margin_bottom);
+ }
+
+ $(window).resize(resize);
+
+ $(document).ready(function() {
+ resize();
+
+ $('#data').pailer({
+ 'read': function(options) {
+ var settings = $.extend({
+ 'offset': -1,
+ 'length': -1
+ }, options);
+
+ var url = "/filedata/${task_id}/${filename}"
+ + '?offset=' + settings.offset
+ + '&length=' + settings.length;
+ return $.getJSON(url);
+ },
+ 'indicator': $('#indicator')
+ });
+ });
+</script>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/filelist.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/filelist.tpl b/src/main/python/apache/thermos/observer/http/templates/filelist.tpl
new file mode 100644
index 0000000..12a257e
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/filelist.tpl
@@ -0,0 +1,141 @@
+<%doc>
+ Template arguments:
+ task_id
+ chroot
+ path
+ dirs
+ files
+</%doc>
+
+
+<%!
+ import os
+
+ from datetime import datetime
+ import grp
+ import os
+ import pwd
+ import stat
+ import sys
+
+ NOW = datetime.now()
+
+ def format_mode(sres):
+ mode = sres.st_mode
+
+ root = (mode & 0700) >> 6
+ group = (mode & 0070) >> 3
+ user = (mode & 07)
+
+ def stat_type(md):
+ if stat.S_ISDIR(md):
+ return 'd'
+ elif stat.S_ISSOCK(md):
+ return 's'
+ else:
+ return '-'
+
+ def triple(md):
+ return '%c%c%c' % (
+ 'r' if md & 0b100 else '-',
+ 'w' if md & 0b010 else '-',
+ 'x' if md & 0b001 else '-')
+
+ return ''.join([stat_type(mode), triple(root), triple(group), triple(user)])
+
+ def format_mtime(mtime):
+ dt = datetime.fromtimestamp(mtime)
+ return '%s %2d %5s' % (dt.strftime('%b'), dt.day,
+ dt.year if dt.year != NOW.year else dt.strftime('%H:%M'))
+
+ def format_prefix(filename, sres):
+ try:
+ pwent = pwd.getpwuid(sres.st_uid)
+ user = pwent.pw_name
+ except KeyError:
+ user = sres.st_uid
+
+ try:
+ grent = grp.getgrgid(sres.st_gid)
+ group = grent.gr_name
+ except KeyError:
+ group = sres.st_gid
+
+ return '%s %3d %10s %10s %10d %s' % (
+ format_mode(sres),
+ sres.st_nlink,
+ user,
+ group,
+ sres.st_size,
+ format_mtime(sres.st_mtime),
+ )
+%>
+
+## File names come from the task's sandbox, so they are HTML-escaped before being emitted.
+<%def name="download_link(filename)"><a href='/download/${task_id | h}/${os.path.join(path, filename) | h}'><font size=1>dl</font></a></%def>
+## Directory names come from the task's sandbox, so they are HTML-escaped before being emitted.
+<%def name="directory_link(dirname)"><a href='/browse/${task_id | h}/${os.path.join(path, dirname) | h}'>${dirname | h}</a></%def>
+## File names come from the task's sandbox, so they are HTML-escaped before being emitted.
+<%def name="file_link(filename)"><a href='/file/${task_id | h}/${os.path.join(path, filename) | h}'>${filename | h}</a></%def>
+
+<html>
+
+<link rel="stylesheet"
+ type="text/css"
+ href="/assets/bootstrap.css"/>
+<style type="text/css">
+div.tight
+{
+ height:85%;
+ overflow:auto;
+}
+</style>
+
+<title>path browser for ${task_id}</title>
+
+
+% if chroot is not None:
+<body>
+ <div class="container">
+ <div class="span6">
+ <strong> task id </strong> ${task_id}
+ </div>
+ <div class="span6">
+ <strong> path </strong> ${path}
+ </div>
+ <div class="span12 tight">
+ <pre>
+
+% if path != ".":
+ <%
+ listing = ['..'] + os.listdir(os.path.join(chroot, path))
+ %>\
+% else:
+ <%
+ listing = os.listdir(os.path.join(chroot, path))
+ %>\
+% endif
+
+<% listing.sort() %>
+
+% for fn in listing:
+<%
+ try:
+ sres = os.stat(os.path.join(chroot, path, fn))
+ except OSError:
+ continue
+%>\
+ % if not stat.S_ISDIR(sres.st_mode):
+${format_prefix(fn, sres)} ${file_link(fn)} ${download_link(fn)}
+ % else:
+${format_prefix(fn, sres)} ${directory_link(fn)}
+ % endif
+% endfor
+ </pre>
+ </div>
+ </div>
+</body>
+% else:
+<body>
+ This task is running without a chroot.
+</body>
+% endif
+
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/home.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/home.tpl b/src/main/python/apache/thermos/observer/http/templates/home.tpl
new file mode 100644
index 0000000..4e7426b
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/home.tpl
@@ -0,0 +1,58 @@
+<html>
+<title>thermos(${hostname})</title>
+
+<link rel="stylesheet"
+ type="text/css"
+ href="/assets/bootstrap.css"/>
+
+<body>
+
+<%!
+ import socket
+ import time
+
+ def pretty_time(seconds):
+ return time.asctime(time.localtime(seconds))
+%>
+
+<div class="container">
+ <h3> host ${socket.gethostname()} </h3>
+
+ <div class="content" id="defaultLayout">
+ <table class="zebra-striped">
+ <thead>
+ <tr>
+ <th colspan=3> task </th>
+ <th colspan=4> resources </th>
+ <th colspan=3> links </th>
+ </tr>
+
+ <tr>
+ <th> name </th> <th> role </th> <th> status </th>
+ <th> procs </th> <th> cpu </th> <th> ram </th> <th> disk </th>
+ <th> task </th> <th> chroot </th> <th> ports </th>
+ </tr>
+ </thead>
+ <tbody>
+
+ % for proc_name, proc in sorted(processes.items()):
+ <tr>
+ <td> ${proc["process_name"]} </td>
+ <td> ${proc["process_run"]} </td>
+ <td> ${proc["state"]} </td>
+ <td> ${pretty_time(float(proc["start_time"])/1000.0) if "start_time" in proc else ""} </td>
+ <td> ${pretty_time(float(proc["stop_time"])/1000.0) if "stop_time" in proc else ""} </td>
+ <td> ${'%.3f' % proc["used"]["cpu"] if "used" in proc else ""} </td>
+ <td> ${'%dMB' % (proc["used"]["ram"] / 1024 / 1024) if "used" in proc else ""} </td>
+ <td> <a href="/logs/${task_id}/${proc["process_name"]}/${proc["process_run"]}/stdout">stdout</a> </td>
+ <td> <a href="/logs/${task_id}/${proc["process_name"]}/${proc["process_run"]}/stderr">stderr</a> </td>
+ </tr>
+ % endfor
+ </tbody>
+ </table>
+ </div>
+
+</div>
+
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/index.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/index.tpl b/src/main/python/apache/thermos/observer/http/templates/index.tpl
new file mode 100644
index 0000000..d662c60
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/index.tpl
@@ -0,0 +1,51 @@
+<html>
+<title>thermos({{hostname}})</title>
+
+<link rel="stylesheet"
+ type="text/css"
+ href="assets/bootstrap.css"/>
+
+<script src="assets/jquery.js"></script>
+
+<body>
+
+<div class="container" id="defaultLayout">
+ <div id="activeTaskContainer" class='uber-container'>
+ <div class="active-container" data-url="main/active">
+ </div>
+ </div>
+ <br><br>
+ <div id="finishedTaskContainer" class='uber-container'>
+ <div class="finished-container" data-url="main/finished">
+ </div>
+ </div>
+</div>
+
+<script type="text/javascript">
+
+$(document).on('click', 'a.refresh-container', function(e) {
+ e.preventDefault()
+ topLevelDivContainer = $(this).closest('.uber-container')
+ divDataUrl = $(this).attr('data-url')
+ $.ajax({
+ 'type': 'GET',
+ 'dataType': 'html',
+ 'url': divDataUrl,
+ success: function(data, xhr, err) {
+ $(topLevelDivContainer).html(data)
+ }
+ })
+ })
+
+refreshDivs = function() {
+ $('#activeTaskContainer').load($('.uber-container .active-container').attr('data-url'))
+ $('#finishedTaskContainer').load($('.uber-container .finished-container').attr('data-url'))
+}
+
+$(document).bind('ready', refreshDivs)
+setInterval(refreshDivs, 10000)
+
+</script>
+
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/logbrowse.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/logbrowse.tpl b/src/main/python/apache/thermos/observer/http/templates/logbrowse.tpl
new file mode 100644
index 0000000..059979c
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/logbrowse.tpl
@@ -0,0 +1,80 @@
+<%def name="download_link()">
+## task_id originates from the request URL, so both values are HTML-escaped.
+ <a href='/download/${task_id | h}/${filename | h}'><font size=1>download</font></a>
+</%def>
+
+<html>
+
+<head>
+ <meta charset="utf-8">
+ <title></title>
+
+ <style type="text/css">
+ .log {
+ font-family: "Inconsolata", "Monaco", "Courier New", "Courier";
+ line-height:14px;
+ font-size: 12px;
+ }
+
+ .invert {
+ color: #FFFFFF;
+ text-decoration: none;
+ background: #000000;
+ }
+ </style>
+</head>
+
+<link rel="stylesheet"
+ type="text/css"
+ href="/assets/bootstrap.css"/>
+<style type="text/css">
+div.tight
+{
+ height:100%;
+ overflow:scroll;
+}
+</style>
+
+<title>log browser ${task_id}</title>
+<body>
+ <div> <strong> log </strong> ${logtype} <strong> ${download_link()} </strong> </div>
+ <div style="position: absolute; left: 5px; top: 0px;">
+ <p id="indicator" class="log invert"></p>
+ </div>
+
+ <div id="data" class="log" style="white-space:pre-wrap; background-color:#EEEEEE;"></div>
+</body>
+
+<script src="/assets/jquery.js"></script>
+<script src="/assets/jquery.pailer.js"></script>
+
+<script>
+ function resize() {
+ var margin_left = parseInt($('body').css('margin-left'));
+ var margin_top = parseInt($('body').css('margin-top'));
+ var margin_bottom = parseInt($('body').css('margin-bottom'));
+ $('#data').width($(window).width() - margin_left);
+ $('#data').height($(window).height() - margin_top - margin_bottom);
+ }
+
+ $(window).resize(resize);
+
+ $(document).ready(function() {
+ resize();
+
+ $('#data').pailer({
+ 'read': function(options) {
+ var settings = $.extend({
+ 'offset': -1,
+ 'length': -1
+ }, options);
+
+ var url = "/logdata/${task_id}/${process}/${run}/${logtype}"
+ + '?offset=' + settings.offset
+ + '&length=' + settings.length;
+ return $.getJSON(url);
+ },
+ 'indicator': $('#indicator')
+ });
+ });
+</script>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/main.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/main.tpl b/src/main/python/apache/thermos/observer/http/templates/main.tpl
new file mode 100644
index 0000000..d3419ab
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/main.tpl
@@ -0,0 +1,101 @@
+<%doc>
+ Template arguments:
+ type
+ offset
+ num
+ tasks
+ task_count
+</%doc>
+
+<%!
+import socket
+import time
+
+try:
+ from twitter.common import app
+ observer_port = app.get_options().twitter_common_http_root_server_port
+except (ImportError, AttributeError) as e:
+ observer_port = 1338
+
+host = socket.gethostname()
+num_tasks = 20
+
+def pretty_time(seconds=None):  # None => "now" at call time, not at template-load time
+  return time.strftime('%m/%d %H:%M:%S', time.gmtime(seconds))
+
+%>
+
+<div class="${type}-container"
+ data-url="/main/${type}/${offset}/${num}">
+ <div class="row-fluid">
+ <div class="span2">
+ % if offset > 0:
+ <a class="refresh-container" href="#"
+ data-url="/main/${type}/${offset-num_tasks}/${num}">
+ ← newer
+ </a>
+ % endif
+ </div>
+ % if offset + num_tasks < task_count:
+ <div class="span2">
+ <a class="refresh-container" href="#"
+ data-url="/main/${type}/${offset+num_tasks}/${num}">
+ older →
+ </a>
+ </div>
+ % endif
+ </div>
+ <div class="content" id="defaultLayout">
+ <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+ <thead>
+ <tr>
+ <th colspan=4> ${type} tasks ${offset}...${min(task_count, offset+num_tasks) - 1} of ${task_count} </th>
+ <th colspan=3> resources </th>
+ <th colspan=3> links </th>
+ </tr>
+
+ <tr>
+ <th> name </th> <th> role </th> <th> launched </th> <th> status </th>
+ <th> cpu </th> <th> ram </th> <th> disk </th>
+ <th> task </th> <th> chroot </th> <th> ports </th>
+ </tr>
+ </thead>
+ <tbody>
+
+ % for row in tasks:
+ <tr>
+ <td> ${row["name"]} </td>
+ <td> ${row["role"]} </td>
+ <td> ${pretty_time(row["launch_timestamp"])} </td>
+ <td> ${row["state"]} @
+ ${pretty_time(row["state_timestamp"]) if row["state_timestamp"] else ""}</td>
+
+ <td> ${'%.3f' % row["cpu"] if row["cpu"] > 0 else ""} </td>
+ <td> ${'%.1fMB' % (row["ram"] / 1024. / 1024.) if row["ram"] > 0 else ""} </td>
+ <td> ${'%dGB' % (row["disk"] / 1024 / 1024 / 1024) if row["disk"] > 0 else ""} </td>
+
+ <td> <a href="/task/${row['task_id']}">info</a> </td>
+ <td> <a href="/browse/${row['task_id']}">browse</a> </td>
+ <td>
+ % if type == 'active':
+ % if 'http' in row["ports"]:
+ <a href="http://${host}:${row['ports']['http']}">http</a>
+ % else:
+ <span class="muted">http</span>
+ % endif
+ % if 'admin' in row["ports"]:
+ <a href="http://${host}:${row['ports']['admin']}">admin</a>
+ % else:
+ <span class="muted">admin</span>
+ % endif
+ % if set(row["ports"]) - set(['http', 'admin']):
+ <a href="/task/${row['task_id']}">...</a>
+ % endif
+ % endif
+ </td>
+ </tr>
+ % endfor
+ </tbody>
+ </table>
+ </div>
+</div>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/process.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/process.tpl b/src/main/python/apache/thermos/observer/http/templates/process.tpl
new file mode 100644
index 0000000..a7fb90d
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/process.tpl
@@ -0,0 +1,109 @@
+<%doc>
+Template arguments:
+ task_id
+ process {
+ cpu:/ram: (optional)
+ cmdline:
+ name:
+ }
+
+ runs = {
+ number: {
+ start_time: (optional)
+ stop_time: (optional)
+ state:
+ }
+ }
+
+ --
+ for each run:
+ run | state | started | finished | stdout | stderr
+</%doc>
+
+<%!
+import socket
+import time
+from xml.sax.saxutils import escape
+
+def pretty_time(seconds=None):  # None => "now" at call time, not at template-load time
+  return time.strftime('%m/%d %H:%M:%S', time.gmtime(seconds))
+%>
+
+<html>
+<title>thermos(${socket.gethostname()})</title>
+
+<link rel="stylesheet"
+ type="text/css"
+ href="/assets/bootstrap.css"/>
+<body>
+
+
+<div class="container">
+ <div class="row">
+ <div class="span6" id="leftBar">
+ <dl>
+ <dt> process </dt>
+ <dd> <strong> parent task </strong> <a href="/task/${task_id}">${task_id}</a> </dd>
+ <dd> <strong> process name </strong> ${process["name"]} </dd>
+ <dd> <strong> status </strong> ${process["status"]} </dd>
+ </dl>
+ </div>
+
+ <div class="span6" id="rightBar">
+ <dl>
+ <dt> resources </dt>
+ <dd> <strong> cpu </strong> ${'%.3f' % process["cpu"] if "cpu" in process else "N/A"} </dd>
+ <dd> <strong> ram </strong> ${'%.1fMB' % (process["ram"] / 1024. / 1024.) if "ram" in process else "N/A"} </dd>
+ <dd> <strong> total user </strong> ${'%.1fs' % process["user"] if "user" in process else "N/A"} </dd>
+ <dd> <strong> total sys </strong> ${'%.1fs' % process["system"] if "system" in process else "N/A"} </dd>
+ <dd> <strong> threads </strong> ${process["threads"] if "threads" in process else "N/A"} </dd>
+ </dl>
+ </div>
+ </div>
+
+
+ <strong> cmdline </strong><br>
+ <div class="container">
+<pre>
+${escape(process["cmdline"])}
+</pre>
+ </div><br><br>
+
+
+ <strong> runs </strong>
+ <div class="container">
+ <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+ <thead>
+ <tr>
+ <th colspan=2> </th>
+ <th colspan=2> time </th>
+ <th colspan=2> logs </th>
+ </tr>
+
+ <tr>
+ <th> run </th>
+ <th> status </th>
+ <th> started </th> <th> finished </th>
+ <th> stdout </th> <th> stderr </th>
+ </tr>
+ </thead>
+ <tbody>
+
+ % for run, process_dict in sorted(runs.items(), reverse=True):
+ <tr>
+ <td> ${run} </td>
+ <td> ${process_dict["state"]} </td>
+ <td> ${pretty_time(process_dict["start_time"]) if "start_time" in process_dict else ""} </td>
+ <td> ${pretty_time(process_dict["stop_time"]) if "stop_time" in process_dict else ""} </td>
+ <td> <a href="/logs/${task_id}/${process["name"]}/${run}/stdout">stdout</a> </td>
+ <td> <a href="/logs/${task_id}/${process["name"]}/${run}/stderr">stderr</a> </td>
+ </tr>
+ % endfor
+ </tbody>
+ </table>
+ </div>
+
+</div>
+
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/rawtask.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/rawtask.tpl b/src/main/python/apache/thermos/observer/http/templates/rawtask.tpl
new file mode 100644
index 0000000..c4b7f52
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/rawtask.tpl
@@ -0,0 +1,28 @@
+<%doc>
+ Template arguments:
+ hostname
+ task_id
+ task_struct
+</%doc>
+
+<html>
+<title>thermos(${hostname})</title>
+
+<link rel="stylesheet"
+ type="text/css"
+ href="/assets/bootstrap.css"/>
+<%!
+ from json import dumps
+ def print_task(task):  # Serialize task.get() as JSON text for display in the page.
+ return dumps(task.get(), indent=4)  # indent=4 spreads the JSON over indented lines
+%>
+
+<body>
+<div class="container">
+ <h3> task ${task_id} </h3>
+ <div class="content" id="rawTask">
+ <pre>${print_task(task_struct)}</pre>
+ </div>
+</div>
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templates/task.tpl
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templates/task.tpl b/src/main/python/apache/thermos/observer/http/templates/task.tpl
new file mode 100644
index 0000000..b71c80d
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templates/task.tpl
@@ -0,0 +1,140 @@
+<%doc>
+ Template arguments:
+ task_id
+ task
+ statuses
+ user
+ ports
+ processes
+ chroot
+ launch_time
+ hostname
+</%doc>
+
+<html>
+<title>thermos(${hostname})</title>
+
+<link rel="stylesheet"
+ type="text/css"
+ href="/assets/bootstrap.css"/>
+
+<body>
+
+<%!
+ import time
+
+  def pretty_time(seconds=None):  # None => "now" at call time, not at template-load time
+    return time.strftime('%m/%d %H:%M:%S', time.gmtime(seconds))
+
+ def get(task, typ):  # Look up one entry (e.g. 'cpu') of the task's resource_consumption.
+ return task['resource_consumption'][typ]  # KeyError if either key is missing
+
+%>
+
+<div class="container">
+
+ <h3> task ${task_id} </h3>
+
+ <div class="row">
+ <div class="span6" id="leftBar">
+ <dl>
+ <dt> task </dt>
+ <dd> <strong> status </strong> ${statuses[-1][0] if statuses else 'UNKNOWN'} </dd>
+ <dd> <strong> user </strong> ${user} </dd>
+ <dt> ports </dt>
+ % for port_name, port_number in ports.items():
+ <dd> <strong> ${port_name} </strong> <a href="http://${hostname}:${port_number}">${port_number}</a> </dd>
+ %endfor
+ </dl>
+ </div>
+
+ <div class="span6" id="rightBar">
+ <dl>
+ <dt> header </dt>
+ <dd> <strong> chroot </strong> <a href="/browse/${task_id}">browse</a> </dd>
+ <dd> <strong> hostname </strong> <a href="/">${hostname}</a> </dd>
+ <dd> <strong> launch time </strong> ${pretty_time(launch_time)} </dd>
+ <dd> <strong> task config </strong> <a href="/rawtask/${task_id}">view</a> </dd>
+ </dl>
+ </div>
+ </div>
+
+ <div class="row-fluid">
+ <div class="span8" id="taskLayout">
+ <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+ <thead>
+ <tr>
+ <th colspan=1> task status </th>
+ <th colspan=1> time </th>
+ </tr>
+ </thead>
+
+ <tbody>
+ % for status, timestamp in sorted(statuses, key=lambda status: status[1]):
+ <tr>
+ <td> ${status} </td> <td> ${pretty_time(timestamp)} </td>
+ </tr>
+ % endfor
+ </tbody>
+ </table>
+ </div>
+
+ <div class="span4" id="taskResources">
+ <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+ <thead>
+ <tr>
+ <th> cpu </th>
+ <th> ram </th>
+ <th> disk </th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td> ${'%.3f' % get(task, 'cpu')} </td>
+ <td> ${'%.1fMB' % (get(task, 'ram') / 1024. / 1024.)} </td>
+ <td> ${'%.1fGB' % (get(task, 'disk') / 1024. / 1024. / 1024.)} </td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ </div>
+
+ <div class="content" id="processesLayout">
+ <table class="table table-bordered table-condensed table-striped" style="empty-cells:show;">
+ <thead>
+ <tr>
+ <th colspan=3> process </th>
+ <th colspan=2> time </th>
+ <th colspan=2> used </th>
+ <th colspan=2> logs </th>
+ </tr>
+
+ <tr>
+ <th> name </th> <th> run </th> <th> status </th> <th> started </th> <th> finished </th>
+ <th> cpu </th> <th> ram </th>
+ <th> stdout </th> <th> stderr </th>
+ </tr>
+ </thead>
+
+ <tbody>
+ % for proc_name, proc in sorted(processes.items(), key=lambda item: item[1].get('start_time')):
+ <tr>
+ <td> <a href="/process/${task_id}/${proc["process_name"]}">${proc["process_name"]}</a> </td>
+ <td> ${proc["process_run"]} </td>
+ <td> ${proc["state"]} </td>
+ <td> ${pretty_time(proc["start_time"]) if "start_time" in proc else ""} </td>
+ <td> ${pretty_time(proc["stop_time"]) if "stop_time" in proc else ""} </td>
+ <td> ${'%.3f' % proc["used"]["cpu"] if "used" in proc else ""} </td>
+ <td> ${'%dMB' % (proc["used"]["ram"] / 1024 / 1024) if "used" in proc else ""} </td>
+ <td> <a href="/logs/${task_id}/${proc["process_name"]}/${proc["process_run"]}/stdout">stdout</a> </td>
+ <td> <a href="/logs/${task_id}/${proc["process_name"]}/${proc["process_run"]}/stderr">stderr</a> </td>
+ </tr>
+ % endfor
+ </tbody>
+ </table>
+ </div>
+</div>
+
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/http/templating.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/templating.py b/src/main/python/apache/thermos/observer/http/templating.py
new file mode 100644
index 0000000..783308b
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/http/templating.py
@@ -0,0 +1,9 @@
+import os
+import pkg_resources
+
+
+class HttpTemplate(object):
+  """Loads the raw source of the .tpl templates packaged next to this module."""
+  @staticmethod
+  def load(name):  # resource names are '/'-separated by definition, never os.path-joined
+    return pkg_resources.resource_string(__name__, 'templates/%s.tpl' % name)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/observed_task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/observed_task.py b/src/main/python/apache/thermos/observer/observed_task.py
new file mode 100644
index 0000000..f995998
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/observed_task.py
@@ -0,0 +1,136 @@
+from abc import abstractproperty
+import os
+
+from twitter.common import log
+from twitter.common.lang import AbstractClass
+from twitter.thermos.config.loader import ThermosTaskWrapper
+from twitter.thermos.config.schema import ThermosContext
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+
+from pystachio import Environment
+
+
+class ObservedTask(AbstractClass):
+  """Common base for a task tracked by the observer; subclasses supply `type` and `state`."""
+
+  @classmethod
+  def safe_mtime(cls, path):
+    try:
+      mtime = os.path.getmtime(path)
+    except OSError:
+      return None  # the path could not be stat'ed
+    return mtime
+
+  def __init__(self, task_id, pathspec):
+    self._pathspec = pathspec
+    self._task_id = task_id
+    self._mtime = self._get_mtime()  # needs _task_id and _pathspec, so it goes last
+
+  @abstractproperty
+  def type(self):
+    """The kind of task this is (active or finished)"""
+
+  def _read_task(self, memoized={}):
+    """Read this task's ThermosTask from disk; results are cached in the shared `memoized`."""
+    task_id = self._task_id
+    if task_id in memoized:
+      return memoized[task_id]
+    path = self._pathspec.given(task_id=task_id, state=self.type).getpath('task_path')
+    if not os.path.exists(path):
+      return None
+    wrapper = ThermosTaskWrapper.from_file(path)
+    if wrapper is None:
+      log.error('Error reading ThermosTask from %s in observer.' % path)
+      return None
+    context = self.context(task_id)
+    if not context:
+      log.warning('Task not yet available: %s' % task_id)
+    memoized[task_id] = wrapper.task() % Environment(thermos=context)
+    return memoized[task_id]
+
+  def _get_mtime(self):
+    """Return the mtime of the task path, trying 'active' before 'finished'; None if neither."""
+    for state in ('active', 'finished'):
+      path = self._pathspec.given(task_id=self._task_id, state=state).getpath('task_path')
+      mtime = self.safe_mtime(path)
+      if mtime is not None:
+        return mtime
+    log.error("Couldn't get mtime for task %s!" % self._task_id)
+    return None
+
+  def context(self, task_id):
+    """Build a ThermosContext from this task's own state header (task_id is not consulted)."""
+    header = self.state.header
+    if header is None:
+      return None
+    return ThermosContext(
+        ports=header.ports or {},
+        task_id=header.task_id,
+        user=header.user)
+
+  @property
+  def task(self):
+    """The ThermosTask for this task, as loaded by _read_task"""
+    return self._read_task()
+
+  @property
+  def task_id(self):
+    """The task_id this object was created with"""
+    return self._task_id
+
+  @property
+  def mtime(self):
+    """The mtime captured by _get_mtime at construction"""
+    return self._mtime
+
+  @abstractproperty
+  def state(self):
+    """The task's state (gen.twitter.thermos.ttypes.RunnerState)"""
+
+
+class ActiveObservedTask(ObservedTask):
+  """A task that is currently active, as tracked by the TaskObserver"""
+
+  def __init__(self, task_id, pathspec, task_monitor, resource_monitor):
+    super(ActiveObservedTask, self).__init__(task_id, pathspec)
+    self._resource_monitor = resource_monitor
+    self._task_monitor = task_monitor
+
+  @property
+  def type(self):
+    return 'active'
+
+  @property
+  def task_monitor(self):
+    """The TaskMonitor supplied at construction for this task"""
+    return self._task_monitor
+
+  @property
+  def resource_monitor(self):
+    """The resource monitor object supplied at construction for this task"""
+    return self._resource_monitor
+
+  @property
+  def state(self):
+    """The task's current RunnerState, fetched from the TaskMonitor on every access"""
+    return self.task_monitor.get_state()
+
+
+class FinishedObservedTask(ObservedTask):
+  """A task that has finished, as tracked by the TaskObserver"""
+  def __init__(self, task_id, pathspec):
+    super(FinishedObservedTask, self).__init__(task_id, pathspec)
+    self._state = None  # filled in lazily by the `state` property
+
+  @property
+  def type(self):
+    return 'finished'
+
+  @property
+  def state(self):
+    """The task's final RunnerState; read from its runner checkpoint, then cached"""
+    if self._state is not None:
+      return self._state
+    checkpoint = self._pathspec.given(task_id=self._task_id).getpath('runner_checkpoint')
+    self._state = CheckpointDispatcher.from_file(checkpoint)
+    return self._state
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/observer/task_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
new file mode 100644
index 0000000..b6ebcf7
--- /dev/null
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -0,0 +1,640 @@
+"""Observe Thermos tasks on a system
+
+This module provides a number of classes for exposing information about running (active) and
+finished Thermos tasks on a system. The primary entry point is the TaskObserver, a thread which
+polls a designated Thermos checkpoint root and collates information about all tasks it discovers.
+
+"""
+from operator import attrgetter
+import os
+import threading
+import time
+
+from twitter.common import log
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.lang import Lockable
+from twitter.common.quantity import Amount, Time
+
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.monitoring.detector import TaskDetector
+from twitter.thermos.monitoring.monitor import TaskMonitor
+from twitter.thermos.monitoring.process import ProcessSample
+from twitter.thermos.monitoring.resource import ResourceMonitorBase, TaskResourceMonitor
+
+from gen.twitter.thermos.ttypes import ProcessState, TaskState
+
+from .observed_task import ActiveObservedTask, FinishedObservedTask
+
+
+class TaskObserver(ExceptionalThread, Lockable):
+ """
+ The TaskObserver monitors the thermos checkpoint root for active/finished
+ tasks. It serves as the oracle for the state of all thermos tasks on
+ a machine.
+
+ It currently returns JSON, but really should just return objects. We should
+ then build an object->json translator.
+ """
+ class UnexpectedError(Exception): pass
+ class UnexpectedState(Exception): pass
+
+ POLLING_INTERVAL = Amount(1, Time.SECONDS)
+
+ def __init__(self, root, resource_monitor_class=TaskResourceMonitor):
+ self._pathspec = TaskPath(root=root)  # root is handed to both TaskPath and TaskDetector
+ self._detector = TaskDetector(root)
+ if not issubclass(resource_monitor_class, ResourceMonitorBase):
+ raise ValueError("resource monitor class must implement ResourceMonitorBase!")
+ self._resource_monitor = resource_monitor_class  # the class itself; instantiated per task
+ self._active_tasks = {} # task_id => ActiveObservedTask
+ self._finished_tasks = {} # task_id => FinishedObservedTask
+ self._stop_event = threading.Event()  # set by stop(); checked by run()
+ ExceptionalThread.__init__(self)
+ Lockable.__init__(self)
+ self.daemon = True
+
+ @property
+ def active_tasks(self):
+ """Return the observer's own dict of active Tasks (task_id => ActiveObservedTask), not a copy"""
+ return self._active_tasks
+
+ @property
+ def finished_tasks(self):
+ """Return the observer's own dict of finished Tasks (task_id => FinishedObservedTask), not a copy"""
+ return self._finished_tasks
+
+ @property
+ def all_tasks(self):
+ """Return a newly built dict of all Tasks known by the TaskObserver (active plus finished)"""
+ return dict(self.active_tasks.items() + self.finished_tasks.items())
+
+ def stop(self):  # Ask run() to finish; its loop tests this event once per pass.
+ self._stop_event.set()
+
+ def start(self):  # Thin override that only delegates to ExceptionalThread.start.
+ ExceptionalThread.start(self)
+
+  @Lockable.sync
+  def add_active_task(self, task_id):
+    """Begin observing an active task: build and start its monitors, then register it."""
+    if task_id in self.finished_tasks:
+      log.error('Found an active task (%s) in finished tasks?' % task_id)
+      return
+    monitor = TaskMonitor(self._pathspec, task_id)
+    if not monitor.get_state().header:
+      log.info('Unable to load task "%s"' % task_id)
+      return
+    # The state is fetched a second time here to take the sandbox from its header.
+    resources = self._resource_monitor(monitor, monitor.get_state().header.sandbox)
+    resources.start()
+    observed = ActiveObservedTask(task_id=task_id, pathspec=self._pathspec,
+                                  task_monitor=monitor, resource_monitor=resources)
+    self._active_tasks[task_id] = observed
+
+  @Lockable.sync
+  def add_finished_task(self, task_id):
+    """Register task_id as finished, replacing any finished entry already stored for it."""
+    finished = FinishedObservedTask(task_id=task_id, pathspec=self._pathspec)
+    self._finished_tasks[task_id] = finished
+
+ @Lockable.sync
+ def active_to_finished(self, task_id):  # Move a task from the active to the finished dict.
+ self.remove_active_task(task_id)  # this also kills the task's resource monitor
+ self.add_finished_task(task_id)
+
+ @Lockable.sync
+ def remove_active_task(self, task_id):  # Stop observing an active task.
+ task = self.active_tasks.pop(task_id)  # pop without default: KeyError if not active
+ task.resource_monitor.kill()
+
+ @Lockable.sync
+ def remove_finished_task(self, task_id):  # Forget a finished task.
+ self.finished_tasks.pop(task_id)  # pop without default: KeyError if not present
+
+ def run(self):
+ """
+ The internal thread for the observer. Until the stop event is set, each pass
+ sleeps for POLLING_INTERVAL, then polls the checkpoint root for new tasks, tasks
+ moving from active to finished, and tasks that are no longer detected.
+ """
+ while not self._stop_event.is_set():
+ time.sleep(self.POLLING_INTERVAL.as_(Time.SECONDS))
+ # Both detector queries are issued before the lock below is taken.
+ active_tasks = [task_id for _, task_id in self._detector.get_task_ids(state='active')]
+ finished_tasks = [task_id for _, task_id in self._detector.get_task_ids(state='finished')]
+
+ with self.lock:
+ # All bookkeeping on the active/finished dicts happens under the lock.
+ # Ensure all tasks currently detected on the system are observed appropriately
+ for active in active_tasks:
+ if active not in self.active_tasks:
+ log.debug('task_id %s (unknown) -> active' % active)
+ self.add_active_task(active)
+ for finished in finished_tasks:
+ if finished in self.active_tasks:
+ log.debug('task_id %s active -> finished' % finished)
+ self.active_to_finished(finished)
+ elif finished not in self.finished_tasks:
+ log.debug('task_id %s (unknown) -> finished' % finished)
+ self.add_finished_task(finished)
+ # The set differences below are computed up front, so removals do not disturb iteration.
+ # Remove ObservedTasks for tasks no longer detected on the system
+ for unknown in set(self.active_tasks) - set(active_tasks + finished_tasks):
+ log.debug('task_id %s active -> (unknown)' % unknown)
+ self.remove_active_task(unknown)
+ for unknown in set(self.finished_tasks) - set(active_tasks + finished_tasks):
+ log.debug('task_id %s finished -> (unknown)' % unknown)
+ self.remove_finished_task(unknown)
+
+  @Lockable.sync
+  def process_from_name(self, task_id, process_id):
+    """Return the process of task task_id whose name is process_id, or None if not found."""
+    known = self.all_tasks
+    task = known[task_id].task if task_id in known else None
+    for process in (task.processes() if task else ()):
+      if process.name().get() == process_id:
+        return process
+
+  @Lockable.sync
+  def task_count(self):
+    """
+      Return the count of tasks that could be read properly from disk, keyed by
+      'active', 'finished' and 'all'.  This may be <= self.task_id_count()
+    """
+    return {
+      'active': len(self.active_tasks),
+      'finished': len(self.finished_tasks),
+      'all': len(self.all_tasks),
+    }
+
+  @Lockable.sync
+  def task_id_count(self):
+    """Return the raw number of active, finished and all task_ids from the TaskDetector."""
+    def count(state):
+      # get_task_ids is run through list() purely so that its length can be taken
+      return len(list(self._detector.get_task_ids(state=state)))
+    num_active, num_finished = count('active'), count('finished')
+    return {'active': num_active, 'finished': num_finished, 'all': num_active + num_finished}
+
+  def _get_tasks_of_type(self, type):
+    """
+      Look up the task dictionary for a type name ('active', 'finished' or 'all').
+      Unknown names are logged as an error and yield an empty dict.
+    """
+    by_type = dict(
+      active=self.active_tasks,
+      finished=self.finished_tasks,
+      all=self.all_tasks)
+    if type not in by_type:
+      log.error('Unknown task type %s' % type)
+      return {}
+    return by_type[type]
+
+  @Lockable.sync
+  def state(self, task_id):
+    """Return a dict of header fields for a task, or {} if it has no state or header."""
+    runner_state = self.raw_state(task_id)
+    header = runner_state.header if runner_state is not None else None
+    if header is None:
+      return {}
+    return {
+      'task_id': header.task_id,
+      'launch_time': header.launch_time_ms / 1000.0,
+      'sandbox': header.sandbox,
+      'hostname': header.hostname,
+      'user': header.user,
+    }
+
+  @Lockable.sync
+  def raw_state(self, task_id):
+    """
+      Return the current runner state (thrift blob: gen.twitter.thermos.ttypes.RunnerState)
+      of a given task id, taken from its ObservedTask, or None if the observer does not
+      know the task.
+    """
+    known_tasks = self.all_tasks  # one merged dict serves both the test and the lookup
+    return known_tasks[task_id].state if task_id in known_tasks else None
+
+  @Lockable.sync
+  def _task_processes(self, task_id):
+    """
+      Return the processes of a task given its task_id.
+
+      Returns a map from state to the names of processes whose latest run is in that state;
+      the keys are: waiting, running, success, failed, killed.  Unknown tasks yield {}.
+    """
+    if task_id not in self.all_tasks:
+      return {}
+    state = self.raw_state(task_id)
+    if state is None or state.header is None:
+      return {}
+
+    waiting, running, success, failed, killed = [], [], [], [], []
+    for process, runs in state.processes.items():
+      # No runs ==> nothing started.
+      if len(runs) == 0:
+        waiting.append(process)
+      else:
+        if runs[-1].state in (None, ProcessState.WAITING, ProcessState.LOST):
+          waiting.append(process)
+        elif runs[-1].state in (ProcessState.FORKED, ProcessState.RUNNING):
+          running.append(process)
+        elif runs[-1].state == ProcessState.SUCCESS:
+          success.append(process)
+        elif runs[-1].state == ProcessState.FAILED:
+          failed.append(process)
+        elif runs[-1].state == ProcessState.KILLED:
+          killed.append(process)
+        else:
+          # TODO(wickman) Consider log.error instead of raising.
+          # `runs` is a list, so the offending state has to be read from its last entry.
+          raise self.UnexpectedState("Unexpected ProcessHistoryState: %s" % runs[-1].state)
+
+    return dict(waiting=waiting, running=running, success=success, failed=failed, killed=killed)
+
+  @Lockable.sync
+  def main(self, type=None, offset=None, num=None):
+    """Return a set of information about tasks, optionally filtered
+
+      Args:
+        type = (all|active|finished|None) [default: all]
+        offset = offset into the list of task_ids [default: 0]
+        num = number of results to return [default: 20]
+
+      Tasks are ordered by the mtime of their ObservedTask, newest first.  A negative
+      offset counts back from the end of that list (or becomes 0 if it reaches past the
+      start).  An unrecognised type yields no tasks and a task_count of 0.
+
+      Returns:
+        {
+          tasks: [one row dict per readable task, see task_row below],
+          type: query type,
+          offset: offset actually used,
+          num: num actually used,
+          task_count: number of observed tasks of this type
+        }
+    """
+    type = type or 'all'
+    offset = offset or 0
+    num = num or 20
+
+    # Get a list of all ObservedTasks of requested type
+    tasks = sorted((task for task in self._get_tasks_of_type(type).values()),
+                   key=attrgetter('mtime'), reverse=True)
+
+    # Filter by requested offset + number of results
+    end = num
+    if offset < 0:
+      offset = offset % len(tasks) if len(tasks) > abs(offset) else 0
+    end += offset
+    tasks = tasks[offset:end]
+
+    def task_row(observed_task):
+      """Generate an output row for a Task"""
+      task = self._task(observed_task.task_id)
+      # tasks include those which could not be found properly and are hence empty {}
+      if task:
+        return dict(
+          task_id=observed_task.task_id,
+          name=task['name'],
+          role=task['user'],
+          launch_timestamp=task['launch_timestamp'],
+          state=task['state'],
+          state_timestamp=task['state_timestamp'],
+          ports=task['ports'],
+          **task['resource_consumption'])
+
+    return dict(
+      tasks=filter(None, map(task_row, tasks)),
+      type=type,
+      offset=offset,
+      num=num,
+      task_count=self.task_count().get(type, 0),  # unknown types count as 0, not KeyError
+    )
+
+ def _sample(self, task_id):  # Return a resource-usage dict: a process sample plus 'disk'.
+ if task_id not in self.active_tasks:
+ log.debug("Task %s not found in active tasks" % task_id)
+ sample = ProcessSample.empty().to_dict()  # tasks that are not active get an empty sample
+ sample['disk'] = 0
+ else:
+ resource_sample = self.active_tasks[task_id].resource_monitor.sample()[1]  # item 1 of sample()
+ sample = resource_sample.process_sample.to_dict()
+ sample['disk'] = resource_sample.disk_usage
+ log.debug("Got sample for task %s: %s" % (task_id, sample))
+ return sample
+
+  @Lockable.sync
+  def task_statuses(self, task_id):
+    """
+      Return the task's status history, in the order stored in its runner state, as
+
+        [(task_state [string], timestamp), ...]
+
+      where an unrecognised state value is reported as 'UNKNOWN' and the timestamp is
+      timestamp_ms / 1000.  Unknown tasks, and tasks without a header, yield [].
+    """
+    # Unknown task_id (or nothing stored for it).
+    if self.all_tasks.get(task_id) is None:
+      return []
+
+    runner_state = self.raw_state(task_id)
+    if runner_state is None or runner_state.header is None:
+      return []
+
+    history = []
+    for status in runner_state.statuses:
+      name = TaskState._VALUES_TO_NAMES.get(status.state, 'UNKNOWN')
+      history.append((name, status.timestamp_ms / 1000))
+
+    return history
+
+  @Lockable.sync
+  def _task(self, task_id):
+    """
+      Return composite information about a particular task task_id, given the below
+      schema, or {} if the task is unknown, cannot be read, or has no header or status yet.
+
+      {
+         task_id: string,
+         name: string,
+         user: string,
+         launch_timestamp: seconds,
+         state: string [ACTIVE, SUCCESS, FAILED]
+         state_timestamp: seconds, when the current state was entered
+         ports: { name1: 'url', name2: 'url2' }
+         resource_consumption: { cpu:, ram:, disk: }
+         processes: { waiting: [], running: [], success: [], failed: [], killed: [] }
+         task_struct: the task object read from disk
+      }
+    """
+    # Unknown task_id.
+    if task_id not in self.all_tasks:
+      return {}
+
+    task = self.all_tasks[task_id].task
+    if task is None:
+      # TODO(wickman) Can this happen?
+      log.error('Could not find task: %s' % task_id)
+      return {}
+
+    state = self.raw_state(task_id)
+    if state is None or state.header is None:
+      # TODO(wickman) Can this happen?
+      return {}
+    if not state.statuses:
+      # No status recorded yet, so there is no launch time or current state to report.
+      return {}
+
+    # Get the timestamp of the transition into the current state.
+    current_state = state.statuses[-1].state
+    last_state = None  # state of the previous status; None before the first one
+    state_timestamp = 0
+    for status in state.statuses:
+      if status.state == current_state and last_state != current_state:
+        state_timestamp = status.timestamp_ms / 1000
+      last_state = status.state
+
+    return dict(
+       task_id=task_id,
+       name=task.name().get(),
+       launch_timestamp=state.statuses[0].timestamp_ms / 1000,
+       state=TaskState._VALUES_TO_NAMES[state.statuses[-1].state],
+       state_timestamp=state_timestamp,
+       user=state.header.user,
+       resource_consumption=self._sample(task_id),
+       ports=state.header.ports,
+       processes=self._task_processes(task_id),
+       task_struct=task,
+    )
+
+ @Lockable.sync
+ def _get_process_resource_consumption(self, task_id, process_name):
+ if task_id not in self.active_tasks:  # only entries of active_tasks are asked for samples
+ log.debug("Task %s not found in active tasks" % task_id)
+ return ProcessSample.empty().to_dict()  # an empty sample, converted to a dict
+ sample = self.active_tasks[task_id].resource_monitor.sample_by_process(process_name).to_dict()
+ log.debug('Resource consumption (%s, %s) => %s' % (task_id, process_name, sample))
+ return sample
+
+  @Lockable.sync
+  def _get_process_tuple(self, history, run):
+    """
+      Return the basic description of a process run if it exists, otherwise
+      an empty dictionary.  `run` may be negative to count back from the latest run.
+
+      {
+        process_name: string
+        process_run: int
+        state: string [WAITING, FORKED, RUNNING, SUCCESS, KILLED, FAILED, LOST]
+        (optional) start_time: seconds from epoch
+        (optional) stop_time: seconds from epoch
+      }
+    """
+    num_runs = len(history)
+    # Covers an empty history as well as indices past either end of it.
+    if not -num_runs <= run < num_runs:
+      return {}
+
+    process_run = history[run]
+    run = run % num_runs  # report a negative index as its non-negative equivalent
+    d = dict(
+      process_name=process_run.process,
+      process_run=run,
+      state=ProcessState._VALUES_TO_NAMES[process_run.state],
+    )
+    if process_run.start_time:
+      d.update(start_time=process_run.start_time)
+    if process_run.stop_time:
+      d.update(stop_time=process_run.stop_time)
+    return d
+
+  @Lockable.sync
+  def process(self, task_id, process, run=None):
+    """
+      Describe one run of a process of a task, using the schema:
+
+      {
+        process_name: string
+        process_run: int
+        used: { cpu: float, ram: int bytes, disk: int bytes }
+        start_time: (optional; copied from the run record)
+        stop_time: (optional; copied from the run record)
+        state: string [WAITING, FORKED, RUNNING, SUCCESS, KILLED, FAILED, LOST]
+      }
+
+      `used` is only filled in while the run's state is RUNNING.  An unknown task or
+      process, or a run that does not exist, gives {}.
+
+      If run is None, return the latest run.
+    """
+    state = self.raw_state(task_id)
+    if state is None or state.header is None or process not in state.processes:
+      return {}
+    run_index = -1 if run is None else int(run)
+    description = self._get_process_tuple(state.processes[process], run_index)
+    if not description:
+      return {}
+    if description.get('state') == 'RUNNING':
+      description.update(used=self._get_process_resource_consumption(task_id, process))
+    return description
+
+  @Lockable.sync
+  def _processes(self, task_id):
+    """
+      Return the latest run of every process of a task, keyed by process name:
+
+      {
+        process1: { ... }
+        process2: { ... }
+        ...
+        processN: { ... }
+      }
+
+      Each value follows the schema defined by process().  Tasks that are unknown
+      or have no header give {}.
+    """
+
+    if task_id not in self.all_tasks:
+      return {}
+    runner_state = self.raw_state(task_id)
+    if runner_state is None or runner_state.header is None:
+      return {}
+
+    # _task_processes groups process names by state; flatten the groups into one dict.
+    latest_runs = {}
+    for names in self._task_processes(task_id).values():
+      latest_runs.update((name, self.process(task_id, name)) for name in names)
+    return latest_runs
+
+  @Lockable.sync
+  def processes(self, task_ids):
+    """
+      Map each task_id in task_ids (a list or tuple) to its processes, using the schema
+      of _processes.  Any other kind of argument gives {}.
+    """
+    if isinstance(task_ids, (list, tuple)):
+      return dict((task_id, self._processes(task_id)) for task_id in task_ids)
+    return {}
+
+  @Lockable.sync
+  def get_run_number(self, runner_state, process, run=None):  # -> index >= 0, or None
+    if runner_state is not None and runner_state.processes is not None:
+      num_runs = len(runner_state.processes.get(process, ()))  # unknown process => 0 runs
+      run = -1 if run is None else run  # default to the latest run
+      if -num_runs <= run < num_runs:  # reject indices beyond either end of the history
+        return run % num_runs
+
+  @Lockable.sync
+  def logs(self, task_id, process, run=None):
+    """
+      Locate the log files of one run of a process.  Returns
+      {
+        stderr: [dir, filename]
+        stdout: [dir, filename]
+      }
+      or {} when the task has no header or the run cannot be resolved.
+
+      If the run number is unspecified, uses the latest run.
+
+      TODO(wickman) Just return the filenames directly?
+    """
+    runner_state = self.raw_state(task_id)
+    header = runner_state.header if runner_state is not None else None
+    if header is None:
+      return {}
+    run_number = self.get_run_number(runner_state, process, run)
+    if run_number is None:
+      return {}
+    spec = self._pathspec.given(
+        task_id=task_id, process=process, run=run_number, log_dir=header.log_dir)
+    log_dir = spec.getpath('process_logdir')
+    return {'stdout': [log_dir, 'stdout'], 'stderr': [log_dir, 'stderr']}
+
+  @staticmethod
+  def _sanitize_path(base_path, relpath):
+    """
+      Normalize relpath against base_path (resolving symlinks) and return the pair
+      (real base, path relative to it), or (None, None) if it falls outside base_path.
+    """
+    relpath = "." if relpath is None else relpath
+    normalized_base = os.path.realpath(base_path)
+    normalized = os.path.realpath(os.path.join(base_path, relpath))
+    # A bare prefix test would also accept siblings such as "<base>-x"; require a separator.
+    if normalized == normalized_base or normalized.startswith(os.path.join(normalized_base, '')):
+      return (normalized_base, os.path.relpath(normalized, normalized_base))
+    return (None, None)
+
+  @Lockable.sync
+  def valid_file(self, task_id, path):
+    """
+      Like valid_path, but additionally require os.path.isfile to hold for the result.
+    """
+    chroot, relpath = self.valid_path(task_id, path)
+    if not (chroot and relpath) or not os.path.isfile(os.path.join(chroot, relpath)):
+      return None, None
+    return chroot, relpath
+
+  @Lockable.sync
+  def valid_path(self, task_id, path):
+    """
+      Given a task_id and a path, resolve the path against that task_id's sandbox and
+      verify it's actually in the sandbox and not outside.  Existence of the path is
+      not checked here (valid_file adds that for files).
+      Returns chroot and the pathname relative to that chroot, or (None, None) if the
+      task has no header or sandbox, or the path points outside the sandbox.
+    """
+    runner_state = self.raw_state(task_id)
+    if runner_state is None or runner_state.header is None:
+      return None, None
+    chroot = getattr(runner_state.header, 'sandbox', None)
+    if chroot is None:  # no sandbox recorded (e.g. chroot-less job)
+      return None, None
+    chroot, path = self._sanitize_path(chroot, path)
+    if chroot and path:
+      return chroot, path
+    return None, None
+
+  @Lockable.sync
+  def files(self, task_id, path=None):
+    """
+      List one directory of a task's sandbox.  Returns the dictionary
+      {
+        task_id: task_id
+        chroot: absolute directory on machine
+        path: sanitized relative path w.r.t. chroot
+        dirs: list of directories
+        files: list of files
+      }
+      in which everything but task_id is None when the listing cannot be produced.
+    """
+    # TODO(jon): DEPRECATED: most of the necessary logic is handled directly in the templates.
+    # Also, global s/chroot/sandbox/?
+    empty = dict(task_id=task_id, chroot=None, path=None, dirs=None, files=None)
+    path = '.' if path is None else path
+    runner_state = self.raw_state(task_id)
+    if runner_state is None:
+      return empty
+    try:
+      chroot = runner_state.header.sandbox
+    except AttributeError:
+      return empty
+    if chroot is None:  # chroot-less job
+      return empty
+    chroot, path = self._sanitize_path(chroot, path)
+    if chroot is None or path is None:
+      return empty
+    listing_dir = os.path.join(chroot, path)
+    if not os.path.isdir(listing_dir):
+      return empty
+    dirs, files = [], []
+    for name in os.listdir(listing_dir):
+      bucket = dirs if os.path.isdir(os.path.join(listing_dir, name)) else files
+      bucket.append(name)
+    return dict(
+      task_id=task_id,
+      chroot=chroot,
+      path=path,
+      dirs=dirs,
+      files=files
+    )
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/testing/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/BUILD b/src/main/python/apache/thermos/testing/BUILD
new file mode 100644
index 0000000..b5485d0
--- /dev/null
+++ b/src/main/python/apache/thermos/testing/BUILD
@@ -0,0 +1,14 @@
+python_library(  # pants library target wrapping runner.py, the Thermos test-runner harness
+ name = 'runner',
+ sources = ['runner.py'],
+ dependencies = [  # NOTE(review): specs below still name twitter/ paths although this BUILD lives under apache/ -- presumably updated by a follow-up refactor commit; confirm
+ pants('aurora/twitterdeps/src/python/twitter/common/contextutil'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('src/main/python/twitter/thermos/common:ckpt'),
+ pants('src/main/python/twitter/thermos/common:path'),
+ pants('src/main/python/twitter/thermos/config'),
+ pants('src/main/python/twitter/thermos/core'),
+ pants('src/main/python/twitter/thermos:thrift'),
+ pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+ ]
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/testing/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/__init__.py b/src/main/python/apache/thermos/testing/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/testing/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/runner.py b/src/main/python/apache/thermos/testing/runner.py
new file mode 100644
index 0000000..930f41b
--- /dev/null
+++ b/src/main/python/apache/thermos/testing/runner.py
@@ -0,0 +1,191 @@
+from __future__ import print_function
+
+import atexit
+import errno
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+
+from twitter.common import log
+from twitter.common.contextutil import temporary_file, environment_as
+from twitter.thermos.common.path import TaskPath
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.config.loader import ThermosTaskWrapper
+from thrift.TSerialization import deserialize as thrift_deserialize
+
+from gen.twitter.thermos.ttypes import (
+ TaskState,
+ RunnerCkpt,
+ RunnerState,
+)
+
+
+class Runner(object):
+ RUN_JOB_SCRIPT = """
+import os
+import random
+import sys
+from twitter.common import log
+from twitter.common.log.options import LogOptions
+from twitter.thermos.config.loader import ThermosConfigLoader
+from twitter.thermos.core.helper import TaskRunnerHelper
+from twitter.thermos.core.runner import TaskRunner, TaskRunnerUniversalHandler
+from thrift.TSerialization import serialize as thrift_serialize
+
+random.seed(%(random_seed)d)
+
+log.init('runner_base')
+LogOptions.set_disk_log_level('DEBUG')
+
+task = ThermosConfigLoader.load_json('%(filename)s')
+task = task.tasks()[0].task
+
+success_rate=%(success_rate)d
+
+class AngryHandler(TaskRunnerUniversalHandler):
+ def checkpoint(self, record):
+ if not self._runner._recovery:
+ if random.randint(0, 100) <= success_rate:
+ super(AngryHandler, self).checkpoint(record)
+ else:
+ sys.exit(1)
+
+sandbox = os.path.join('%(sandbox)s', '%(task_id)s')
+args = {}
+args['task_id'] = '%(task_id)s'
+if %(portmap)s:
+ args['portmap'] = %(portmap)s
+args['universal_handler'] = AngryHandler
+
+runner = TaskRunner(task, '%(root)s', sandbox, **args)
+runner.run()
+
+with open('%(state_filename)s', 'w') as fp:
+ fp.write(thrift_serialize(runner.state))
+"""
+
+ def __init__(self, task, portmap={}, success_rate=100, random_seed=31337):
+ """
+ task = Thermos task
+ portmap = port map
+ success_rate = success rate of writing checkpoint to disk
+ """
+ self.task = task
+
+ with temporary_file(cleanup=False) as fp:
+ self.job_filename = fp.name
+ fp.write(ThermosTaskWrapper(task).to_json())
+
+ self.state_filename = tempfile.mktemp()
+ self.tempdir = tempfile.mkdtemp()
+ self.task_id = '%s-runner-base' % int(time.time()*1000000)
+ self.sandbox = os.path.join(self.tempdir, 'sandbox')
+ self.portmap = portmap
+ self.cleaned = False
+ self.pathspec = TaskPath(root = self.tempdir, task_id = self.task_id)
+ self.script_filename = None
+ self.success_rate = success_rate
+ self.random_seed = random_seed
+ self._run_count = 0
+
+ @property
+ def pid(self):
+ return self.po.pid
+
+ @property
+ def root(self):
+ return self.tempdir
+
+ def run(self):
+ self._run_count += 1
+ atexit.register(self.cleanup)
+
+ if self.script_filename:
+ os.unlink(self.script_filename)
+
+ with temporary_file(cleanup=False) as fp:
+ self.script_filename = fp.name
+ fp.write(self.RUN_JOB_SCRIPT % {
+ 'filename': self.job_filename,
+ 'sandbox': self.sandbox,
+ 'root': self.tempdir,
+ 'task_id': self.task_id,
+ 'state_filename': self.state_filename,
+ 'portmap': repr(self.portmap),
+ 'success_rate': self.success_rate,
+ 'random_seed': self.random_seed + self._run_count,
+ })
+
+ with environment_as(PYTHONPATH=os.pathsep.join(sys.path)):
+ self.po = subprocess.Popen([sys.executable, self.script_filename],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ try:
+ so, se = self.po.communicate()
+ except OSError as e:
+ if e.errno == errno.ECHILD:
+ so = se = 'Killed'
+ else:
+ raise
+
+ rc = self.po.returncode
+ if rc != 0:
+ if os.path.exists(self.job_filename):
+ config = open(self.job_filename).read()
+ else:
+ config = 'Nonexistent!'
+ if 'THERMOS_DEBUG' in os.environ:
+ print("Runner failed!\n\n\nconfig:%s\n\n\nstdout:%s\n\n\nstderr:%s\n\n\n" % (
+ config, so, se))
+
+ try:
+ with open(self.state_filename, 'r') as fp:
+ self.state = thrift_deserialize(RunnerState(), fp.read())
+ except Exception as e:
+ if 'THERMOS_DEBUG' in os.environ:
+ print('Failed to load Runner state: %s' % e, file=sys.stderr)
+ self.state = RunnerState()
+
+ try:
+ self.reconstructed_state = CheckpointDispatcher.from_file(
+ self.pathspec.getpath('runner_checkpoint'))
+ except:
+ self.reconstructed_state = None
+ self.initialized = True
+ return rc
+
+ def cleanup(self):
+ if not self.cleaned:
+ if hasattr(self, 'po'):
+ try:
+ self.po.kill()
+ except:
+ pass
+ os.unlink(self.job_filename)
+ os.unlink(self.script_filename)
+ if 'THERMOS_DEBUG' not in os.environ:
+ shutil.rmtree(self.tempdir, ignore_errors=True)
+ else:
+ print('Logs saved in %s' % self.tempdir)
+ self.cleaned = True
+
+
+class RunnerTestBase(object):
+ @classmethod
+ def task(cls):
+ raise NotImplementedError
+
+ @classmethod
+ def setup_class(cls):
+ cls.runner = Runner(cls.task(), portmap=getattr(cls, 'portmap', {}))
+ cls.runner.run()
+ cls.state = cls.runner.state
+
+ @classmethod
+ def teardown_class(cls):
+ cls.runner.cleanup()
+
+ def test_runner_state_reconstruction(self):
+ assert self.state == self.runner.reconstructed_state
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/__init__.py b/src/main/python/twitter/__init__.py
deleted file mode 100644
index de40ea7..0000000
--- a/src/main/python/twitter/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/BUILD b/src/main/python/twitter/aurora/BUILD
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/BUILD.thirdparty
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/BUILD.thirdparty b/src/main/python/twitter/aurora/BUILD.thirdparty
deleted file mode 100644
index 7a43aca..0000000
--- a/src/main/python/twitter/aurora/BUILD.thirdparty
+++ /dev/null
@@ -1,19 +0,0 @@
-def make_dep(name, version, dependency_name=None):  # declares one exactly-pinned python_requirement target
- """Build a target from a specified dependency tuple.
-
- name is the target name, specified in other BUILD files.
- version is a hardcoded version string
- dependency_name is used to identify the specific binary to resolve
- """
- dependency_name = dependency_name or name  # fall back to the target name when no distinct name is given
- versioned_name = "%s==%s" % (dependency_name, version)  # exact-version requirement string, e.g. "mock==1.0.1"
- python_requirement(requirement=versioned_name, name=name)
-
-make_dep('argparse', '1.2.1')
-make_dep('mesos-core', '0.15.0-rc4', 'mesos')  # target 'mesos-core' is resolved as 'mesos==0.15.0-rc4'
-make_dep('mock', '1.0.1')
-make_dep('mox', '0.5.3')
-make_dep('psutil', '1.1.2')
-make_dep('pystachio', '0.7.2')
-make_dep('pyyaml', '3.10', 'PyYAML')  # target 'pyyaml' is resolved as 'PyYAML==3.10'
-make_dep('thrift', '0.9.1')
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/__init__.py b/src/main/python/twitter/aurora/__init__.py
deleted file mode 100644
index b0d6433..0000000
--- a/src/main/python/twitter/aurora/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/admin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/admin/BUILD b/src/main/python/twitter/aurora/admin/BUILD
deleted file mode 100644
index c8089b4..0000000
--- a/src/main/python/twitter/aurora/admin/BUILD
+++ /dev/null
@@ -1,11 +0,0 @@
-python_library(  # pants library target wrapping mesos_maintenance.py
- name = 'mesos_maintenance',
- sources = 'mesos_maintenance.py',  # NOTE(review): bare string here, while thermos/testing/BUILD passes a list -- confirm pants accepts both
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- pants('src/main/python/twitter/aurora/client:api'),
- pants('src/main/python/twitter/aurora/client:base'),
- pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
- ]
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/admin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/admin/__init__.py b/src/main/python/twitter/aurora/admin/__init__.py
deleted file mode 100644
index e69de29..0000000