You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:15 UTC
[22/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/common/path.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/path.py b/src/main/python/apache/thermos/common/path.py
new file mode 100644
index 0000000..28e07ac
--- /dev/null
+++ b/src/main/python/apache/thermos/common/path.py
@@ -0,0 +1,101 @@
+import os
+
+
class TaskPath(object):
  r"""
  Handle the resolution / detection of the path structure for thermos tasks.

  This is used by the runner to determine where it should be dumping checkpoints and writing
  stderr/stdout, and by the observer to determine how to detect the running tasks on the system.

  Examples:
    pathspec = TaskPath(root="/var/run/thermos")
                        ^ substitution dictionary for DIR_TEMPLATE

                                                                  which template to acquire
                                                                              v
    pathspec.given(task_id="12345-thermos-wickman-23", state='active').getpath("task_path")
                   ^ further substitutions for DIR_TEMPLATE

  As a detection mechanism (task ids are strings, e.g. "12345-thermos-wickman-23"):
    path_glob = pathspec.given(task_id="*", state='active').getpath("task_path")
    matching_paths = glob.glob(path_glob)

    path_re = pathspec.given(task_id=r"(\S+)", state='active').getpath("task_path")
    path_re = re.compile(path_re)

    ids = []
    for path in matching_paths:
      matched_blobs = path_re.match(path).groups()
      ids.append(matched_blobs[0])
    return ids

  Keys that have not been supplied interpolate to their own '%(key)s' placeholder, so a TaskPath
  can be refined incrementally with given(); getpath() raises UnderspecifiedPath if the requested
  template still references a key that was never supplied.
  """

  class UnknownPath(Exception): pass
  class UnderspecifiedPath(Exception): pass

  DEFAULT_CHECKPOINT_ROOT = "/var/run/thermos"
  KNOWN_KEYS = ['root', 'task_id', 'state', 'process', 'run', 'log_dir']
  # Legacy layouts (no explicit log_dir) know every key except the trailing 'log_dir'.
  LEGACY_KNOWN_KEYS = KNOWN_KEYS[:-1]

  DIR_TEMPLATE = {
    'task_path': ['%(root)s', 'tasks', '%(state)s', '%(task_id)s'],
    'checkpoint_path': ['%(root)s', 'checkpoints', '%(task_id)s'],
    'runner_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'runner'],
    'process_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'coordinator.%(process)s'],
    'process_logbase': ['%(log_dir)s'],
    'process_logdir': ['%(log_dir)s', '%(process)s', '%(run)s'],
  }

  # Before log_dir was added explicitly to RunnerHeader, logs lived under %(root)s/logs/<task_id>.
  LEGACY_DIR_TEMPLATE = DIR_TEMPLATE.copy()
  LEGACY_DIR_TEMPLATE.update(
    process_logbase=['%(root)s', 'logs', '%(task_id)s'],
    process_logdir=['%(root)s', 'logs', '%(task_id)s', '%(process)s', '%(run)s'],
  )

  @staticmethod
  def _placeholder(key):
    """Return the self-interpolating placeholder '%(key)s' used for a key not yet supplied."""
    return '%%(%s)s' % key

  def __init__(self, **kw):
    """
    :param kw: substitution values for the path templates. 'root' defaults to
      DEFAULT_CHECKPOINT_ROOT; a truthy 'log_dir' selects DIR_TEMPLATE, otherwise the legacy
      templates (logs under root) are used.
    """
    self._filename = None
    if kw.get('root') is None:
      kw['root'] = self.DEFAULT_CHECKPOINT_ROOT
    if kw.get('log_dir'):
      self._template, keys = self.DIR_TEMPLATE, self.KNOWN_KEYS
    else:
      self._template, keys = self.LEGACY_DIR_TEMPLATE, self.LEGACY_KNOWN_KEYS
    # Unsupplied keys map to their own placeholder so they survive interpolation verbatim.
    self._data = dict((key, self._placeholder(key)) for key in keys)
    self._data.update(kw)

  def given(self, **kw):
    """Return a new TaskPath with the current substitutions extended/overridden by :kw."""
    eval_dict = dict(self._data)  # copy
    eval_dict.update(kw)
    tp = TaskPath(**eval_dict)
    tp._filename = self._filename
    return tp

  def with_filename(self, filename):
    """Return a TaskPath with the specific filename appended to the end of the path."""
    wp = TaskPath(**self._data)
    wp._filename = filename
    return wp

  def getpath(self, pathname):
    """
    Interpolate and return the path for template :pathname.

    :raises UnknownPath: if :pathname is not a known template.
    :raises UnderspecifiedPath: if the template references a key that was never supplied.
    """
    if pathname not in self._template:
      raise self.UnknownPath("Internal error, unknown id: %s" % pathname)
    components = list(self._template[pathname])
    if self._filename:
      components.append(self._filename)
    interpolated_path = os.path.join(*components) % self._data

    # Detect unresolved keys directly instead of re-interpolating the result: a second '%' pass
    # would crash on literal '%' characters in supplied values (e.g. a process named "50%").
    unresolved = [key for key, value in self._data.items()
                  if value == self._placeholder(key) and value in interpolated_path]
    if unresolved:
      raise TaskPath.UnderspecifiedPath(
          "Tried to interpolate path with insufficient variables: %s as %s" % (
              pathname, interpolated_path))
    return interpolated_path
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/common/planner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/planner.py b/src/main/python/apache/thermos/common/planner.py
new file mode 100644
index 0000000..a886dad
--- /dev/null
+++ b/src/main/python/apache/thermos/common/planner.py
@@ -0,0 +1,304 @@
+"""Planners to schedule processes within Tasks.
+
+TaskPlanner:
+ - a daemon process can depend upon a regular process
+ - a regular process cannot depend upon a daemon process
+ - a non-ephemeral process cannot depend upon an ephemeral process
+"""
+
+from collections import defaultdict, namedtuple
+import copy
+from functools import partial
+import sys
+import time
+
class Planner(object):
  """
  Track which processes of a fixed set may run next, given a dependency graph between them and
  the completion state (running / finished / failed) of each process.
  """
  class InvalidSchedule(Exception): pass

  @staticmethod
  def filter_runnable(processes, dependencies):
    """Return the set of :processes that have no outstanding prerequisites in :dependencies."""
    ready = set()
    for candidate in processes:
      if not dependencies.get(candidate):
        ready.add(candidate)
    return ready

  @staticmethod
  def filter_dependencies(dependencies, given=frozenset()):
    """
    Return a deep copy of :dependencies (process => set of prerequisite processes) with every
    process in :given removed from each prerequisite set. The input mapping is left untouched.
    """
    pruned = copy.deepcopy(dependencies)
    for prerequisites in pruned.values():
      prerequisites -= given
    return pruned

  @staticmethod
  def satisfiable(processes, dependencies):
    """
    Return True if every process in :processes can eventually be scheduled under :dependencies,
    i.e. the dependency graph restricted to :processes has no cycle.
    """
    remaining = copy.copy(processes)
    pending = copy.deepcopy(dependencies)
    while True:
      ready = Planner.filter_runnable(remaining, pending)
      if not ready:
        break
      remaining -= ready
      pending = Planner.filter_dependencies(pending, given=ready)
    return not remaining

  def __init__(self, processes, dependencies):
    """
    :param processes: iterable of process names.
    :param dependencies: mapping of process name => iterable of prerequisite process names.
    :raises InvalidSchedule: if the dependencies contain a cycle.
    """
    self._processes = set(processes)
    self._dependencies = {}
    for name in self._processes:
      self._dependencies[name] = set(dependencies.get(name, []))
    if not Planner.satisfiable(self._processes, self._dependencies):
      raise Planner.InvalidSchedule("Cycles detected in the task schedule!")
    self._running, self._finished, self._failed = set(), set(), set()

  @property
  def runnable(self):
    """Processes that are idle and whose prerequisites have all finished."""
    idle = self._processes - self._running - self._finished - self._failed
    outstanding = Planner.filter_dependencies(self._dependencies, given=self._finished)
    return Planner.filter_runnable(idle, outstanding)

  @property
  def processes(self):
    return self._processes.copy()

  @property
  def running(self):
    return self._running.copy()

  @property
  def finished(self):
    return self._finished.copy()

  @property
  def failed(self):
    return self._failed.copy()

  def reset(self, process):
    """Move a running process back to the idle pool."""
    assert process in self._running
    assert process not in self._finished
    assert process not in self._failed
    self._running.discard(process)

  def set_running(self, process):
    """Mark a runnable (or already running) process as running."""
    assert process not in self._failed
    assert process not in self._finished
    assert process in self._running or process in self.runnable
    self._running.add(process)

  def set_finished(self, process):
    """Move a running process to the finished set."""
    assert process in self._running
    assert process not in self._failed
    self._running.discard(process)
    self._finished.add(process)

  def set_failed(self, process):
    """Move a running process to the failed set."""
    assert process in self._running
    assert process not in self._finished
    self._running.discard(process)
    self._failed.add(process)

  def is_complete(self):
    """True once every process has either finished or failed."""
    return (self._finished | self._failed) == self._processes
+
+
# Per-process scheduling attributes that TaskPlanner reads from each Process.
TaskAttributes = namedtuple(
    'TaskAttributes', ['min_duration', 'is_daemon', 'max_failures', 'is_ephemeral'])
+
class TaskPlanner(object):
  """
  A planner for the processes part of a Thermos task, taking into account ephemeral and daemon
  bits, in addition to duration restrictions [and failure limits?].

                             is_daemon
       .----------------------------------------------------.
       |                                                    |
       |    clock gate      .----------------------.        |
       |  .---------------> | runnable && !waiting |        |
       v  |                 `----------------------'        |
     .----------.                       |                   |
     | runnable |                       | set_running       |
     `----------'                       v                   |
          ^   forget               .---------.              |  !is_daemon          .----------.
          `------------------------| running |--------------+------------------> | finished |
          ^                        `---------'     add_success                   `----------'
          |                             |
          |     under failure limit     | add_failure
          `-----------------------------+
                                        | past failure limit
                                        v
                                    .--------.
                                    | failed |
                                    `--------'
  """
  InvalidSchedule = Planner.InvalidSchedule
  INFINITY = sys.float_info.max
  TOTAL_RUN_LIMIT = sys.maxsize

  @staticmethod
  def extract_dependencies(task, process_filter=None):
    """
    Construct a set of processes and the process dependencies from a Thermos Task.

    :param task: the Task whose processes/constraints are read.
    :param process_filter: optional predicate selecting which processes to plan.
    :returns: (set of process names, defaultdict(set) of process => prerequisite names)
    :raises InvalidSchedule: if a constraint only partially overlaps the filtered processes, or
      orders a process after a daemon, or a non-ephemeral process after an ephemeral one.
    """
    process_map = dict((process.name().get(), process)
                       for process in filter(process_filter, task.processes()))
    processes = set(process_map)
    dependencies = defaultdict(set)
    if task.has_constraints():
      for constraint in task.constraints():
        # handle process orders
        process_names = constraint.order().get()
        process_name_set = set(process_names)
        # either all process_names must be in processes or none should be
        if process_name_set.issubset(processes) == process_name_set.isdisjoint(processes):
          raise TaskPlanner.InvalidSchedule('Invalid process dependencies!')
        if not process_name_set.issubset(processes):
          continue
        for k in range(1, len(process_names)):
          pnk, pnk1 = process_names[k], process_names[k - 1]
          if process_map[pnk1].daemon().get():
            raise TaskPlanner.InvalidSchedule(
                'Process %s may not depend upon daemon process %s' % (pnk, pnk1))
          if not process_map[pnk].ephemeral().get() and process_map[pnk1].ephemeral().get():
            raise TaskPlanner.InvalidSchedule(
                'Non-ephemeral process %s may not depend upon ephemeral process %s' % (pnk, pnk1))
          dependencies[pnk].add(pnk1)
    return (processes, dependencies)

  def __init__(self, task, clock=time, process_filter=None):
    """
    :param task: the Thermos Task to plan.
    :param clock: object exposing time(); defaults to the time module.
    :param process_filter: optional callable selecting which of the task's processes to plan.
    """
    self._filter = process_filter
    assert self._filter is None or callable(self._filter), (
        'TaskPlanner must be given callable process filter.')
    self._planner = Planner(*self.extract_dependencies(task, self._filter))
    self._clock = clock
    self._last_terminal = {}  # process => timestamp of last terminal state
    self._failures = defaultdict(int)
    self._successes = defaultdict(int)
    self._attributes = {}
    self._ephemerals = set(process.name().get() for process in task.processes()
        if (self._filter is None or self._filter(process)) and process.ephemeral().get())

    for process in filter(self._filter, task.processes()):
      self._attributes[process.name().get()] = TaskAttributes(
          is_daemon=bool(process.daemon().get()),
          is_ephemeral=bool(process.ephemeral().get()),
          max_failures=process.max_failures().get(),
          min_duration=process.min_duration().get())

  def _resolve(self, timestamp):
    """Return :timestamp, or the clock's current time if it is None."""
    return timestamp if timestamp is not None else self._clock.time()

  def get_wait(self, process, timestamp=None):
    """Seconds until :process satisfies its min_duration gate (<= 0 means ready now)."""
    now = self._resolve(timestamp)
    if process not in self._last_terminal:
      return 0
    return self._attributes[process].min_duration - (now - self._last_terminal[process])

  def is_ready(self, process, timestamp=None):
    return self.get_wait(process, timestamp) <= 0

  def is_waiting(self, process, timestamp=None):
    return not self.is_ready(process, timestamp)

  @property
  def runnable(self):
    """The set of dependency-free processes that are not held back by min_duration right now."""
    return self.runnable_at(self._clock.time())

  @property
  def waiting(self):
    """The set of dependency-free processes currently held back by their min_duration."""
    return self.waiting_at(self._clock.time())

  def runnable_at(self, timestamp):
    return set(filter(partial(self.is_ready, timestamp=timestamp), self._planner.runnable))

  def waiting_at(self, timestamp):
    return set(filter(partial(self.is_waiting, timestamp=timestamp), self._planner.runnable))

  def min_wait(self, timestamp=None):
    """Return the current wait time for the next process to become runnable, 0 if something is
    ready immediately, or TaskPlanner.INFINITY (sys.float_info.max) if there are no waiters."""
    # Read the clock once so the runnable check, the waiting set and the per-process waits are all
    # evaluated at the same instant (separate reads could disagree and yield a negative wait).
    now = self._resolve(timestamp)
    if self.runnable_at(now):
      return 0
    waits = [self.get_wait(waiter, now) for waiter in self.waiting_at(now)]
    return min(waits) if waits else self.INFINITY

  def set_running(self, process):
    self._planner.set_running(process)

  def add_failure(self, process, timestamp=None):
    """Increment the failure count of a process, and reset it to runnable if maximum number of
    failures has not been reached, or mark it as failed otherwise (ephemeral processes do not
    count towards the success of a task, and are hence marked finished instead)"""
    self._last_terminal[process] = self._resolve(timestamp)
    self._failures[process] += 1
    self.failure_transition(process)

  def has_reached_run_limit(self, process):
    return (self._successes[process] + self._failures[process]) >= self.TOTAL_RUN_LIMIT

  def failure_transition(self, process):
    if self.has_reached_run_limit(process):
      self._planner.set_failed(process)
      return

    # max_failures == 0 means unlimited retries.
    if self._attributes[process].max_failures == 0 or (
        self._failures[process] < self._attributes[process].max_failures):
      self._planner.reset(process)
    elif self._attributes[process].is_ephemeral:
      self._planner.set_finished(process)
    else:
      self._planner.set_failed(process)

  def add_success(self, process, timestamp=None):
    """Reset a process to runnable if it is a daemon, or mark it as finished otherwise."""
    self._last_terminal[process] = self._resolve(timestamp)
    self._successes[process] += 1
    self.success_transition(process)

  def success_transition(self, process):
    if self.has_reached_run_limit(process):
      self._planner.set_failed(process)
      return

    if not self._attributes[process].is_daemon:
      self._planner.set_finished(process)
    else:
      self._planner.reset(process)

  def set_failed(self, process):
    """Force a process to be in failed state. E.g. kill -9 and you want it pinned failed."""
    self._planner.set_failed(process)

  def lost(self, process):
    """Mark a process as lost. This sets its runnable state back to the previous runnable
    state and does not increment its failure count."""
    self._planner.reset(process)

  def is_complete(self):
    """A task is complete if all ordinary processes are finished or failed (there may still be
    running ephemeral processes)"""
    terminals = self._planner.finished.union(self._planner.failed).union(self._ephemerals)
    return self._planner.processes == terminals

  # TODO(wickman) Should we consider subclassing again?
  @property
  def failed(self):
    return self._planner.failed

  @property
  def running(self):
    return self._planner.running

  @property
  def finished(self):
    return self._planner.finished
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/BUILD b/src/main/python/apache/thermos/config/BUILD
new file mode 100644
index 0000000..57e7f0d
--- /dev/null
+++ b/src/main/python/apache/thermos/config/BUILD
@@ -0,0 +1,24 @@
+import os
+
python_library(
  name = 'schema',
  sources = globs('*.py'),
  dependencies = [
    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
    pants('src/main/python/twitter/thermos:pystachio'),
    pants('src/main/python/twitter/thermos/common:planner')
  ],
)

# Read the release version once, closing the file handle deterministically instead of leaking it.
with open(os.path.join(get_buildroot(), '.auroraversion')) as _version_file:
  _AURORA_VERSION = _version_file.read().strip().lower()

python_library(
  name = 'config',
  dependencies = [
    pants(':schema'),
    pants('src/main/python/twitter/thermos/common'),  # cover common:planner
  ],
  provides = setup_py(
    name = 'twitter.thermos.config',
    version = _AURORA_VERSION,
    description = 'Thermos configuration schema and loader.',
  )
)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/__init__.py b/src/main/python/apache/thermos/config/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/bin/config_load.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/bin/config_load.py b/src/main/python/apache/thermos/config/bin/config_load.py
new file mode 100644
index 0000000..c0680ec
--- /dev/null
+++ b/src/main/python/apache/thermos/config/bin/config_load.py
@@ -0,0 +1,47 @@
+import copy
+import sys
+import json
+import pprint
+
+from twitter.common import app
+from twitter.thermos.config.loader import ThermosConfigLoader
+
def main(args):
  """
  Given .thermos configs, loads them and prints out information about them.

  Uses single-argument print(...) calls so the script runs unchanged on Python 2 and Python 3.
  """
  if len(args) == 0:
    app.help()

  for arg in args:
    print('\nparsing %s\n' % arg)
    tc = ThermosConfigLoader.load(arg)

    for task_wrapper in tc.tasks():
      task = task_wrapper.task
      if not task.has_name():
        print('Found unnamed task! Skipping...')
        continue

      print('Task: %s [check: %s]' % (task.name(), task.check()))
      if not task.processes():
        print(' No processes.')
      else:
        print(' Processes:')
        for proc in task.processes():
          print(' %s' % proc)

      ports = task_wrapper.ports()
      if not ports:
        print(' No unbound ports.')
      else:
        print(' Ports:')
        for port in ports:
          print(' %s' % port)

      print('raw:')
      pprint.pprint(json.loads(task_wrapper.to_json()))
+
# Register the usage text, then hand control to twitter.common.app, which presumably parses the
# command line and dispatches to main() in this module (confirm against twitter.common.app).
_USAGE = "%s config1 config2 ..." % app.name()
app.set_usage(_USAGE)
app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/bin/config_repl.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/bin/config_repl.py b/src/main/python/apache/thermos/config/bin/config_repl.py
new file mode 100644
index 0000000..4a5c797
--- /dev/null
+++ b/src/main/python/apache/thermos/config/bin/config_repl.py
@@ -0,0 +1,3 @@
+from twitter.thermos.config.schema import *
+from code import interact
# Start an interactive console whose namespace holds everything star-imported from the schema.
interact(banner='Thermos Config REPL', local=locals())
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/dsl.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/dsl.py b/src/main/python/apache/thermos/config/dsl.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/loader.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/loader.py b/src/main/python/apache/thermos/config/loader.py
new file mode 100644
index 0000000..54721c3
--- /dev/null
+++ b/src/main/python/apache/thermos/config/loader.py
@@ -0,0 +1,179 @@
+import copy
+import json
+import os
+import re
+import textwrap
+
+from twitter.common.dirutil import safe_open
+from twitter.common.lang import Compatibility
+from twitter.thermos.common.planner import TaskPlanner
+from twitter.thermos.config.schema import Task
+
+from pystachio import Ref
+from pystachio.config import Config
+
+
class PortExtractor(object):
  """Find the port names an object references through the 'thermos.ports' scope."""
  class InvalidPorts(Exception): pass

  @staticmethod
  def extract(obj):
    """
    Return the names of ports referenced as thermos.ports[name] among the references that
    obj.interpolate() reports as uninterpolated.

    :raises PortExtractor.InvalidPorts: if a reference under 'thermos.ports' is not an index
      lookup of the form thermos.ports[name].
    """
    port_scope = Ref.from_address('thermos.ports')
    _, uninterp = obj.interpolate()
    ports = []
    for ref in uninterp:
      subscope = port_scope.scoped_to(ref)
      if subscope is None:
        continue  # not a thermos.ports reference
      if not subscope.is_index():
        raise PortExtractor.InvalidPorts(
            'Bad port specification "%s" (should be of form "thermos.ports[name]")'
            % ref.address())
      ports.append(subscope.action().value)
    return ports
+
+
class ThermosProcessWrapper(object):
  """Validation helpers for a single Thermos Process."""
  # >=1 characters, must not start with '.', and may not contain '/' or NUL anywhere
  # (process names become path components, e.g. coordinator.<name> and log directories).
  VALID_PROCESS_NAME_RE = re.compile(r'^[^./\x00][^/\x00]*$')
  class InvalidProcess(Exception): pass

  def __init__(self, process):
    self._process = process

  def ports(self):
    """Return the port names this process references via thermos.ports[name]."""
    try:
      return PortExtractor.extract(self._process)
    except PortExtractor.InvalidPorts:
      raise self.InvalidProcess('Process has invalid ports scoping!')

  @staticmethod
  def assert_valid_process_name(name):
    """Raise InvalidProcess unless :name matches VALID_PROCESS_NAME_RE."""
    if not ThermosProcessWrapper.VALID_PROCESS_NAME_RE.match(name):
      raise ThermosProcessWrapper.InvalidProcess('Invalid process name: %s' % name)
+
+
class ThermosTaskWrapper(object):
  """
  Wrap a Thermos Task, optionally binding it first; in strict mode the task must typecheck.
  """
  class InvalidTask(Exception): pass

  def __init__(self, task, bindings=None, strict=True):
    if bindings:
      task = task.bind(*bindings)
    typechecks = task.check().ok()
    if not typechecks and strict:
      raise ThermosTaskWrapper.InvalidTask(task.check().message())
    self._task = task

  @property
  def task(self):
    return self._task

  def ports(self):
    """Return the set of port names referenced by any process of the interpolated task."""
    interpolated, _ = self._task.interpolate()
    found = set()
    if not interpolated.has_processes():
      return found
    for process in interpolated.processes():
      try:
        process_ports = ThermosProcessWrapper(process).ports()
      except ThermosProcessWrapper.InvalidProcess:
        raise self.InvalidTask('Task has invalid process: %s' % process)
      found.update(process_ports)
    return found

  def to_json(self):
    """Serialize the (uninterpolated) task to a JSON string."""
    raw = self._task.get()
    return json.dumps(raw)

  def to_file(self, filename):
    """Write the interpolated task to :filename as JSON."""
    interpolated, _ = self._task.interpolate()
    with safe_open(filename, 'w') as fp:
      json.dump(interpolated.get(), fp)

  @staticmethod
  def from_file(filename, **kw):
    """Load a task from a JSON file; best-effort, returning None on any failure."""
    try:
      with safe_open(filename) as fp:
        loaded = Task.json_load(fp)
      return ThermosTaskWrapper(loaded, **kw)
    except Exception:
      return None
+
+
+# TODO(wickman) These should be validators pushed onto ThermosConfigLoader.plugins
class ThermosTaskValidator(object):
  """Static checks applied to a Thermos Task before it is run."""
  class InvalidTaskError(Exception): pass

  @classmethod
  def assert_valid_task(cls, task):
    """Run the name, typecheck and plan checks, in that order."""
    for check in (cls.assert_valid_names, cls.assert_typecheck, cls.assert_valid_plan):
      check(task)

  @classmethod
  def assert_valid_plan(cls, task):
    """Both the regular and the final process sets must form a valid schedule."""
    def is_regular(proc):
      return proc.final().get() == False

    def is_final(proc):
      return proc.final().get() == True

    try:
      TaskPlanner(task, process_filter=is_regular)
      TaskPlanner(task, process_filter=is_final)
    except TaskPlanner.InvalidSchedule as e:
      raise cls.InvalidTaskError('Task has invalid plan: %s' % e)

  @classmethod
  def assert_valid_names(cls, task):
    """Every process name must satisfy ThermosProcessWrapper's naming rules."""
    for proc in task.processes():
      proc_name = proc.name().get()
      try:
        ThermosProcessWrapper.assert_valid_process_name(proc_name)
      except ThermosProcessWrapper.InvalidProcess as e:
        raise cls.InvalidTaskError('Task has invalid process: %s' % e)

  @classmethod
  def assert_typecheck(cls, task):
    """The task must fully typecheck."""
    result = task.check()
    if result.ok():
      return
    raise cls.InvalidTaskError('Failed to fully evaluate task: %s' % result.message())

  @classmethod
  def assert_valid_ports(cls, task, portmap):
    """Every port the task references must be present in :portmap."""
    for port in ThermosTaskWrapper(task).ports():
      if port in portmap:
        continue
      raise cls.InvalidTaskError('Task requires unbound port %s!' % port)

  @classmethod
  def assert_same_task(cls, spec, task):
    """If an active copy of the task exists on disk (per :spec), it must equal :task."""
    active_task = spec.given(state='active').getpath('task_path')
    if not os.path.exists(active_task):
      return
    task_on_disk = ThermosTaskWrapper.from_file(active_task)
    if not task_on_disk or task_on_disk.task != task:
      raise cls.InvalidTaskError('Task differs from on disk copy: %r vs %r' % (
          task_on_disk.task if task_on_disk else None, task))
+
+
class ThermosConfigLoader(object):
  """Collect ThermosTaskWrappers from a pystachio config (via export()) or a JSON task file."""
  SCHEMA = textwrap.dedent("""
    from pystachio import *
    from twitter.thermos.config.schema import *

    __TASKS = []

    def export(task):
      __TASKS.append(Task(task) if isinstance(task, dict) else task)
  """)

  @classmethod
  def load(cls, loadable, **kw):
    """Evaluate :loadable against SCHEMA and wrap every task passed to export()."""
    config = Config(loadable, schema=cls.SCHEMA)
    return cls([ThermosTaskWrapper(task, **kw) for task in config.environment['__TASKS']])

  @classmethod
  def load_json(cls, filename, **kw):
    """Load and validate a single JSON task file; yields an empty loader if it can't be read."""
    tc = cls()
    task = ThermosTaskWrapper.from_file(filename, **kw)
    if task:
      # `task` is a property returning the wrapped Task, not a method.
      ThermosTaskValidator.assert_valid_task(task.task)
      tc.add_task(task)
    return tc

  def __init__(self, exported_tasks=None):
    # Materialize into our own list: callers may pass a one-shot iterable, and tasks() must be
    # re-iterable while add_task() needs a mutable list.
    self._exported_tasks = list(exported_tasks) if exported_tasks is not None else []

  def add_task(self, task):
    self._exported_tasks.append(task)

  def tasks(self):
    return self._exported_tasks
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/schema.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema.py b/src/main/python/apache/thermos/config/schema.py
new file mode 100644
index 0000000..2ee38a1
--- /dev/null
+++ b/src/main/python/apache/thermos/config/schema.py
@@ -0,0 +1,2 @@
+from .schema_base import *
+from .schema_helpers import *
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/schema_base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema_base.py b/src/main/python/apache/thermos/config/schema_base.py
new file mode 100644
index 0000000..a47cfdf
--- /dev/null
+++ b/src/main/python/apache/thermos/config/schema_base.py
@@ -0,0 +1,75 @@
+from pystachio import (
+ Boolean,
+ Default,
+ Empty,
+ Float,
+ Integer,
+ List,
+ Map,
+ Required,
+ String,
+ Struct
+)
+
+
+# Define constants for resources
# Resource size units, all expressed in bytes; each unit is 2**10 times the previous one.
BYTES = 1
KB = BYTES << 10
MB = KB << 10
GB = MB << 10
TB = GB << 10
+
+
class ThermosContext(Struct):
  # Port mapping (presumably port name -> allocated port number; confirm with the runner).
  # TODO(wickman) Move the underlying replacement mechanism to %port% replacements
  ports = Map(String, Integer)

  # Identifier of the running task.
  # TODO(wickman) Move the underlying replacement mechanism to %task_id%
  task_id = String

  # User the task runs as.
  # TODO(wickman) Move underlying mechanism to %user%
  user = String
+
+
class Resources(Struct):
  # All three dimensions are mandatory whenever a Resources struct is given.
  cpu = Required(Float)    # CPU share (fractional values allowed)
  ram = Required(Integer)  # presumably bytes (see the KB/MB/GB constants); confirm
  disk = Required(Integer) # presumably bytes (see the KB/MB/GB constants); confirm
+
+
class Constraint(Struct):
  # Process names that must run in the listed order; TaskPlanner turns each adjacent pair into
  # a dependency of the later process on the earlier one.
  order = List(String)
+
+
class Process(Struct):
  cmdline = Required(String)
  name = Required(String)

  # This is currently unused but reserved for future use by Thermos.
  resources = Resources

  # optionals

  # Number of failed runs after which the process is marked failed (or finished, if ephemeral).
  # 0 means unlimited: TaskPlanner resets the process to runnable after every failure.
  max_failures = Default(Integer, 1)

  # If true, TaskPlanner makes the process runnable again after each successful run instead of
  # marking it finished. Other processes may not be ordered after a daemon process.
  daemon = Default(Boolean, False)

  # Ephemeral processes are not waited upon for task completion, and exhausting max_failures
  # marks them finished rather than failed. A non-ephemeral process may not be ordered after one.
  ephemeral = Default(Boolean, False)

  # Integer seconds that must elapse after the process last finished or failed before
  # TaskPlanner considers it runnable again.
  min_duration = Default(Integer, 5)

  # If true, this is a finalizing process that should always be run after regular processes.
  final = Default(Boolean, False)
+
+
class Task(Struct):
  # Defaults to the name of the first process.
  name = Default(String, '{{processes[0].name}}')
  processes = List(Process)

  # --- optionals ---

  # Ordering constraints between processes (see Constraint).
  constraints = Default(List(Constraint), [])
  resources = Resources
  # Maximum number of failed processes before the task is failed.
  max_failures = Default(Integer, 1)
  # Maximum number of concurrently running processes; 0 means unlimited.
  max_concurrency = Default(Integer, 0)
  # Time in seconds allotted to run the finalization schedule.
  finalization_wait = Default(Integer, 30)

  # TODO(jon): remove/replace with proper solution to MESOS-3546
  user = String
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/config/schema_helpers.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema_helpers.py b/src/main/python/apache/thermos/config/schema_helpers.py
new file mode 100644
index 0000000..502de5a
--- /dev/null
+++ b/src/main/python/apache/thermos/config/schema_helpers.py
@@ -0,0 +1,247 @@
+"""Helpers for composing Thermos workflows."""
+import itertools
+
+from twitter.common.lang import Compatibility
+
+from .schema_base import (
+ Constraint,
+ GB,
+ Process,
+ Resources,
+ Task,
+)
+
+from pystachio import Empty, List
+
+
# Public API of this module; everything else is an implementation detail.
__all__ = (
  'order',          # shorthand for a process ordering constraint
  'combine_tasks',  # task combinator: merge N tasks in parallel
  'concat_tasks',   # task combinator: serialize N tasks
  'java_options',   # command-line options helper (single '-' prefix)
  'python_options', # command-line options helper ('-' / '--' prefixes)
  'SequentialTask', # the automatically-sequential version of a task
  'SimpleTask',     # a simple task from a command line + name
  'Options',        # helper classes
  'Processes',
  'Tasks',
  'Units',
)
+
+
class Units(object):
  """Helpers for base units of Tasks and Processes."""

  @classmethod
  def optional_resources(cls, resources):
    """Return :resources, or an empty Resources() if it is Empty."""
    return Resources() if resources is Empty else resources

  @classmethod
  def resources_sum(cls, *resources):
    """Return a Resources object that is the sum of the inputs along each resource dimension.

    Empty dimensions (and Empty Resources objects) count as 0.
    """
    from functools import reduce  # a builtin only on Python 2

    def add_unit(f1, f2):
      return (0 if f1 is Empty else f1.get()) + (0 if f2 is Empty else f2.get())

    def add(r1, r2):
      return Resources(cpu=add_unit(r1.cpu(), r2.cpu()),
                       ram=add_unit(r1.ram(), r2.ram()),
                       disk=add_unit(r1.disk(), r2.disk()))

    return reduce(add, map(cls.optional_resources, resources), Resources(cpu=0, ram=0, disk=0))

  @classmethod
  def resources_max(cls, resources):
    """Return a Resources object that is the maximum of the iterable :resources along each
    resource dimension. Empty dimensions (and Empty Resources objects) count as 0."""
    from functools import reduce  # a builtin only on Python 2

    def max_unit(f1, f2):
      return max(0 if f1 is Empty else f1.get(), 0 if f2 is Empty else f2.get())

    def resource_max(r1, r2):
      return Resources(cpu=max_unit(r1.cpu(), r2.cpu()),
                       ram=max_unit(r1.ram(), r2.ram()),
                       disk=max_unit(r1.disk(), r2.disk()))

    return reduce(resource_max,
                  map(cls.optional_resources, resources), Resources(cpu=0, ram=0, disk=0))

  @staticmethod
  def _ordered_unique(items):
    """Return the distinct (hashable) elements of :items as a list, in first-seen order."""
    seen = set()
    unique = []
    for item in items:
      if item not in seen:
        seen.add(item)
        unique.append(item)
    return unique

  @classmethod
  def processes_merge(cls, tasks):
    """Return a deduped list of the processes from all tasks, preserving first-seen order.

    Order matters: e.g. Task.name defaults to the name of processes[0].
    """
    return cls._ordered_unique(itertools.chain.from_iterable(task.processes() for task in tasks))

  @classmethod
  def constraints_merge(cls, tasks):
    """Return a deduped list of the constraints from all tasks, preserving first-seen order."""
    return cls._ordered_unique(
        itertools.chain.from_iterable(task.constraints() for task in tasks))
+
+
class Processes(object):
  """Helper class for Process objects."""

  @classmethod
  def _process_name(cls, process):
    """Return the name of a Process, or a string process name unchanged."""
    if isinstance(process, Process):
      return process.name()
    if isinstance(process, Compatibility.string):
      return process
    raise ValueError("Unknown value for process order: %s" % repr(process))

  @classmethod
  def order(cls, *processes):
    """Return a single-element list holding a Constraint that runs :processes in order, e.g.
    order(p1, p2, p3) => [Constraint(order=[p1.name(), p2.name(), p3.name()])].

    Because the result is a list, orders compose by concatenation:
      order(p1, p2) + order(p2, p3) <=> order(p1, p2, p3)
    """
    names = list(map(cls._process_name, processes))
    return [Constraint(order=names)]
+
+
class Tasks(object):
  """Helper class for Task objects."""

  SIMPLE_CPU = 1.0
  SIMPLE_RAM = 1 * GB
  SIMPLE_DISK = 1 * GB

  @classmethod
  def _combine_processes(cls, *tasks):
    """Given multiple tasks, merge their processes together into a copy of the LAST task.

    The result is tasks[-1] called with the merged process list, so fields other than processes
    come from the last task.
    NOTE(review): this was previously documented as "retaining the identity of the first task";
    the implementation has always used tasks[-1] and is kept as-is to avoid silently changing
    combined tasks -- confirm which behavior is intended.
    """
    if len(tasks) == 0:
      return Task()
    base_task = tasks[-1]
    return base_task(processes=Units.processes_merge(tasks))

  @classmethod
  def combine(cls, *tasks, **kw):
    """Given multiple tasks, return a Task that runs all processes in parallel.

    Resources are summed and constraints merged; :kw overrides further Task fields.
    """
    if len(tasks) == 0:
      return Task()
    base = cls._combine_processes(*tasks)
    return base(
      resources=Units.resources_sum(*(task.resources() for task in tasks)),
      constraints=Units.constraints_merge(tasks),
      **kw
    )

  @classmethod
  def concat(cls, *tasks, **kw):
    """Given tasks T1...TN, return a single Task that runs all processes such that
    all processes in Tk run before any process in Tk+1.

    Resources are the per-dimension maximum; :kw overrides further Task fields.
    """
    if len(tasks) == 0:
      return Task()
    base = cls._combine_processes(*tasks)
    base = base(resources=Units.resources_max(task.resources() for task in tasks))
    base_constraints = Units.constraints_merge(tasks)
    # TODO(wickman) be smarter about this in light of existing constraints
    for (t1, t2) in zip(tasks[0:-1], tasks[1:]):
      for p1 in t1.processes():
        for p2 in t2.processes():
          if p1 != p2:
            base_constraints.extend(Processes.order(p1, p2))
    return base(constraints=base_constraints, **kw)

  @classmethod
  def simple(cls, name, command):
    """Create a usable Task from a provided name + command line and a default set of resources"""
    return Task(
      name=name,
      processes=[Process(name=name, cmdline=command)],
      resources=Resources(cpu=cls.SIMPLE_CPU,
                          ram=cls.SIMPLE_RAM,
                          disk=cls.SIMPLE_DISK))

  @classmethod
  def sequential(cls, task):
    """Add a constraint that makes all processes within a task run sequentially.

    :raises ValueError: if the task already has constraints.
    """
    def maybe_constrain(task):
      if task.processes() is Empty:
        return {}
      return {'constraints': Processes.order(*task.processes())}
    if task.constraints() is Empty or task.constraints() == List(Constraint)([]):
      return task(**maybe_constrain(task))
    raise ValueError('Cannot turn a Task with existing constraints into a SequentialTask!')
+
+
class Options(object):
  """Helper class for constructing command-line arguments."""

  @classmethod
  def render_option(cls, short_prefix, long_prefix, option, value=None):
    """Render one option as '<prefix><option>' or '<prefix><option> <value>'.

    Single-character option names use :short_prefix, longer ones :long_prefix. The value is
    omitted only when it is None or the empty string, so falsy values such as 0 are still
    rendered (previously port=0 rendered as a bare '--port').
    """
    flag = '%s%s' % (short_prefix if len(option) == 1 else long_prefix, option)
    if value is None or value == '':
      return flag
    return '%s %s' % (flag, value)

  @classmethod
  def render_options(cls, short_prefix, long_prefix, *options, **kw_options):
    """Render positional options (strings or dicts) followed by keyword options, as a list.

    :raises ValueError: on a positional option that is neither a string nor a dict.
    """
    renders = []

    for option in options:
      if isinstance(option, Compatibility.string):
        renders.append(cls.render_option(short_prefix, long_prefix, option))
      elif isinstance(option, dict):
        # preserve order in case option is an OrderedDict, rather than recursing with **option
        for argument, value in option.items():
          renders.append(cls.render_option(short_prefix, long_prefix, argument, value))
      else:
        raise ValueError('Got an unexpected argument to render_options: %s' % repr(option))

    for argument, value in kw_options.items():
      renders.append(cls.render_option(short_prefix, long_prefix, argument, value))

    return renders

  @classmethod
  def java(cls, *options, **kw_options):
    """
    Given a set of arguments, keyword arguments or dictionaries, render
    command-line parameters accordingly. For example:

      java_options('a', 'b') == '-a -b'
      java_options({
        'a': 23,
        'b': 'foo'
      }) == '-a 23 -b foo'
      java_options(a=23, b='foo') == '-a 23 -b foo'
    """
    return ' '.join(cls.render_options('-', '-', *options, **kw_options))

  @classmethod
  def python(cls, *options, **kw_options):
    """
    Given a set of arguments, keyword arguments or dictionaries, render
    command-line parameters accordingly. Single letter parameters are
    rendered with single '-'. For example:

      python_options('a', 'boo') == '-a --boo'
      python_options({
        'a': 23,
        'boo': 'foo'
      }) == '-a 23 --boo foo'
      python_options(a=23, boo='foo') == '-a 23 --boo foo'
    """
    return ' '.join(cls.render_options('-', '--', *options, **kw_options))
+
+
def SimpleTask(name, command):
  """Build a single-process Task named :name that runs :command with default resources."""
  return Tasks.simple(name, command)


def SequentialTask(*args, **kw):
  """Build a Task from the given arguments and constrain its processes to run in order."""
  task = Task(*args, **kw)
  return Tasks.sequential(task)


# Module-level shorthands listed in __all__.
python_options, java_options = Options.python, Options.java
combine_tasks, concat_tasks = Tasks.combine, Tasks.concat
order = Processes.order
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/core/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/BUILD b/src/main/python/apache/thermos/core/BUILD
new file mode 100644
index 0000000..d260112
--- /dev/null
+++ b/src/main/python/apache/thermos/core/BUILD
@@ -0,0 +1,88 @@
import os

# Pants BUILD file for the thermos core libraries.
#
# NOTE(review): this file was relocated into the apache/ tree by a rename-only commit
# ("preserve all file history before the refactor"); the target paths below still point
# at src/main/python/twitter/... and src/main/thrift/com/twitter/... -- presumably these
# are rewritten in the follow-up refactor; confirm before relying on them.

# Library target for helper.py.
python_library(
  name = 'helper',
  sources = ['helper.py'],
  dependencies = [
    pants('src/main/python/twitter/thermos:psutil'),
    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
    pants('aurora/twitterdeps/src/python/twitter/common/log'),
    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
    pants('src/main/python/twitter/thermos/common:ckpt'),
    pants('src/main/python/twitter/thermos/common:path'),
    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
  ]
)

# Library target for inspector.py; depends on the local :muxer target.
python_library(
  name = 'inspector',
  sources = ['inspector.py'],
  dependencies = [
    pants(':muxer'),
    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
    pants('src/main/python/twitter/thermos/common:ckpt'),
    pants('src/main/python/twitter/thermos/common:path'),
    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
  ]
)

# Library target for muxer.py.
python_library(
  name = 'muxer',
  sources = ['muxer.py'],
  dependencies = [
    pants('aurora/twitterdeps/src/python/twitter/common/log'),
    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
  ]
)

# Library target for process.py.
python_library(
  name = 'process',
  sources = ['process.py'],
  dependencies = [
    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
    pants('aurora/twitterdeps/src/python/twitter/common/lang'),
    pants('aurora/twitterdeps/src/python/twitter/common/log'),
    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
  ]
)

# Library target for the package __init__ and runner.py; builds on :helper, :muxer, :process.
python_library(
  name = 'runner',
  sources = ['__init__.py', 'runner.py'],
  dependencies = [
    pants(':helper'),
    pants(':muxer'),
    pants(':process'),
    pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
    pants('aurora/twitterdeps/src/python/twitter/common/log'),
    pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
    pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
    pants('src/main/python/twitter/thermos:psutil'),
    pants('src/main/python/twitter/thermos/common:ckpt'),
    pants('src/main/python/twitter/thermos/common:path'),
    pants('src/main/python/twitter/thermos/common:planner'),
    pants('src/main/python/twitter/thermos/config:schema'),
    pants('src/main/thrift/com/twitter/thermos:py-thrift'),
  ]
)

# Aggregate target with no sources of its own, published via setup_py as the
# 'twitter.thermos.core' distribution.
python_library(
  name = 'core',
  dependencies = [
    pants(':inspector'),
    pants(':runner'),

    # covering libs
    pants('src/main/python/twitter/thermos/common'),
    pants('src/main/python/twitter/thermos/config'),
  ],
  provides = setup_py(
    name = 'twitter.thermos.core',
    # Version string comes from the .auroraversion file at the build root,
    # whitespace-stripped and lower-cased.
    version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
    description = 'The Thermos core state machine.',
  )
)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/core/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/__init__.py b/src/main/python/apache/thermos/core/__init__.py
new file mode 100644
index 0000000..de40ea7
--- /dev/null
+++ b/src/main/python/apache/thermos/core/__init__.py
@@ -0,0 +1 @@
+__import__('pkg_resources').declare_namespace(__name__)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/core/helper.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/helper.py b/src/main/python/apache/thermos/core/helper.py
new file mode 100644
index 0000000..3587a85
--- /dev/null
+++ b/src/main/python/apache/thermos/core/helper.py
@@ -0,0 +1,387 @@
+from contextlib import closing
+import errno
+import os
+import signal
+import time
+
+from twitter.common import log
+from twitter.common.dirutil import lock_file, safe_mkdir
+from twitter.common.quantity import Amount, Time
+from twitter.common.recordio import ThriftRecordWriter
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.common.path import TaskPath
+
+from gen.twitter.thermos.ttypes import (
+ ProcessState,
+ ProcessStatus,
+ RunnerCkpt,
+ TaskState,
+ TaskStatus)
+
+import psutil
+
+
class TaskKiller(object):
  """
  Task killing interface bound to a single task id and checkpoint root.

  Both operations delegate to TaskRunnerHelper.kill; they differ only in the
  terminal state recorded for the task (KILLED vs LOST).
  """

  def __init__(self, task_id, checkpoint_root):
    self._task_id = task_id
    self._checkpoint_root = checkpoint_root

  def _terminate(self, force, terminal_status):
    """Run TaskRunnerHelper.kill for this task with the given terminal status."""
    TaskRunnerHelper.kill(
        self._task_id,
        self._checkpoint_root,
        force=force,
        terminal_status=terminal_status)

  def kill(self, force=True):
    self._terminate(force, TaskState.KILLED)

  def lose(self, force=True):
    self._terminate(force, TaskState.LOST)
+
+
class TaskRunnerHelper(object):
  """
  TaskRunner helper methods that can be operated directly upon checkpoint
  state.  These operations do not require knowledge of the underlying
  task.

  TaskRunnerHelper is sort of a mishmash of "checkpoint-only" operations and
  the "Process Platform" stuff that started to get pulled into process.py

  This really needs some hard design thought to see if it can be extracted out
  even further.
  """
  class Error(Exception): pass
  class PermissionError(Error): pass

  # Maximum drift between when the system says a task was forked and when we checkpointed
  # its fork_time (used as a heuristic to determine a forked task is really ours instead of
  # a task with coincidentally the same PID but just wrapped around.)
  MAX_START_TIME_DRIFT = Amount(10, Time.SECONDS)

  @staticmethod
  def get_actual_user():
    """
    Return the name of the user owning this process.

    Uses the passwd entry for os.getuid(); if the uid has no passwd entry, falls
    back to getpass.getuser().
    """
    import getpass, pwd
    try:
      pwd_entry = pwd.getpwuid(os.getuid())
    except KeyError:
      return getpass.getuser()
    return pwd_entry.pw_name

  @staticmethod
  def process_from_name(task, process_name):
    """Return the process in task whose name is process_name, or None if there is none."""
    if task.has_processes():
      for process in task.processes():
        if process.name().get() == process_name:
          return process
    return None

  @classmethod
  def this_is_really_our_pid(cls, process, current_user, start_time):
    """
      A heuristic to make sure that this is likely the pid that we own/forked.  Necessary
      because of pid-space wrapping.  We don't want to go and kill processes we don't own,
      especially if the killer is running as root.

      process: psutil.Process representing the process to check
      current_user: user expected to own the process
      start_time: time at which it's expected the process has started

      Returns True only if the process is owned by current_user and its create_time is
      within MAX_START_TIME_DRIFT of start_time.

      Raises:
        psutil.NoSuchProcess - if the Process supplied no longer exists
    """
    if process.username != current_user:
      log.info("Expected pid %s to be ours but the pid user is %s and we're %s" % (
        process.pid, process.username, current_user))
      return False

    if abs(start_time - process.create_time) >= cls.MAX_START_TIME_DRIFT.as_(Time.SECONDS):
      log.info("Expected pid %s start time to be %s but it's %s" % (
        process.pid, start_time, process.create_time))
      return False

    return True

  @classmethod
  def scan_process(cls, state, process_name):
    """
      Given a RunnerState and a process_name, return the following:
        (coordinator pid, process pid, process tree)
        (int or None, int or None, set)

      Only the most recent run of process_name is inspected.  A pid is reported only
      if the live process passes this_is_really_our_pid; the tree is the set of pids
      of the process's recursive children.
    """
    process_run = state.processes[process_name][-1]
    process_owner = state.header.user

    coordinator_pid, pid, tree = None, None, set()

    if process_run.coordinator_pid:
      try:
        coordinator_process = psutil.Process(process_run.coordinator_pid)
        if cls.this_is_really_our_pid(coordinator_process, process_owner, process_run.fork_time):
          coordinator_pid = process_run.coordinator_pid
      except psutil.NoSuchProcess:
        log.info('  Coordinator %s [pid: %s] completed.' % (process_run.process,
            process_run.coordinator_pid))
      except psutil.Error as err:
        log.warning('  Error gathering information on pid %s: %s' % (process_run.coordinator_pid,
            err))

    if process_run.pid:
      try:
        process = psutil.Process(process_run.pid)
        if cls.this_is_really_our_pid(process, process_owner, process_run.start_time):
          pid = process.pid
      except psutil.NoSuchProcess:
        log.info('  Process %s [pid: %s] completed.' % (process_run.process, process_run.pid))
      except psutil.Error as err:
        log.warning('  Error gathering information on pid %s: %s' % (process_run.pid, err))
      else:
        if pid:
          try:
            tree = set(proc.pid for proc in process.get_children(recursive=True))
          except psutil.Error:
            log.warning('  Error gathering information on children of pid %s' % pid)

    return (coordinator_pid, pid, tree)

  @classmethod
  def scantree(cls, state):
    """
      Scan the process tree associated with the provided task state.

      Returns a dictionary of process name => (coordinator pid, pid, pid children)
      If the coordinator is no longer active, coordinator pid will be None.  If the
      forked process is no longer active, pid will be None and its children will be
      an empty set.
    """
    return dict((process_name, cls.scan_process(state, process_name))
                for process_name in state.processes)

  @classmethod
  def safe_signal(cls, pid, sig=signal.SIGTERM):
    """Send sig to pid; ESRCH/EPERM are ignored, any other failure is logged, never raised."""
    try:
      os.kill(pid, sig)
    except OSError as e:
      if e.errno not in (errno.ESRCH, errno.EPERM):
        log.error('Unexpected error in os.kill: %s' % e)
    except Exception as e:
      log.error('Unexpected error in os.kill: %s' % e)

  @classmethod
  def terminate_pid(cls, pid):
    cls.safe_signal(pid, signal.SIGTERM)

  @classmethod
  def kill_pid(cls, pid):
    cls.safe_signal(pid, signal.SIGKILL)

  @classmethod
  def kill_group(cls, pgrp):
    # A negative pid makes os.kill signal every member of process group pgrp.
    cls.safe_signal(-pgrp, signal.SIGKILL)

  @classmethod
  def _get_process_tuple(cls, state, process_name):
    assert process_name in state.processes and len(state.processes[process_name]) > 0
    return cls.scan_process(state, process_name)

  @classmethod
  def _get_coordinator_group(cls, state, process_name):
    assert process_name in state.processes and len(state.processes[process_name]) > 0
    return state.processes[process_name][-1].coordinator_pid

  @classmethod
  def terminate_process(cls, state, process_name):
    """SIGTERM the (verified) pid of process_name; return True if there was one to signal."""
    log.debug('TaskRunnerHelper.terminate_process(%s)' % process_name)
    _, pid, _ = cls._get_process_tuple(state, process_name)
    if pid:
      log.debug('   => SIGTERM pid %s' % pid)
      cls.terminate_pid(pid)
    return bool(pid)

  @classmethod
  def kill_process(cls, state, process_name):
    """
    SIGKILL the coordinator's process group, the coordinator, the process and its
    children.  Returns True if any verified coordinator/process/child pid was found.
    """
    log.debug('TaskRunnerHelper.kill_process(%s)' % process_name)
    coordinator_pgid = cls._get_coordinator_group(state, process_name)
    coordinator_pid, pid, tree = cls._get_process_tuple(state, process_name)
    # This is super dangerous.  TODO(wickman) Add a heuristic that determines
    # that 1) there are processes that currently belong to this process group
    # and 2) those processes have inherited the coordinator checkpoint filehandle
    # This way we validate that it is in fact the process group we expect.
    if coordinator_pgid:
      log.debug('   => SIGKILL coordinator group %s' % coordinator_pgid)
      cls.kill_group(coordinator_pgid)
    if coordinator_pid:
      log.debug('   => SIGKILL coordinator %s' % coordinator_pid)
      cls.kill_pid(coordinator_pid)
    if pid:
      log.debug('   => SIGKILL pid %s' % pid)
      cls.kill_pid(pid)
    for child in tree:
      log.debug('   => SIGKILL child %s' % child)
      cls.kill_pid(child)
    return bool(coordinator_pid or pid or tree)

  @classmethod
  def kill_runner(cls, state):
    """
    SIGKILL the runner recorded in the latest task status.

    Returns True if the runner was signalled or no longer exists, False on EPERM.
    Raises Error if state has no statuses or the runner is this very process.
    """
    log.debug('TaskRunnerHelper.kill_runner()')
    if not state or not state.statuses:
      raise cls.Error('Could not read state!')
    pid = state.statuses[-1].runner_pid
    if pid == os.getpid():
      raise cls.Error('Unwilling to commit seppuku.')
    try:
      os.kill(pid, signal.SIGKILL)
      return True
    except OSError as e:
      if e.errno == errno.EPERM:
        # Permission denied
        return False
      elif e.errno == errno.ESRCH:
        # pid no longer exists
        return True
      raise

  @classmethod
  def open_checkpoint(cls, filename, force=False, state=None):
    """
      Acquire a locked checkpoint stream.

      Creates the parent directory, locks filename for appending and returns a
      synchronous ThriftRecordWriter over it.  If the lock is held and force is set,
      the existing runner (from state, or the checkpoint on disk) is killed and the
      lock is then acquired blocking.  Raises PermissionError if no lock is obtained.
    """
    safe_mkdir(os.path.dirname(filename))
    fp = lock_file(filename, "a+")
    if fp in (None, False):
      if force:
        log.info('Found existing runner, forcing leadership forfeit.')
        state = state or CheckpointDispatcher.from_file(filename)
        if cls.kill_runner(state):
          log.info('Successfully killed leader.')
          # TODO(wickman) Blocking may not be the best idea here.  Perhaps block up to
          # a maximum timeout.  But blocking is necessary because os.kill does not immediately
          # release the lock if we're in force mode.
          fp = lock_file(filename, "a+", blocking=True)
      else:
        log.error('Found existing runner, cannot take control.')
    if fp in (None, False):
      raise cls.PermissionError('Could not open locked checkpoint: %s, lock_file = %s' %
        (filename, fp))
    ckpt = ThriftRecordWriter(fp)
    ckpt.set_sync(True)
    return ckpt

  @classmethod
  def kill(cls, task_id, checkpoint_root, force=False,
           terminal_status=TaskState.KILLED, clock=time):
    """
      An implementation of Task killing that doesn't require a fully hydrated TaskRunner object.
      Terminal status must be either KILLED or LOST state.

      Kills every non-terminal process of the task, checkpoints the resulting process
      states followed by terminal_status, then moves the task from "active" to
      "finished".  A task that is already terminal is only finalized.
    """
    if terminal_status not in (TaskState.KILLED, TaskState.LOST):
      # Parenthesized so the fallback applies to the name lookup, not the formatted string.
      raise cls.Error('terminal_status must be KILLED or LOST (got %s)' %
                      (TaskState._VALUES_TO_NAMES.get(terminal_status) or terminal_status))
    pathspec = TaskPath(root=checkpoint_root, task_id=task_id)
    checkpoint = pathspec.getpath('runner_checkpoint')
    state = CheckpointDispatcher.from_file(checkpoint)

    # An empty statuses list is as uninitialized as None: statuses[-1] is needed below.
    if state is None or state.header is None or not state.statuses:
      if force:
        log.error('Task has uninitialized TaskState - forcibly finalizing')
        cls.finalize_task(pathspec)
      else:
        log.error('Cannot update states in uninitialized TaskState!')
      return

    ckpt = cls.open_checkpoint(checkpoint, force=force, state=state)

    def write_task_state(task_state):
      update = TaskStatus(state=task_state, timestamp_ms=int(clock.time() * 1000),
                          runner_pid=os.getpid(), runner_uid=os.getuid())
      ckpt.write(RunnerCkpt(task_status=update))

    def write_process_status(status):
      ckpt.write(RunnerCkpt(process_status=status))

    # The checkpoint (and its lock) is released on every path, including the
    # already-terminal one, before the task record is finalized.
    with closing(ckpt):
      if cls.is_task_terminal(state.statuses[-1].state):
        log.info('Task is already in terminal state!  Finalizing.')
      else:
        write_task_state(TaskState.ACTIVE)
        for process_name, history in state.processes.items():
          process_status = history[-1]
          if not cls.is_process_terminal(process_status.state):
            if cls.kill_process(state, process_name):
              write_process_status(ProcessStatus(process=process_name,
                state=ProcessState.KILLED, seq=process_status.seq + 1, return_code=-9,
                stop_time=clock.time()))
            elif process_status.state != ProcessState.WAITING:
              write_process_status(ProcessStatus(process=process_name,
                state=ProcessState.LOST, seq=process_status.seq + 1))
        write_task_state(terminal_status)
    cls.finalize_task(pathspec)

  @classmethod
  def reap_children(cls):
    """Reap all exited children without blocking; return the set of reaped pids."""
    pids = set()

    while True:
      try:
        pid, status, rusage = os.wait3(os.WNOHANG)
        if pid == 0:
          break
        pids.add(pid)
        log.debug('Detected terminated process: pid=%s, status=%s, rusage=%s' % (
          pid, status, rusage))
      except OSError as e:
        if e.errno != errno.ECHILD:
          log.warning('Unexpected error when calling waitpid: %s' % e)
        break

    return pids

  TERMINAL_PROCESS_STATES = frozenset([
    ProcessState.SUCCESS,
    ProcessState.KILLED,
    ProcessState.FAILED,
    ProcessState.LOST])

  TERMINAL_TASK_STATES = frozenset([
    TaskState.SUCCESS,
    TaskState.FAILED,
    TaskState.KILLED,
    TaskState.LOST])

  @classmethod
  def is_process_terminal(cls, process_status):
    return process_status in cls.TERMINAL_PROCESS_STATES

  @classmethod
  def is_task_terminal(cls, task_status):
    return task_status in cls.TERMINAL_TASK_STATES

  @classmethod
  def initialize_task(cls, spec, task):
    """Write task to the "active" task path unless it exists; refuse if "finished" exists."""
    active_task = spec.given(state='active').getpath('task_path')
    finished_task = spec.given(state='finished').getpath('task_path')
    is_active, is_finished = os.path.exists(active_task), os.path.exists(finished_task)
    if is_finished:
      raise cls.Error('Cannot initialize task with "finished" record!')
    if not is_active:
      safe_mkdir(os.path.dirname(active_task))
      with open(active_task, 'w') as fp:
        fp.write(task)

  @classmethod
  def finalize_task(cls, spec):
    """Move the "active" task record to the "finished" path and touch its mtime."""
    active_task = spec.given(state='active').getpath('task_path')
    finished_task = spec.given(state='finished').getpath('task_path')
    is_active, is_finished = os.path.exists(active_task), os.path.exists(finished_task)
    if not is_active:
      raise cls.Error('Cannot finalize task with no "active" record!')
    elif is_finished:
      raise cls.Error('Cannot finalize task with "finished" record!')
    safe_mkdir(os.path.dirname(finished_task))
    os.rename(active_task, finished_task)
    os.utime(finished_task, None)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/core/inspector.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/inspector.py b/src/main/python/apache/thermos/core/inspector.py
new file mode 100644
index 0000000..8c646a1
--- /dev/null
+++ b/src/main/python/apache/thermos/core/inspector.py
@@ -0,0 +1,105 @@
+from collections import namedtuple
+from contextlib import closing
+import pwd
+
+from twitter.common import log
+from twitter.common.recordio import RecordIO, ThriftRecordReader
+
+from twitter.thermos.common.ckpt import CheckpointDispatcher
+from twitter.thermos.common.path import TaskPath
+
+from gen.twitter.thermos.ttypes import (
+ ProcessState,
+ RunnerCkpt,
+ RunnerState)
+
+from .muxer import ProcessMuxer
+
+
# Summary of a task's checkpoint history as produced by CheckpointInspector.inspect().
CheckpointInspection = namedtuple(
    'CheckpointInspection',
    'runner_latest_update process_latest_update runner_processes '
    'coordinator_processes processes')
+
+
class CheckpointInspector(object):
  """Replay a task's runner and process checkpoints and summarize what ran."""

  def __init__(self, checkpoint_root):
    self._path = TaskPath(root=checkpoint_root)

  @staticmethod
  def get_timestamp(process_record):
    """
    Return the first truthy value among fork_time, start_time and stop_time (in that
    order) on process_record, or 0 if the record is falsy or carries none of them.
    """
    if not process_record:
      return 0
    candidates = (getattr(process_record, attribute, None)
                  for attribute in ('fork_time', 'start_time', 'stop_time'))
    return next((stamp for stamp in candidates if stamp), 0)

  def inspect(self, task_id):
    """
      Reconstructs the checkpoint stream and returns a CheckpointInspection.

      Returns None if the runner checkpoint cannot be read.
    """
    dispatcher = CheckpointDispatcher()
    state = RunnerState(processes={})
    muxer = ProcessMuxer(self._path.given(task_id=task_id))

    runner_processes = []
    coordinator_processes = set()
    processes = set()

    def consume_process_record(record):
      # Record (pid, uid, timestamp) for coordinators on FORKED and processes on RUNNING.
      status = record.process_status
      if not status:
        return
      try:
        user_uid = pwd.getpwnam(state.header.user).pw_uid
      except KeyError:
        log.error('Could not find user: %s' % state.header.user)
        return
      if status.state == ProcessState.FORKED:
        coordinator_processes.add((status.coordinator_pid, user_uid, status.fork_time))
      elif status.state == ProcessState.RUNNING:
        processes.add((status.pid, user_uid, status.start_time))

    # replay runner checkpoint
    runner_pid = None
    runner_latest_update = 0
    try:
      with open(self._path.given(task_id=task_id).getpath('runner_checkpoint')) as fp:
        with closing(ThriftRecordReader(fp, RunnerCkpt)) as ckpt:
          for record in ckpt:
            dispatcher.dispatch(state, record)
            runner_latest_update = max(runner_latest_update,
                                       self.get_timestamp(record.process_status))
            task_status = record.task_status
            if task_status:
              # collect every distinct runner that bound to this checkpoint
              if task_status.runner_pid != runner_pid:
                runner_processes.append((task_status.runner_pid,
                                         task_status.runner_uid or 0,
                                         task_status.timestamp_ms))
                runner_pid = task_status.runner_pid
            elif record.process_status:
              consume_process_record(record)
    except (IOError, OSError, RecordIO.Error) as err:
      log.debug('Error inspecting task runner checkpoint: %s' % err)
      return

    # register existing processes in muxer
    for name in state.processes:
      muxer.register(name)

    # read process checkpoints
    process_latest_update = runner_latest_update
    for record in muxer.select():
      process_latest_update = max(process_latest_update,
                                  self.get_timestamp(record.process_status))
      consume_process_record(record)

    return CheckpointInspection(
        runner_latest_update=runner_latest_update,
        process_latest_update=process_latest_update,
        runner_processes=runner_processes,
        coordinator_processes=coordinator_processes,
        processes=processes)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/core/muxer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/muxer.py b/src/main/python/apache/thermos/core/muxer.py
new file mode 100644
index 0000000..9c0b389
--- /dev/null
+++ b/src/main/python/apache/thermos/core/muxer.py
@@ -0,0 +1,142 @@
+import os
+import errno
+
+from twitter.common import log
+from twitter.common.recordio import ThriftRecordReader
+from gen.twitter.thermos.ttypes import RunnerCkpt
+
+
class ProcessMuxer(object):
  """
  Multiplex checkpoint records from the per-process checkpoint files of a task.

  Registered processes are bound lazily: each one's checkpoint file (derived from
  the pathspec) is opened the first time it exists, and the stream is then
  fast-forwarded to the process's registered sequence watermark.
  """
  class ProcessExists(Exception): pass
  class ProcessNotFound(Exception): pass
  class CorruptCheckpoint(Exception): pass

  def __init__(self, pathspec):
    self._processes = {}  # process_name => fp (None until the checkpoint has been opened)
    self._watermarks = {}  # process_name => sequence high watermark
    self._pathspec = pathspec

  def __del__(self):
    for fp in filter(None, self._processes.values()):
      fp.close()

  def register(self, process_name, watermark=0):
    """Start tracking process_name; raises ProcessExists if it is already registered."""
    log.debug('registering %s' % process_name)
    if process_name in self._processes:
      raise ProcessMuxer.ProcessExists("Process %s is already registered" % process_name)
    self._processes[process_name] = None
    self._watermarks[process_name] = watermark

  def _bind_processes(self):
    """Open checkpoints of registered-but-unbound processes and fast-forward them."""
    # Iterate over a snapshot since successfully bound entries are updated in place.
    for process_name, fp in list(self._processes.items()):
      if fp is not None:
        continue
      process_ckpt = self._pathspec.given(process=process_name).getpath('process_checkpoint')
      log.debug('ProcessMuxer binding %s => %s' % (process_name, process_ckpt))
      try:
        self._processes[process_name] = open(process_ckpt, 'r')
      except IOError as e:
        if e.errno == errno.ENOENT:
          log.debug('  => bind failed, checkpoint not available yet.')
        else:
          log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
        # Still unbound: fast-forwarding would trip its not-None assertion.
        continue
      except Exception as e:
        log.error("Unexpected inability to open %s! %s" % (process_ckpt, e))
        continue
      self._fast_forward_stream(process_name)

  def _fast_forward_stream(self, process_name):
    """
    Skip records of process_name's stream up to its registered watermark, rewinding
    to just before the first record whose sequence exceeds the watermark.
    """
    log.debug('Fast forwarding %s stream to seq=%s' % (process_name,
        self._watermarks[process_name]))
    assert self._processes.get(process_name) is not None
    fp = self._processes[process_name]
    rr = ThriftRecordReader(fp, RunnerCkpt)
    current_watermark = -1
    records = 0
    while current_watermark < self._watermarks[process_name]:
      last_pos = fp.tell()
      record = rr.try_read()
      if record is None:
        break
      new_watermark = record.process_status.seq
      if new_watermark > self._watermarks[process_name]:
        log.debug('Over-seeked %s [watermark = %s, high watermark = %s], rewinding.' % (
          process_name, new_watermark, self._watermarks[process_name]))
        fp.seek(last_pos)
        break
      current_watermark = new_watermark
      records += 1

    if current_watermark < self._watermarks[process_name]:
      log.warning('Only able to fast forward to %s@sequence=%s, high watermark is %s' % (
         process_name, current_watermark, self._watermarks[process_name]))

    if records:
      log.debug('Fast forwarded %s %s record(s) to seq=%s.' % (process_name, records,
        current_watermark))

  def unregister(self, process_name):
    """Stop tracking process_name, closing its checkpoint; raises ProcessNotFound if unknown."""
    log.debug('unregistering %s' % process_name)
    if process_name not in self._processes:
      raise ProcessMuxer.ProcessNotFound("No trace of process: %s" % process_name)
    else:
      self._watermarks.pop(process_name)
      fp = self._processes.pop(process_name)
      if fp is not None:
        fp.close()

  def has_data(self, process):
    """
      Return true if we think that there are updates available from the supplied process.

      Returns False for unknown processes and for processes whose checkpoint has not
      been opened yet.  The stream position is left unchanged when data is found.
    """
    self._bind_processes()
    # TODO(wickman) Should this raise ProcessNotFound?
    fp = self._processes.get(process)
    if fp is None:
      return False
    rr = ThriftRecordReader(fp, RunnerCkpt)
    old_pos = fp.tell()
    try:
      os.fstat(fp.fileno())
    except OSError:
      log.debug('ProcessMuxer could not fstat for process %s' % process)
      return False
    update = rr.try_read()
    if update:
      fp.seek(old_pos)
      return True
    return False

  def select(self):
    """
      Read and multiplex checkpoint records from all the forked off process coordinators.

      Checkpoint records can come from one of two places:
        in-process: checkpoint records synthesized for FORKED and LOST events
        out-of-process: checkpoint records from file descriptors of forked coordinators

      Returns a list of RunnerCkpt objects that were successfully read, or an empty
      list if none were read.
    """
    self._bind_processes()
    updates = []
    for handle in filter(None, self._processes.values()):
      try:
        fstat = os.fstat(handle.fileno())
      except OSError:
        log.error('Unable to fstat %s!' % handle.name)
        continue
      if handle.tell() > fstat.st_size:
        log.error('Truncated checkpoint record detected on %s!' % handle.name)
      elif handle.tell() < fstat.st_size:
        rr = ThriftRecordReader(handle, RunnerCkpt)
        while True:
          process_update = rr.try_read()
          if process_update:
            updates.append(process_update)
          else:
            break
    if len(updates) > 0:
      log.debug('select() returning %s updates:' % len(updates))
      for update in updates:
        log.debug('  = %s' % update)
    return updates
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
new file mode 100644
index 0000000..dd1d62a
--- /dev/null
+++ b/src/main/python/apache/thermos/core/process.py
@@ -0,0 +1,371 @@
+"""Run processes of a Thermos task.
+
+This module contains the Process class, used to manage the execution of the constituent processes of
+a Thermos task. Each process is represented by a "coordinator" process, which fires off the actual
+commandline in a subprocess of its own.
+
+"""
+
+from abc import abstractmethod
+import getpass
+import grp
+import os
+import pwd
+import signal
+import subprocess
+import sys
+import time
+
+from twitter.common import log
+from twitter.common.dirutil import (
+ lock_file,
+ safe_mkdir,
+ safe_open,
+)
+from twitter.common.lang import Interface
+from twitter.common.quantity import Amount, Time
+from twitter.common.recordio import ThriftRecordReader, ThriftRecordWriter
+
+from gen.twitter.thermos.ttypes import (
+ ProcessState,
+ ProcessStatus,
+ RunnerCkpt,
+)
+
+
class Platform(Interface):
  """
  Abstract platform interface wrapping the system-level primitives that ProcessBase relies on.

  Concrete implementations supply a clock, a fork primitive and a pid lookup, so callers can
  inject alternative implementations of these operating-system facilities.
  """

  @abstractmethod
  def clock(self):
    """Return a clock object; ProcessBase calls its time() and sleep() methods."""

  @abstractmethod
  def fork(self):
    """Fork the current process; ProcessBase treats a return value of 0 as the child side."""

  @abstractmethod
  def getpid(self):
    """Return the pid of the calling process."""
+
+
class ProcessBase(object):
  """
  Encapsulate a running process for a task.

  start() forks a coordinator through the injected Platform. The parent side appends a FORKED
  record to the process checkpoint stream and returns. The coordinator (child) side waits until
  that FORKED record is readable, runs execute(), closes its checkpoint writer and calls
  finish(). Subclasses implement execute() and may override finish().
  """
  class Error(Exception): pass
  class UnknownUserError(Error): pass
  class CheckpointError(Error): pass
  class UnspecifiedSandbox(Error): pass
  class PermissionError(Error): pass

  # Polling interval and overall limit used by the coordinator while it waits for the parent's
  # FORKED record to show up in the checkpoint file (see _wait_for_control).
  CONTROL_WAIT_CHECK_INTERVAL = Amount(100, Time.MILLISECONDS)
  MAXIMUM_CONTROL_WAIT = Amount(1, Time.MINUTES)

  def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None):
    """
    required:
      name = name of the process
      cmdline = cmdline of the process
      sequence = the next available sequence number for state updates
      pathspec = TaskPath object for synthesizing path names
      sandbox_dir = the sandbox in which to run the process (created here if truthy)
      platform = Platform providing fork, clock, getpid

    optional:
      user = the user to run as (if unspecified, will default to current user.)
             if specified to a user that is not the current user, you must have root access

    raises:
      ValueError if platform is None.
      UnknownUserError if the requested user or the current user cannot be looked up.
      PermissionError if user differs from the current user and the effective uid is not 0.
    """
    # Validate the mandatory argument before any side effect (such as creating the sandbox).
    if platform is None:
      raise ValueError("Platform must be specified")
    self._platform = platform
    self._name = name
    self._cmdline = cmdline
    self._pathspec = pathspec
    self._seq = sequence
    self._sandbox = sandbox_dir
    if self._sandbox:
      safe_mkdir(self._sandbox)
    self._pid = None
    self._fork_time = None
    self._stdout = None
    self._stderr = None
    self._user = user
    if self._user:
      target_user, current_user = self._getpwuid()  # may raise self.UnknownUserError
      if target_user != current_user and os.geteuid() != 0:
        raise self.PermissionError('Must be root to run processes as other users!')
    # Checkpoint writer (created lazily by _setup_ckpt) and the file offset at which the records
    # written by this process begin.
    self._ckpt = None
    self._ckpt_head = -1

  def _log(self, msg):
    log.debug('[process:%5s=%s]: %s' % (self._pid, self.name(), msg))

  def _ckpt_write(self, msg):
    """Append a RunnerCkpt record to the checkpoint stream, opening the stream if needed."""
    self._init_ckpt_if_necessary()
    self._log("child state transition [%s] <= %s" % (self.ckpt_file(), msg))
    self._ckpt.write(msg)

  def _write_process_update(self, **kw):
    """Write a process update to the coordinator's checkpoint stream.

    kw are ProcessStatus fields. The seq and process fields are filled in here, and the local
    sequence number is advanced after the record is written.
    """
    process_status = ProcessStatus(**kw)
    process_status.seq = self._seq
    process_status.process = self.name()
    self._ckpt_write(RunnerCkpt(process_status=process_status))
    self._seq += 1

  def _write_initial_update(self):
    """Record the FORKED state together with the fork time and the coordinator pid."""
    self._write_process_update(state=ProcessState.FORKED,
                               fork_time=self._fork_time,
                               coordinator_pid=self._pid)

  def cmdline(self):
    return self._cmdline

  def name(self):
    return self._name

  def pid(self):
    """pid of the coordinator"""
    return self._pid

  def rebind(self, pid, fork_time):
    """rebind Process to an existing coordinator pid without forking"""
    self._pid = pid
    self._fork_time = fork_time

  def ckpt_file(self):
    """Path of this process's checkpoint file, resolved through the pathspec."""
    return self._pathspec.getpath('process_checkpoint')

  def _setup_ckpt(self):
    """Set up the checkpoint: must be run on the parent.

    Locks the checkpoint file (opened "a+"). It then records the file's current size in
    self._ckpt_head as the offset where new records start, and wraps the file in a
    ThriftRecordWriter with set_sync(True).

    raises:
      CheckpointError if lock_file cannot open or lock the file.
    """
    self._log('initializing checkpoint file: %s' % self.ckpt_file())
    ckpt_fp = lock_file(self.ckpt_file(), "a+")
    if ckpt_fp in (None, False):
      raise self.CheckpointError('Could not acquire checkpoint permission or lock for %s!' %
          self.ckpt_file())
    self._ckpt_head = os.path.getsize(self.ckpt_file())
    ckpt_fp.seek(self._ckpt_head)
    self._ckpt = ThriftRecordWriter(ckpt_fp)
    self._ckpt.set_sync(True)

  def _init_ckpt_if_necessary(self):
    if self._ckpt is None:
      self._setup_ckpt()

  def _wait_for_control(self):
    """Wait for control of the checkpoint stream: must be run in the child.

    Reads the checkpoint file from self._ckpt_head until a record can be decoded. It checks that
    this record is the FORKED record for this coordinator and continues the sequence numbering
    after it.

    raises:
      CheckpointError if the record has no process status, if it describes a different
      process/fork/coordinator, or if no record is readable within MAXIMUM_CONTROL_WAIT.
    """
    total_wait_time = Amount(0, Time.SECONDS)

    with open(self.ckpt_file(), 'r') as fp:
      fp.seek(self._ckpt_head)
      rr = ThriftRecordReader(fp, RunnerCkpt)
      while total_wait_time < self.MAXIMUM_CONTROL_WAIT:
        ckpt_tail = os.path.getsize(self.ckpt_file())
        checkpoint = rr.try_read() if ckpt_tail != self._ckpt_head else None
        if not checkpoint:
          # Either nothing has been appended yet, or the bytes past the head could not be decoded
          # into a record (e.g. a write still in progress). Sleep and count the wait in both
          # cases so that an undecodable tail cannot cause an unbounded busy loop.
          self._platform.clock().sleep(self.CONTROL_WAIT_CHECK_INTERVAL.as_(Time.SECONDS))
          total_wait_time += self.CONTROL_WAIT_CHECK_INTERVAL
          continue
        status = checkpoint.process_status
        if not status:
          raise self.CheckpointError('No process status in checkpoint!')
        if (status.process != self.name() or
            status.state != ProcessState.FORKED or
            status.fork_time != self._fork_time or
            status.coordinator_pid != self._pid):
          self._log('Losing control of the checkpoint stream:')
          self._log(' fork_time [%s] vs self._fork_time [%s]' % (
              status.fork_time, self._fork_time))
          self._log(' coordinator_pid [%s] vs self._pid [%s]' % (
              status.coordinator_pid, self._pid))
          raise self.CheckpointError('Lost control of the checkpoint stream!')
        self._log('Taking control of the checkpoint stream at record: %s' % status)
        self._seq = status.seq + 1
        return True
    raise self.CheckpointError('Timed out waiting for checkpoint stream!')

  def _prepare_fork(self):
    """Parent-side setup before forking.

    Records the fork time, opens the checkpoint, and opens the stdout/stderr log files. The log
    files are chowned to the target user.
    """
    user, _ = self._getpwuid()
    self._fork_time = self._platform.clock().time()
    self._setup_ckpt()
    self._stdout = safe_open(self._pathspec.with_filename('stdout').getpath('process_logdir'), "w")
    self._stderr = safe_open(self._pathspec.with_filename('stderr').getpath('process_logdir'), "w")
    os.chown(self._stdout.name, user.pw_uid, user.pw_gid)
    os.chown(self._stderr.name, user.pw_uid, user.pw_gid)

  def _finalize_fork(self):
    """Parent-side completion of start().

    Writes the FORKED record and releases the parent's handles. After fork the coordinator holds
    its own copies of the checkpoint and log file descriptors. The parent therefore closes its
    copies, so that each start() does not leak them.
    """
    self._write_initial_update()
    self._ckpt.close()
    self._ckpt = None
    for handle in (self._stdout, self._stderr):
      if handle is not None:
        handle.close()

  def _getpwuid(self):
    """Returns a tuple of the pwd entries for the user (i.e. --user) and the current user.

    When no user was configured, the first element is the current user's entry.

    raises:
      UnknownUserError if either pwd lookup fails.
    """
    uid = os.getuid()
    try:
      current_user = pwd.getpwuid(uid)
    except KeyError:
      raise self.UnknownUserError('Unable to get pwent information for current uid %s!' % uid)
    if not self._user:
      return current_user, current_user
    try:
      user = pwd.getpwnam(self._user)
    except KeyError:
      raise self.UnknownUserError('Unknown user %s!' % self._user)
    return user, current_user

  def start(self):
    """
    This is the main call point from the runner, and forks a co-ordinator process to run the
    target process (i.e. self.cmdline())

    The parent returns immediately and populates information about the pid of the co-ordinator.
    The child (co-ordinator) will launch the target process in a subprocess.
    """
    self._prepare_fork()
    self._pid = self._platform.fork()
    if self._pid == 0:
      # Coordinator side: learn our own pid, wait for the parent's FORKED record, then run.
      self._pid = self._platform.getpid()
      self._wait_for_control()
      try:
        self.execute()
      finally:
        self._ckpt.close()
        self.finish()
    else:
      self._finalize_fork()

  def execute(self):
    """Run the target process inside the coordinator; must be implemented by subclasses."""
    raise NotImplementedError

  def finish(self):
    """Hook run in the coordinator after execute() returns or raises; a no-op by default."""
    pass
+
+
class RealPlatform(Platform):
  """Platform backed by the operating system: an injectable fork, os.getpid and the time module."""

  # Signals that the child sets to SIG_IGN right after a successful fork.
  IGNORE_SIGNALS = (signal.SIGINT,)

  def __init__(self, fork=os.fork):
    self._fork = fork

  def fork(self):
    """Fork via the injected function; on the child side (0), ignore IGNORE_SIGNALS first."""
    child_pid = self._fork()
    if child_pid != 0:
      return child_pid
    self._sanitize()
    return child_pid

  def _sanitize(self):
    """Install SIG_IGN for every signal listed in IGNORE_SIGNALS."""
    ignore = signal.SIG_IGN
    for signum in self.IGNORE_SIGNALS:
      signal.signal(signum, ignore)

  def getpid(self):
    return os.getpid()

  def clock(self):
    """Return the time module itself, which provides both time() and sleep()."""
    return time
+
+
class Process(ProcessBase):
  """
  Encapsulate a running process for a task.

  The coordinator runs cmdline() through `/bin/bash -c` in the sandbox, optionally chrooted into
  it, with a minimal environment. It records RUNNING and terminal states in the checkpoint stream.
  """
  # If this file exists in the sandbox, it is handed to bash as BASH_ENV.
  RCFILE = '.thermos_profile'
  # Passed to subprocess.Popen as close_fds.
  FD_CLOEXEC = True

  def __init__(self, *args, **kw):
    """
    See ProcessBase.__init__

    Takes additional arguments:
      fork: the fork function to use [default: os.fork]
      chroot: whether or not to chroot into the sandbox [default: False]

    raises:
      UnspecifiedSandbox if chroot is requested without a (non-empty) sandbox directory.
    """
    fork = kw.pop('fork', os.fork)
    self._use_chroot = bool(kw.pop('chroot', False))
    self._rc = None
    self._popen = None
    kw['platform'] = RealPlatform(fork=fork)
    ProcessBase.__init__(self, *args, **kw)
    # execute() treats any falsy sandbox as "no sandbox", so reject all of them when chrooting.
    if self._use_chroot and not self._sandbox:
      raise self.UnspecifiedSandbox('If using chroot, must specify sandbox!')

  def _chroot(self):
    """chdir and chroot to the sandbox directory."""
    os.chdir(self._sandbox)
    os.chroot(self._sandbox)

  @staticmethod
  def _switch_credentials(user, current_user):
    """Return (uid, gid, supplementary gids) needed to become `user`, or None if already that uid.

    user and current_user are pwd entries (as returned by _getpwuid). The supplementary groups
    are those that list user.pw_name as a member in grp.getgrall().
    """
    if user.pw_uid == current_user.pw_uid:
      return None
    group_ids = [group.gr_gid for group in grp.getgrall() if user.pw_name in group.gr_mem]
    return user.pw_uid, user.pw_gid, group_ids

  def _setuid(self, credentials=None):
    """Drop privileges to the user supplied in Process creation (if necessary.)

    credentials: optional (uid, gid, group_ids) tuple from _switch_credentials. Passing it lets
      the pwd/grp lookups happen before a chroot; when omitted, they are performed here.
    """
    if credentials is None:
      credentials = self._switch_credentials(*self._getpwuid())
      if credentials is None:
        return
    uid, gid, group_ids = credentials
    os.setgroups(group_ids)
    os.setgid(gid)
    os.setuid(uid)

  def execute(self):
    """Perform final initialization and launch target process commandline in a subprocess.

    Runs in the coordinator. It writes a RUNNING update once the subprocess has been started,
    and a KILLED/SUCCESS/FAILED update with its return code after it exits.

    raises:
      RuntimeError if the stdout/stderr log files were not opened by _prepare_fork.
    """
    if not self._stderr:
      raise RuntimeError('self._stderr not set up!')
    if not self._stdout:
      raise RuntimeError('self._stdout not set up!')

    user, current_user = self._getpwuid()
    username, homedir = user.pw_name, user.pw_dir
    # Resolve the target credentials before any chroot. After chroot, pwd/grp lookups are made
    # against the sandbox's filesystem, which may not contain the user/group databases.
    credentials = self._switch_credentials(user, current_user)

    # TODO(wickman) reconsider setsid now that we're invoking in a subshell
    os.setsid()
    if self._use_chroot:
      self._chroot()
    if credentials is not None:
      self._setuid(credentials)

    # start process
    start_time = self._platform.clock().time()

    if not self._sandbox:
      sandbox = os.getcwd()
    else:
      sandbox = self._sandbox if not self._use_chroot else '/'

    thermos_profile = os.path.join(sandbox, self.RCFILE)
    env = {
      'HOME': homedir if self._use_chroot else sandbox,
      'LOGNAME': username,
      'USER': username,
      # Fall back to the default search path rather than raising KeyError when the coordinator
      # inherited an environment without PATH.
      'PATH': os.environ.get('PATH', os.defpath),
    }

    if os.path.exists(thermos_profile):
      env.update(BASH_ENV=thermos_profile)

    self._popen = subprocess.Popen(["/bin/bash", "-c", self.cmdline()],
                                   stderr=self._stderr,
                                   stdout=self._stdout,
                                   close_fds=self.FD_CLOEXEC,
                                   cwd=sandbox,
                                   env=env)

    self._write_process_update(state=ProcessState.RUNNING,
                               pid=self._popen.pid,
                               start_time=start_time)

    # wait for job to finish
    rc = self._popen.wait()

    # indicate that we have finished/failed; a negative rc means the child died from a signal
    if rc < 0:
      state = ProcessState.KILLED
    elif rc == 0:
      state = ProcessState.SUCCESS
    else:
      state = ProcessState.FAILED

    self._write_process_update(state=state,
                               return_code=rc,
                               stop_time=self._platform.clock().time())
    self._rc = rc

  def finish(self):
    """Terminate the coordinator via sys.exit(0).

    NOTE(review): sys.exit raises SystemExit in the forked child, so interpreter shutdown (and any
    inherited atexit handlers) runs there; confirm this is intended rather than os._exit.
    """
    self._log('Coordinator exiting.')
    sys.exit(0)