Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:04 UTC
[11/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/tools/java/thrift_wrapper_codegen.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/tools/java/thrift_wrapper_codegen.py b/src/main/python/twitter/aurora/tools/java/thrift_wrapper_codegen.py
deleted file mode 100644
index 4376e85..0000000
--- a/src/main/python/twitter/aurora/tools/java/thrift_wrapper_codegen.py
+++ /dev/null
@@ -1,475 +0,0 @@
-#!/usr/bin/env python
-
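-'''Generates immutable Java wrapper classes for thrift structs.
-
-For the requested struct Foo (and the structs it references, excluding enums),
-emits a wrapper class IFoo in com.twitter.aurora.scheduler.storage.entities.
-
-Usage: thrift_wrapper_codegen.py thrift_file struct_name output_directory
-'''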
-from __future__ import print_function
-
-import os
-import re
-import sys
-
-
-class Type(object):
- '''A data type.'''
-
- def __init__(self, name, package=None, immutable=False):
- self.name = name
- self.package = package
- self.immutable = immutable
-
- def absolute_name(self):
- return '%s.%s' % (self.package, self.name) if self.package else self.name
-
- def __str__(self):
- return '%s (%smutable)' % (self.absolute_name(), 'im' if self.immutable else '')
-
-
-class PrimitiveType(Type):
-  '''A primitive type, with its associated boxed type name.'''
-
- def __init__(self, name, boxed_name):
- Type.__init__(self, name, package=None, immutable=True)
- self.boxed_name = boxed_name
-
-
-class ParameterizedType(Type):
- '''A parameterized type, usually a collection.'''
-
- def __init__(self, name, params):
- Type.__init__(self, name, None)
- self.params = params
-
- def param_names(self):
- def name(t):
- if isinstance(t, StructType):
- return t.codegen_name
- elif isinstance(t, PrimitiveType):
- return t.boxed_name
- else:
- return t.name
- return ', '.join([name(p) for p in self.params])
-
-
-class StructType(Type):
- '''A thrift-defined type, which composes other types as fields.'''
-
- def __init__(self, name, package, kind, fields):
- Type.__init__(self, name, package, kind == 'enum')
- self.codegen_name = 'I%s' % name
- self.kind = kind
- self.fields = fields
-
- def __str__(self):
- return '%s %s { %s }' % (self.kind, self.name, ', '.join(map(str, self.fields)))
-
-
-class Field(object):
- '''A field within a thrift structure.'''
-
- def __init__(self, ttype, name):
- self.ttype = ttype
- self.name = name
-
- def accessor_method(self):
- return '%s%s' % (
- 'is' if self.ttype.name == 'boolean' else 'get',
- self.name[:1].capitalize() + self.name[1:])
-
- def isset_method(self):
- return 'isSet%s' % (self.name[0].upper() + self.name[1:])
-
- def __str__(self):
- return '%s: %s' % (self.name, self.ttype)
-
-
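-# Template string for an accessor method; %(field)s may be a field or an expression.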
-FIELD_TEMPLATE = ''' public %(type)s %(fn_name)s() {
- return %(field)s;
- }'''
-
-
-# Template string for a method to access an immutable field.
-IMMUTABLE_FIELD_TEMPLATE = ''' public %(type)s %(fn_name)s() {
- return wrapped.%(fn_name)s();
- }'''
-
-
-STRUCT_DECLARATION = '''private final %(type)s %(field)s;'''
-STRUCT_ASSIGNMENT = '''this.%(field)s = !wrapped.%(isset)s()
- ? null
- : %(type)s.buildNoCopy(wrapped.%(fn_name)s());'''
-
-
-IMMUTABLE_COLLECTION_DECLARATION = '''private final Immutable%(collection)s<%(params)s> %(field)s;'''
-IMMUTABLE_COLLECTION_ASSIGNMENT = '''this.%(field)s = !wrapped.%(isset)s()
- ? Immutable%(collection)s.<%(params)s>of()
- : Immutable%(collection)s.copyOf(wrapped.%(fn_name)s());'''
-
-
-# Template string for the assignment of a collection field containing structs.
-STRUCT_COLLECTION_FIELD_ASSIGNMENT = '''this.%(field)s = !wrapped.%(isset)s()
- ? Immutable%(collection)s.<%(params)s>of()
- : FluentIterable.from(wrapped.%(fn_name)s())
- .transform(%(params)s.FROM_BUILDER)
- .to%(collection)s();'''
-
-PACKAGE_NAME = 'com.twitter.aurora.scheduler.storage.entities'
-
-CLASS_TEMPLATE = '''/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package %(package)s;
-
-%(imports)s
-
-/**
- * An immutable wrapper class.
- * <p>
- * This code is auto-generated, and should not be directly modified.
- */
-public final class %(name)s {
- private final %(wrapped)s wrapped;
-%(fields)s
- private %(name)s(%(wrapped)s wrapped) {
- this.wrapped = Preconditions.checkNotNull(wrapped);%(assignments)s
- }
-
- static %(name)s buildNoCopy(%(wrapped)s wrapped) {
- return new %(name)s(wrapped);
- }
-
- public static %(name)s build(%(wrapped)s wrapped) {
- return buildNoCopy(wrapped.deepCopy());
- }
-
- public static final Function<%(name)s, %(wrapped)s> TO_BUILDER =
- new Function<%(name)s, %(wrapped)s>() {
- @Override
- public %(wrapped)s apply(%(name)s input) {
- return input.newBuilder();
- }
- };
-
- public static final Function<%(wrapped)s, %(name)s> FROM_BUILDER =
- new Function<%(wrapped)s, %(name)s>() {
- @Override
- public %(name)s apply(%(wrapped)s input) {
- return new %(name)s(input);
- }
- };
-
- public static ImmutableList<%(wrapped)s> toBuildersList(Iterable<%(name)s> w) {
- return FluentIterable.from(w).transform(TO_BUILDER).toList();
- }
-
- public static ImmutableList<%(name)s> listFromBuilders(Iterable<%(wrapped)s> b) {
- return FluentIterable.from(b).transform(FROM_BUILDER).toList();
- }
-
- public static ImmutableSet<%(wrapped)s> toBuildersSet(Iterable<%(name)s> w) {
- return FluentIterable.from(w).transform(TO_BUILDER).toSet();
- }
-
- public static ImmutableSet<%(name)s> setFromBuilders(Iterable<%(wrapped)s> b) {
- return FluentIterable.from(b).transform(FROM_BUILDER).toSet();
- }
-
- public %(wrapped)s newBuilder() {
- return wrapped.deepCopy();
- }
-
-%(accessors)s
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof %(name)s)) {
- return false;
- }
- %(name)s other = (%(name)s) o;
- return wrapped.equals(other.wrapped);
- }
-
- @Override
- public int hashCode() {
- return wrapped.hashCode();
- }
-
- @Override
- public String toString() {
- return wrapped.toString();
- }
-}'''
-
-
-class GeneratedCode(object):
- def __init__(self, class_name, wrapped_type):
- self._class_name = class_name
- self._wrapped_type = wrapped_type
- self._imports = set()
- self._accessors = []
- self._fields = []
- self._assignments = []
-
- def add(self, s, end='\n'):
- print('This no longer does anything.')
-
- def add_import(self, import_class):
- self._imports.add(import_class)
-
- def add_assignment(self, field, assignment):
- self._fields.append(field)
- self._assignments.append(assignment)
-
- def add_accessor(self, accessor_method):
- self._accessors.append(accessor_method)
-
- def dump(self, f):
- remaining_imports = list(self._imports)
- import_groups = []
- def remove_by_prefix(prefix):
- group = [i for i in remaining_imports if i.startswith(prefix)]
- remaining_imports[:] = [i for i in remaining_imports if not i.startswith(prefix)]
- return group
-
- def add_import_group(group):
- if group:
- import_groups.append('\n'.join(['import %s;' % i for i in sorted(group)]))
-
- twitter_imports = remove_by_prefix('com.twitter')
- add_import_group(remove_by_prefix('java'))
- add_import_group(remove_by_prefix('com'))
- add_import_group(remove_by_prefix('net'))
- add_import_group(remove_by_prefix('org'))
- add_import_group(twitter_imports)
-
- print(CLASS_TEMPLATE % {
- 'package': PACKAGE_NAME,
- 'name': self._class_name,
- 'wrapped': self._wrapped_type,
- 'imports': '\n\n'.join(import_groups),
- 'accessors': '\n\n'.join(self._accessors),
- 'fields': (' ' + '\n '.join(self._fields) + '\n') if self._fields else '',
- 'assignments': ('\n ' + '\n '.join(self._assignments)) if self._assignments else '',
- }, file=f)
-
-
-# A namespace declaration, e.g.:
-# namespace java com.twitter.aurora.gen
-NAMESPACE_RE = r'namespace\s+(?P<lang>\w+)\s+(?P<namespace>[^\s]+)'
-
-# A possibly-parameterized type name, e.g.:
-# int
-# TaskConfig
-# Set<String>
-# Map<String, TaskConfig>
-TYPE_PATTERN = r'(?P<type>\w+)(?:<(?P<params>[^>]+)>)?'
-
-
-# Matches a complete struct definition, capturing the kind, name, and body.
-STRUCT_RE = r'(?P<kind>enum|struct|union)\s+(?P<name>\w+)\s+{(?P<body>[^}]+)}'
-
-
-# A field definition within a struct, e.g.:
-# 1: string name
-# 15: Map<String, TaskConfig> configs # Configs mapped by name.
-FIELD_RE = r'\s*\d+:\s+(?:(?:required|optional)\s+)?(%s)\s+(?P<name>\w+).*' % TYPE_PATTERN
-
-
-def parse_structs(thrift_defs):
- '''Read all thrift structures found in a file.
-
- This returns a list of Type objects representing the structs found
- and the fields they contain.
- '''
- # Capture all namespace definitions.
- namespaces = dict(re.findall(NAMESPACE_RE, thrift_defs))
-
- def parse_field(field):
- type_name = field.group('type')
- type_params = field.group('params')
- if type_params:
- params = [Type(p) for p in type_params.replace(' ', '').split(',')]
- ttype = ParameterizedType(type_name, params)
- else:
- ttype = Type(type_name)
- return Field(ttype, field.group('name'))
-
- def parse_fields(field_str):
- return map(parse_field, re.finditer(FIELD_RE, field_str))
-
- return [StructType(s.group('name'),
- namespaces['java'],
- s.group('kind'),
- parse_fields(s.group('body')))
- for s in re.finditer(STRUCT_RE, thrift_defs, flags=re.MULTILINE)]
-
-
-def generate_java(struct):
- code = GeneratedCode(struct.codegen_name, struct.name)
- code.add_import('com.google.common.base.Preconditions')
- code.add_import('com.google.common.base.Function')
- code.add_import('com.google.common.collect.ImmutableList')
- code.add_import('com.google.common.collect.ImmutableSet')
- code.add_import('com.google.common.collect.FluentIterable')
- code.add_import(struct.absolute_name())
-
- if struct.kind == 'union':
- code.add_accessor(IMMUTABLE_FIELD_TEMPLATE
- % {'type': '%s._Fields' % struct.name, 'fn_name': 'getSetField'})
-
- # Accessor for each field.
- for field in struct.fields:
- if not (isinstance(field.ttype, StructType) and (field.ttype.kind == 'enum' or struct.kind == 'union')):
- code.add_accessor(IMMUTABLE_FIELD_TEMPLATE
- % {'type': 'boolean',
- 'fn_name': field.isset_method()})
- if field.ttype.immutable:
- code.add_accessor(IMMUTABLE_FIELD_TEMPLATE % {'type': field.ttype.name,
- 'fn_name': field.accessor_method()})
- elif not struct.kind == 'union':
- if isinstance(field.ttype, StructType):
- return_type = field.ttype.codegen_name
- elif isinstance(field.ttype, ParameterizedType):
- return_type = '%s<%s>' % (field.ttype.name, field.ttype.param_names())
- else:
- return_type = field.ttype.name
- code.add_accessor(FIELD_TEMPLATE % {'type': return_type,
- 'fn_name': field.accessor_method(),
- 'field': field.name})
-
- if isinstance(field.ttype, StructType):
- if field.ttype.kind == 'enum':
- code.add_import(field.ttype.absolute_name())
-
- if field.ttype.immutable:
- # Direct accessor was already added.
- pass
- elif struct.kind == 'union':
- copy_field = '%s.build(wrapped.%s())' % (field.ttype.codegen_name,
- field.accessor_method())
- code.add_accessor(FIELD_TEMPLATE % {'type': field.ttype.codegen_name,
- 'fn_name': field.accessor_method(),
- 'field': copy_field})
- else:
- args = {
- 'field': field.name,
- 'fn_name': field.accessor_method(),
- 'isset': field.isset_method(),
- 'type': field.ttype.codegen_name,
- }
- code.add_assignment(STRUCT_DECLARATION % args, STRUCT_ASSIGNMENT % args)
- elif isinstance(field.ttype, ParameterizedType):
- # Add necessary imports, supporting only List, Map, Set.
- assert field.ttype.name in ['List', 'Map', 'Set'], 'Unrecognized type %s' % field.ttype.name
- code.add_import('com.google.common.collect.Immutable%s' % field.ttype.name)
- code.add_import('java.util.%s' % field.ttype.name)
-
- params = field.ttype.params
- if all([p.immutable for p in params]):
- # All parameter types are immutable.
- assignment = IMMUTABLE_COLLECTION_ASSIGNMENT
- elif len(params) == 1:
- # Only one non-immutable parameter.
- # Assumes the parameter type is a struct and our code generator
- # will make a compatible wrapper class and constructor.
- assignment = STRUCT_COLLECTION_FIELD_ASSIGNMENT
- else:
- assert False, 'Unable to codegen accessor field for %s' % field.name
- args = {'collection': field.ttype.name,
- 'field': field.name,
- 'fn_name': field.accessor_method(),
- 'isset': field.isset_method(),
- 'params': field.ttype.param_names()}
- code.add_assignment(IMMUTABLE_COLLECTION_DECLARATION % args, assignment % args)
- elif not field.ttype.immutable:
- assert False, 'Making type %s immutable is not supported.' % field.ttype.name
- return code
-
-
-THRIFT_ALIASES = {
- 'bool': 'boolean',
- 'i32': 'int',
- 'i64': 'long',
- 'double': 'double',
- 'string': 'String',
- 'list': 'List',
- 'set': 'Set',
- 'map': 'Map',
- 'binary': 'byte[]',
-}
-
-
-def main(args):
- if len(args) != 4:
- print('usage: %s thrift_file struct_name output_directory' % sys.argv[0])
- sys.exit(1)
-
-  thrift_file, struct_name, output_directory = args[1:]
-  print('Searching for %s in %s' % (struct_name, thrift_file))
-  with open(thrift_file) as f:
- # Load all structs found in the thrift file.
- structs = parse_structs(f.read())
-
- # The symbol table stores information about types we recognize.
- # As new symbols are parsed, they are accumulated here.
- # This is also seeded with JDK types.
- # Note: byte[] is not immutable, but we'd rather accept it than copy.
- primitives = dict((t, PrimitiveType(t, b)) for (t, b) in [('boolean', 'Boolean'),
- ('int', 'Integer'),
- ('long', 'Long'),
- ('double', 'Double')])
- lang_symbols = dict((t, Type(t, 'java.lang', immutable=True)) for t in ['String', 'byte[]'])
- util_symbols = dict((t, Type(t, 'java.util')) for t in ['List', 'Map', 'Set'])
- symbol_table = dict(primitives.items() + lang_symbols.items() + util_symbols.items())
-
- def load_dependencies(struct):
- # Fill in type information for fields by searching for dependencies.
- for field in struct.fields:
- if isinstance(field.ttype, ParameterizedType):
- field.ttype.name = find_symbol(field.ttype.name).name
- field.ttype.params = [find_symbol(p.name) for p in field.ttype.params]
- else:
- field.ttype = find_symbol(field.ttype.name)
-
- def find_symbol(name):
- name = THRIFT_ALIASES.get(name, name)
- if name in symbol_table:
- return symbol_table[name]
-
- symbol = next((s for s in structs if s.name == name), None)
- assert symbol, 'Failed to find required struct %s' % name
- load_dependencies(symbol)
- symbol_table[name] = symbol
- return symbol
-
-  find_symbol(struct_name)
- print('Symbol table:')
- for _, symbol in symbol_table.items():
- print(' %s' % symbol)
-
- for _, symbol in symbol_table.items():
- if isinstance(symbol, StructType):
- if symbol.kind == 'enum':
- print('Skipping code generation for %s, since it is immutable' % symbol.name)
- else:
-        package_dir = os.path.join(output_directory, PACKAGE_NAME.replace('.', os.path.sep))
- if not os.path.isdir(package_dir):
- os.makedirs(package_dir)
- gen_file = os.path.join(package_dir, '%s.java' % symbol.codegen_name)
- print('Generating %s' % gen_file)
- with open(gen_file, 'w') as f:
- code = generate_java(symbol)
- code.dump(f)
-
-
-if __name__ == '__main__':
- main(sys.argv)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/BUILD b/src/main/python/twitter/thermos/BUILD
deleted file mode 100644
index 937bcf3..0000000
--- a/src/main/python/twitter/thermos/BUILD
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-
-page(name = 'readme', source = 'README.md')
-
-python_library(
- name = 'thermos',
- dependencies = [
- pants('src/main/python/twitter/thermos/core'),
- pants('src/main/python/twitter/thermos/monitoring'),
- ],
- provides = setup_py(
- name = 'twitter.thermos',
- version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
-    description = 'The Twitter Thermos runtime.',
- ).with_binaries(
- thermos = pants('src/main/python/twitter/thermos/bin:thermos'),
- )
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/BUILD.thirdparty
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/BUILD.thirdparty b/src/main/python/twitter/thermos/BUILD.thirdparty
deleted file mode 100644
index 5a54f4a..0000000
--- a/src/main/python/twitter/thermos/BUILD.thirdparty
+++ /dev/null
@@ -1,34 +0,0 @@
-python_requirement(
- name = 'psutil',
- requirement = 'psutil==1.1.2',
-)
-
-python_requirement(
- name = 'pystachio',
- requirement = 'pystachio==0.7.2',
-)
-
-python_requirement(
- name = 'mako',
- requirement = 'mako==0.4.0',
-)
-
-python_requirement(
- name = 'cherrypy',
- requirement = 'cherrypy==3.2.2',
-)
-
-python_requirement(
- name = 'bottle',
- requirement = 'bottle==0.11.6',
-)
-
-python_requirement(
- name = 'thrift',
- requirement = 'thrift==0.9.1',
-)
-
-python_requirement(
- name = 'mock',
- requirement = 'mock==1.0.1',
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/README.md
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/README.md b/src/main/python/twitter/thermos/README.md
deleted file mode 100644
index e456e93..0000000
--- a/src/main/python/twitter/thermos/README.md
+++ /dev/null
@@ -1,461 +0,0 @@
-**This document is deprecated and no longer updated.**
-
-# Thermos manual #
-
-[TOC]
-
-
-## tl;dr ##
-
-**You noticed the deprecation notice, right?**
-
-### What is Thermos? ###
-
-Thermos is a simple process management framework used for orchestrating
-dependent processes within a single chroot. At Twitter, it is used as a
-process manager for Mesos tasks. In practice, there is a one-to-one
-correspondence between a Mesos task and Thermos task. This document
-describes how to use Thermos in a local development environment and does not
-describe how to run Thermos tasks on Mesos, though once you have a valid
-Thermos configuration doing so is a small step.
-
-### Simplest invocation ###
-
-Thermos lives in `science` at Twitter and publicly on GitHub in `twitter-commons` (TBD).
-
-#### build ####
-
-Build the Thermos CLI and the Thermos observer.
-
-```shell
-$ ./pants src/python/twitter/thermos
-$ ./pants src/python/twitter/thermos/bin:thermos_observe
-```
-
-You can copy `dist/thermos.pex` to `thermos` somewhere in your `$PATH` or use it a la carte.
-
-#### simplerun ####
-
-You can run Thermos tasks without first writing a configuration file using `simplerun`:
-
-```shell
-$ thermos simplerun 'echo hello world'
-Running command: 'echo hello world'
- INFO] Forking Process(simple)
- INFO] Process(simple) finished successfully [rc=0]
- INFO] Killing task id: simple-20120529-162532.018646
- INFO] => Current user: wickman
- INFO] Kill complete.
- INFO] Task succeeded.
-```
-
-#### introspection ####
-
-```shell
-$ thermos status --verbosity=3 simple-20120529-162532.018646
-Found task simple-20120529-162532.018646
- simple-20120529-162532.018646 [owner: wickman] state: SUCCESS start: Tue May 29 16:25:32 2012
- user: wickman ports: None
- sandbox: None
- process table:
- - simple runs: 1 last: pid=57471, rc=0, finish:Tue May 29 16:25:32 2012, state:SUCCESS
-
-$ thermos tail simple-20120529-162532.018646
-Tail of terminal log /Users/wickman/.thermos/logs/simple-20120529-162532.018646/simple/0/stdout
-hello world
-```
-
-#### thermos observer ####
-
-```shell
-$ dist/thermos_observe.pex --root=$HOME/.thermos
-```
-
-This will fire up a webserver on `localhost:1338` that you can use as a web interface to interact with
-locally running Thermos tasks.
-
-
-## Building blocks ##
-
-Thermos is made of Processes and Tasks.
-
-### Thermos processes ###
-
-A Thermos process is simply a command-line that is invoked in a subshell. A single
-run of a process may succeed (have a zero exit status) or fail (have a
-non-zero exit status.) A process is considered permanently failed after a
-maximum number of individual run failures, by default one.
-
-### Thermos tasks ###
-
-A Thermos task is a collection of processes and constraints that dictate how
-and when to run them. By default there are no constraints between processes
-bound to a task and they run in parallel. The simplest (and currently only)
-Task level constraint is the `order` dependency: process B should not be run
-until process A has completed successfully. For example, process A could be
-`git clone` and process B could be `rake test`. It doesn't make sense to
-run process B until process A is fully completed and successful.
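-
-A sketch of that example as Thermos configuration (the repository URL and
-commands here are illustrative):
-
-```python
-# 'test' will not start until 'clone' has completed successfully.
-clone = Process(name = "clone", cmdline = "git clone git://example.com/repo.git repo")
-test = Process(name = "test", cmdline = "cd repo && rake test")
-
-task = Task(name = "build_and_test",
-            processes = [clone, test],
-            constraints = [Constraint(order = ["clone", "test"])])
-
-export(task)
-```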
-
-
-### Thermos configuration ###
-
-The easiest way to invoke Thermos is using `thermos simplerun`. Under the
-covers, this synthesizes a Thermos task with a single Thermos process named
-`simple`. To do the same thing via Thermos configuration, add
-the following to `simple.thermos`:
-
-```python
-process = Process(name = 'simple', cmdline = 'echo hello world')
-task = Task(name = 'simple', processes = [process])
-export(task)
-```
-
-Then invoke it with `thermos run simple.thermos`.
-
-Configuring Thermos is done through
-[pystachio](http://github.com/wickman/pystachio) templates. These templates
-behave like structurally typed collections of key-value pairs. You can
-either construct configuration through plain old Python objects (as above)
-or as Python dictionaries or JSON that is coerced into Python dictionaries.
-
-For example, the above configuration is equivalent to:
-
-```python
-export({
- 'name': 'simple',
- 'processes': [{
- 'name': 'simple',
- 'cmdline': 'echo hello world'
- }]
-})
-```
-
-The full Thermos pystachio schema can be found at
-`src/python/twitter/thermos/config/schema.py` and is mostly described below.
-
-
-## Configuration reference ##
-
-### Process objects ###
-
-Processes fundamentally consist of a `name` and a `cmdline`. You will rarely
-need to specify anything more.
-
-<table>
- <tr> <td colspan=2> <b>Process schema</b> </td> </tr>
- <tr> <td> <em>name</em> (required) </td> <td> Process name (String) </td> </tr>
- <tr> <td> <em>cmdline</em> (required) </td> <td> Command line (String) </td> </tr>
- <tr> <td> <em>max_failures</em> </td> <td> Max failures (Integer, default 1) </td> </tr>
- <tr> <td> <em>daemon</em> </td> <td> Daemon process? (Boolean, default False) </td> </tr>
- <tr> <td> <em>ephemeral</em> </td> <td> Ephemeral process? (Boolean, default False) </td> </tr>
- <tr> <td> <em>min_duration</em> </td> <td> Min duration between runs in seconds (Integer, default 15) </td> </tr>
- <tr> <td> <em>final</em> </td> <td> This is a finalizing process that should run last (Boolean, default False) </td> </tr>
-</table>
-
-
-#### name ####
-
-The name is any string that is a valid UNIX filename (specifically no
-slashes, NULLs or leading periods.) Each process name within a task must be
-unique.
-
-#### cmdline ####
-
-The command line is invoked in a bash subshell, so it can be a full-blown
-bash script, though no command-line arguments are supplied.
-
-#### max_failures ####
-
-The maximum number of failures (non-zero exit statuses) this process can
-sustain before being marked permanently failed and not retried. If a
-process becomes permanently failed, Thermos looks to the task failure limit
-(usually 1) to determine whether or not the Thermos task should be failed.
-
-Setting max_failures to 0 means that this process will be retried
-indefinitely until a successful (zero) exit status is achieved. It will be retried
-at most once every `min_duration` seconds in order to prevent DoSing the
-coordinating thermos scheduler.
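-
-For example, a sketch of a process that is retried indefinitely until it
-succeeds, at most once every 30 seconds (the name and command are illustrative):
-
-```python
-# max_failures=0 disables the failure limit; min_duration spaces out retries.
-poller = Process(name = "wait_for_service",
-                 cmdline = "curl -f http://localhost:8080/health",
-                 max_failures = 0,
-                 min_duration = 30)
-```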
-
-#### daemon ####
-
-By default Thermos processes are non-daemon. If `daemon` is set to True, a
-successful (zero) exit status will not prevent future process runs.
-Instead, the process will be reinvoked after min_duration seconds. However,
-the maximum failure limit will still apply. A combination of `daemon=True`
-and `max_failures=0` will cause a process to be retried indefinitely
-regardless of exit status. This should generally be avoided for very
-short-lived processes because of the accumulation of checkpointed state for
-each process run.
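-
-For example, a sketch of a process that is re-run every minute regardless of
-exit status (the command is illustrative):
-
-```python
-# daemon=True re-runs after success; max_failures=0 re-runs after failure.
-cron = Process(name = "cron",
-               cmdline = "./run_periodic_job.sh",
-               daemon = True,
-               max_failures = 0,
-               min_duration = 60)
-```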
-
-#### ephemeral ####
-
-By default Thermos processes are non-ephemeral. If `ephemeral` is set to
-True, the status of the process is not used to determine whether the task in
-which it is bound has completed. Take for example a Thermos task with a
-non-ephemeral webserver process and an ephemeral logsaver process that
-periodically checkpoints its log files to a centralized data store. The
-task is considered finished once the webserver process has completed,
-regardless of the current status of the logsaver.
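-
-The webserver/logsaver example might be sketched as follows (the commands are
-illustrative):
-
-```python
-webserver = Process(name = "webserver", cmdline = "./bin/run_server.sh")
-
-# The task completes when 'webserver' does, regardless of 'logsaver'.
-logsaver = Process(name = "logsaver",
-                   cmdline = "./bin/checkpoint_logs.sh",
-                   daemon = True,
-                   ephemeral = True)
-
-task = Task(name = "server", processes = [webserver, logsaver])
-```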
-
-#### min_duration ####
-
-Processes may succeed or fail multiple times throughout the duration of a
-single task. Each of these is called a "process run." The `min_duration` is the minimum
-number of seconds the scheduler waits between running the same process.
-
-#### final ####
-
-Processes can be grouped into two classes: ordinary processes and finalizing
-processes. By default, Thermos processes are ordinary. They run as long as
-the Thermos Task is considered healthy (i.e., no failure limits have been
-reached.) But once all regular Thermos processes have either finished or the
-Task has reached a certain failure threshold, it moves into a "finalization"
-stage and then runs all finalizing processes. These are typically processes
-necessary for cleaning up the task, such as log checkpointers, or perhaps
-e-mail notifications that the task has completed.
-
-Finalizing processes may not depend upon ordinary processes, or vice versa; however,
-finalizing processes may depend upon other finalizing processes and otherwise run
-under the usual process scheduling rules.
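-
-For example, a sketch of a task whose last act is to send a notification (the
-command and address are illustrative):
-
-```python
-main = Process(name = "main", cmdline = "./run_job.sh")
-
-# Runs during the finalization stage, after 'main' has finished or failed.
-notify = Process(name = "notify",
-                 cmdline = "echo done | mail -s 'job finished' user@example.com",
-                 final = True)
-
-task = Task(name = "job", processes = [main, notify])
-```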
-
-
-### Task objects ###
-
-Tasks fundamentally consist of a `name` and a list of processes, `processes`.
-Processes can be further constrained with `constraints`.
-
-<table>
- <tr> <td colspan=2> <b>Task schema</b> </td> </tr>
- <tr> <td> <em>name</em> (required) </td> <td> Task name (String) </td> </tr>
- <tr> <td> <em>processes</em> (required) </td> <td> List of processes (List of Process objects) </td> </tr>
- <tr> <td> <em>constraints</em> </td> <td> Constraints (List of Constraint objects, default []) </td> </tr>
- <tr> <td> <em>resources</em> </td> <td> Resource footprint (Resource, optional) </td> </tr>
- <tr> <td> <em>max_failures</em> </td> <td> Max failures (Integer, default 1) </td> </tr>
- <tr> <td> <em>max_concurrency</em> </td> <td> Max concurrency (Integer, default 0 = unlimited concurrency) </td> </tr>
- <tr> <td> <em>finalization_wait</em> </td> <td> Amount of time allocated to run finalizing processes (Integer in seconds, default 30) </td> </tr>
-</table>
-
-<table>
- <tr> <td colspan=2> <b>Constraint schema</b> </td> </tr>
- <tr> <td> <em>order</em> </td> <td> List of process names that should run in order (List of Strings)</td> </tr>
-</table>
-
-<table>
- <tr> <td colspan=2> <b>Resource schema</b> </td> </tr>
- <tr> <td> <em>cpu</em> (required) </td> <td> Number of cores (Float) </td> </tr>
- <tr> <td> <em>ram</em> (required) </td> <td> Bytes of RAM (Integer) </td> </tr>
- <tr> <td> <em>disk</em> (required) </td> <td> Bytes of disk (Integer) </td> </tr>
-</table>
-
-#### name ####
-
-The name is used to label the task and is used for reporting in the observer UI and for
-management in the thermos CLI.
-
-#### processes ####
-
-`processes` is an unordered list of `Process` objects. To place temporal constraints upon
-them, you must use `constraints`.
-
-#### constraints ####
-
-A list of `Constraint` objects. Currently only one type of constraint is supported, the `order` constraint.
-`order` is a list of process names that should run in order. For example,
-
-```python
-process = Process(cmdline = "echo hello world")
-task = Task(name = "echoes", processes = [process(name = "first"), process(name = "second")],
-            constraints = [Constraint(order = ["first", "second"])])
-```
-
-Constraints can be supplied ad hoc and in duplicate, and not all processes need be
-constrained; however, tasks with cycles will be rejected by the Thermos scheduler.
-
-#### resources ####
-
-`resources` is a `Resource` object described by `cpu`, `ram`, and `disk`. It is currently unused by
-Thermos but reserved for future use in constraining the resource consumption of a task.
-
-#### max_failures ####
-
-`max_failures` is the number of failed processes required for this task to be marked as
-failed. A single failed process run does not constitute a failed process. For example:
-
-```python
-template = Process(max_failures=10)
-task = Task(name = "fail", processes = [template(name = "failing", cmdline = "exit 1"),
- template(name = "succeeding", cmdline = "exit 0")],
- max_failures=2)
-```
-
-The `failing` process would fail 10 times before being marked as permanently
-failed, and the `succeeding` process would succeed on the first run. The
-task would succeed despite only allowing for two failed processes. To be
-more specific, there would be 10 failed process runs yet 1 failed process.
-
-#### max_concurrency ####
-
-For tasks with a number of expensive but otherwise independent processes, it
-may be desirable to limit the amount of concurrency provided by the Thermos
-scheduler rather than artificially constraining them through `order`
-constraints. For example, a test framework may generate a task with 100
-test run processes, but would like to run it on a machine with only 4 cores.
-You can limit the amount of parallelism to 4 by setting `max_concurrency=4`
-in your task configuration.
-
-For example, the following Thermos task spawns 180 processes ("mappers") to compute
-individual elements of a 180 degree sine table, all dependent upon one final process ("reducer")
-to tabulate the results:
-
-```python
-def make_mapper(id):
- return Process(
- name = "mapper%03d" % id,
- cmdline = "echo 'scale=50;s(%d*4*a(1)/180)' | bc -l > temp.sine_table.%03d" % (id, id))
-
-def make_reducer():
- return Process(name = "reducer",
- cmdline = "cat temp.* | nl > sine_table.txt && rm -f temp.*")
-
-processes = map(make_mapper, range(180))
-
-task = Task(
- name = "mapreduce",
- processes = processes + [make_reducer()],
- constraints = [Constraint(order = [mapper.name(), 'reducer']) for mapper in processes],
- max_concurrency = 8)
-
-export(task)
-```
-
-#### finalization_wait ####
-
-Tasks have three active stages: ACTIVE, CLEANING and FINALIZING. The ACTIVE stage is when
-ordinary processes run. This stage will last as long as processes are running and the
-task is healthy. The moment either all processes have finished successfully or the task
-has reached a maximum process failure limit, it will go into CLEANING stage and send SIGTERMs
-to all currently running processes and their process trees. Once all processes have
-terminated, the task goes into FINALIZING stage and invokes the schedule of all processes
-with the "final" bit set.
-
-This whole process from the end of ACTIVE stage to the end of FINALIZING must take place within
-"finalization_wait" seconds. If it does not complete within that time, all remaining
-processes will be sent SIGKILLs (or if they depend upon processes that have not yet completed,
-will never be invoked.)
-
-Client applications with higher priority may be able to force a shorter
-finalization wait (e.g. through parameters to `thermos kill`), so this is
-mostly a best-effort signal.
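-
-Continuing the sketch from the `final` section, a task that allows its finalizing
-processes up to two minutes:
-
-```python
-task = Task(name = "job",
-            processes = [main, notify],
-            finalization_wait = 120)
-```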
-
-
-## REPL ##
-
-You can interactively experiment with the Thermos configuration REPL via the
-`src/python/twitter/thermos/config:repl` target:
-
-```
-$ ./pants py src/python/twitter/thermos/config:repl
-Build operating on target: PythonBinary(src/python/twitter/thermos/config/BUILD:repl)
-Thermos Config REPL
->>> boilerplate = Process(cmdline = "echo hello world")
->>> boilerplate
-Process(cmdline=echo hello world, daemon=0, max_failures=1, ephemeral=0, min_duration=5)
->>> boilerplate.check()
-TypeCheck(FAILED): Process[name] is required.
->>> boilerplate(name = "hello world").check()
-TypeCheck(OK)
-```
-
-## Thermos templating ##
-
-The Thermos DSL is implemented in [pystachio](http://github.com/wickman/pystachio) which means that
-a simple Mustache-like templating layer is available for use when configuring tasks.
-
-### Ordinary templates ###
-
-By using Mustache style templates in your job, you can allow some amount of runtime configuration
-of your tasks:
-
-```
->>> process = Process(name = "hello", cmdline = "echo hello {{first}}")
->>> process
-Process(cmdline=echo hello {{first}}, daemon=0, name=hello, max_failures=1, ephemeral=0, min_duration=5)
->>> process.check()
-TypeCheck(FAILED): Process[cmdline] failed: Uninterpolated variables: {{first}}
-```
-
-This process leaves `{{first}}` as a free variable. It can be filled elsewhere in the configuration, e.g. via
-`%` or `bind`:
-```
->>> process % {'first': 'brian'}
-Process(cmdline=echo hello brian, daemon=0, name=hello, max_failures=1, ephemeral=0, min_duration=5)
->>> process.bind(first = 'brian')
-Process(cmdline=echo hello brian, daemon=0, name=hello, max_failures=1, ephemeral=0, min_duration=5)
-```
-
-If this is left unbound, the thermos CLI will complain:
-
-```
-$ thermos run thermos/examples/tutorial/unbound.thermos
-twitter.thermos.config.loader.InvalidTask: Task[processes] failed: Element in ProcessList failed check: Process[cmdline] failed: Uninterpolated variables: {{first}}
-```
-
-But free variables can be populated at runtime using the `-E` parameter:
-
-```
-$ thermos run -E first=brian thermos/examples/tutorial/unbound.thermos
-Writing log files to disk in /var/tmp
- INFO] Forking Process(hello)
- INFO] Process(hello) finished successfully [rc=0]
- INFO] Killing task id: unbound-20120530-124903.934384
- INFO] => Current user: wickman
- INFO] Kill complete.
- INFO] Task succeeded.
-```
-
-### Special templates ###
-
-Each Thermos task when run has a special `ThermosContext` template bound to the `thermos` variable.
-Currently this provides three things: `thermos.task_id`, `thermos.user`, and `thermos.ports`.
-The `task_id` is the id generated for the task (or supplied at runtime, in the case of the
-thermos CLI), `thermos.user` is the real user the task runs as, and `thermos.ports` is a
-mapping of named ports supplied by the user and exposed through the user interface. For
-example, to run (and background)
-the observer on port 1338:
-
-```
-$ thermos simplerun --daemon -P http:1338 'dist/thermos_observe.pex --http_port={{thermos.ports[http]}} --root=$HOME/.thermos'
-```
-
-If you go to http://localhost:1338, this bound port `http` will be exposed
-via the UI on both the main and task pages.
-
-To kill the background daemon:
-
-```
-$ thermos kill simple.*
-```
-
-### Includes ###
-
-It is possible to include other Thermos configurations via the `include` parameter. For example,
-`thermos/examples/tutorial/lib/math.thermos`:
-
-```python
-bc = Process(cmdline = "echo 'scale={{precision}};{{command}}' | bc -l")
-pi = bc(name = "pi").bind(command = "4*a(1)")
-e = bc(name = "e").bind(command = "e(1)")
-```
-
-and `thermos/examples/tutorial/pi.thermos`:
-
-```python
-include('lib/math.thermos')
-
-export(Task(name = "compute_pi", processes = [pi]))
-```
-
-can then be executed with the free `precision` variable:
-
-```shell
-$ thermos run -E precision=500 thermos/examples/tutorial/pi.thermos
-```
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/__init__.py b/src/main/python/twitter/thermos/__init__.py
deleted file mode 100644
index de40ea7..0000000
--- a/src/main/python/twitter/thermos/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/bin/BUILD b/src/main/python/twitter/thermos/bin/BUILD
deleted file mode 100644
index 5aa41a2..0000000
--- a/src/main/python/twitter/thermos/bin/BUILD
+++ /dev/null
@@ -1,24 +0,0 @@
-python_binary(
- name = 'thermos_ckpt',
- source = 'thermos_ckpt.py',
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/app'),
- pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
- pants('src/main/python/twitter/thermos/common'),
- pants('src/main/thrift/com/twitter/thermos:py-thrift')
- ]
-)
-
-python_binary(
- name = 'thermos',
- source = 'thermos.py',
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/app'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('src/main/python/twitter/thermos/common'),
- pants('src/main/python/twitter/thermos/config:schema'),
- pants('src/main/python/twitter/thermos/core'),
- pants('src/main/python/twitter/thermos/monitoring'),
- pants('src/main/thrift/com/twitter/thermos:py-thrift'),
- ],
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/bin/__init__.py b/src/main/python/twitter/thermos/bin/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/bin/thermos.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/bin/thermos.py b/src/main/python/twitter/thermos/bin/thermos.py
deleted file mode 100644
index 5e695b4..0000000
--- a/src/main/python/twitter/thermos/bin/thermos.py
+++ /dev/null
@@ -1,653 +0,0 @@
-# TODO(wickman) This needs some usage/help refactoring.
-
-from __future__ import print_function
-
-from collections import namedtuple
-import getpass
-import os
-import pprint
-import pwd
-import re
-import sys
-import time
-
-from twitter.common import app, log
-from twitter.common.log.options import LogOptions
-from twitter.common.dirutil import du, tail_f
-from twitter.common.dirutil.tail import tail as tail_closed
-from twitter.common.quantity import Amount, Time, Data
-from twitter.common.quantity.parse_simple import parse_time, parse_data
-from twitter.common.recordio import RecordIO, ThriftRecordReader
-from twitter.thermos.common.path import TaskPath
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-from twitter.thermos.common.options import add_port_to, add_binding_to
-from twitter.thermos.config.loader import ThermosConfigLoader, ThermosTaskWrapper
-from twitter.thermos.config.schema import (
- Process,
- Resources,
- Task)
-from twitter.thermos.core.helper import TaskRunnerHelper
-from twitter.thermos.core.runner import TaskRunner
-from twitter.thermos.monitoring.detector import TaskDetector
-from twitter.thermos.monitoring.garbage import TaskGarbageCollector, DefaultCollector
-from twitter.thermos.monitoring.monitor import TaskMonitor
-
-from gen.twitter.thermos.ttypes import (
- ProcessState,
- RunnerCkpt,
- RunnerState,
- TaskState)
-
-from pystachio import Ref
-from pystachio.naming import frozendict
-
-app.add_option("--root", dest="root", metavar="PATH",
- default=TaskPath.DEFAULT_CHECKPOINT_ROOT,
- help="the thermos config root")
-
-
-def set_keep(option, opt_str, value, parser):
- setattr(parser.values, option.dest, opt_str.startswith('--keep'))
-
-
-def get_task_from_options(args, opts, **kw):
- loader = ThermosConfigLoader.load_json if opts.json else ThermosConfigLoader.load
-
- if len(args) != 1:
- app.error('Should specify precisely one config, instead got: %s' % args)
-
- tasks = loader(args[0], bindings=opts.bindings, **kw)
-
- task_list = list(tasks.tasks())
- if len(task_list) == 0:
- app.error("No tasks specified!")
-
- if opts.task is None and len(task_list) > 1:
- app.error("Multiple tasks in config but no task name specified!")
-
- task = None
- if opts.task is not None:
- for t in task_list:
- if t.task().name().get() == opts.task:
- task = t
- break
- if task is None:
- app.error("Could not find task %s!" % opts.task)
- else:
- task = task_list[0]
-
- if kw.get('strict', False):
- if not task.task.check().ok():
- app.error(task.task.check().message())
-
- return task
-
-
-def daemonize():
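-  # Standard UNIX double fork: the setsid() between forks detaches the runner
-  # from its controlling terminal, and the second fork ensures it can never
-  # reacquire one; stdio is then redirected to /dev/null.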
- def daemon_fork():
- try:
- if os.fork() > 0:
- os._exit(0)
- except OSError as e:
- sys.stderr.write('[pid:%s] Failed to fork: %s\n' % (os.getpid(), e))
- sys.exit(1)
- daemon_fork()
- os.setsid()
- daemon_fork()
- sys.stdin, sys.stdout, sys.stderr = (open('/dev/null', 'r'),
- open('/dev/null', 'a+'),
- open('/dev/null', 'a+', 0))
-
-
-def tasks_from_re(expressions, root, state=None):
- task_ids = [t_id for _, t_id in TaskDetector(root=root).get_task_ids(state=state)]
- matched_tasks = set()
- for task_expr in map(re.compile, expressions):
- for task_id in task_ids:
- if task_expr.match(task_id):
- matched_tasks.add(task_id)
- return matched_tasks
-
-
-def _really_run(task, root, sandbox, task_id=None, user=None, prebound_ports=None, chroot=None,
- daemon=False):
- prebound_ports = prebound_ports or {}
- missing_ports = set(task.ports()) - set(prebound_ports.keys())
- if missing_ports:
- app.error('ERROR! Unbound ports: %s' % ' '.join(port for port in missing_ports))
- task_runner = TaskRunner(task.task, root, sandbox, task_id=task_id,
- user=user, portmap=prebound_ports, chroot=chroot)
- if daemon:
- print('Daemonizing and starting runner.')
- try:
- log.teardown_stderr_logging()
- daemonize()
- except Exception as e:
- print("Failed to daemonize: %s" % e)
- sys.exit(1)
- try:
- task_runner.run()
- except KeyboardInterrupt:
- print('Got keyboard interrupt, killing job!')
- task_runner.close_ckpt()
- task_runner.kill()
-
-
-@app.command
-@app.command_option("--user", metavar="USER", default=getpass.getuser(), dest='user',
- help="run as this user. if not $USER, must have setuid privilege.")
-@app.command_option("--enable_chroot", dest="chroot", default=False, action='store_true',
- help="chroot tasks to the sandbox before executing them, requires "
- "root privileges.")
-@app.command_option("--task", metavar="TASKNAME", default=None, dest='task',
- help="The thermos task within the config that should be run. Only required if "
- "there are multiple tasks exported from the thermos configuration.")
-@app.command_option("--task_id", metavar="STRING", default=None, dest='task_id',
- help="The id to which this task should be bound, synthesized from the task "
- "name if none provided.")
-@app.command_option("--json", default=False, action='store_true', dest='json',
- help="Read the source file in json format.")
-@app.command_option("--sandbox", metavar="PATH", default="/var/lib/thermos/sandbox", dest='sandbox',
- help="The sandbox in which to run the task.")
-@app.command_option("-P", "--port", type="string", nargs=1, action="callback",
- callback=add_port_to('prebound_ports'), dest="prebound_ports", default=[],
- metavar="NAME:PORT", help="bind named PORT to NAME.")
-@app.command_option("-E", "--environment", type="string", nargs=1, action="callback",
- callback=add_binding_to('bindings'), default=[], dest="bindings",
- metavar="NAME=VALUE",
- help="bind the configuration environment variable NAME to VALUE.")
-@app.command_option("--daemon", default=False, action='store_true', dest='daemon',
- help="fork and daemonize the thermos runner.")
-def run(args, options):
-
- """Run a thermos task.
-
- Usage: thermos run [options] config
- Options:
- --user=USER run as this user. if not $USER, must have setuid privilege.
- --enable_chroot chroot into the sandbox for this task, requires superuser
- privilege
- --task=TASKNAME the thermos task within the config that should be run. only
- required if there are multiple tasks exported from the thermos
- configuration.
- --task_id=STRING the id to which this task should be bound, synthesized from the
- task name if none provided.
- --json specify that the config is in json format instead of pystachio
- --sandbox=PATH the sandbox in which to run the task
- [default: /var/lib/thermos/sandbox]
- -P/--port=NAME:PORT bind the named port NAME to port number PORT (may be specified
- multiple times to bind multiple names.)
- -E/--environment=NAME=VALUE bind the configuration environment variable NAME to
- VALUE.
- --daemon Fork and daemonize the task.
- """
- thermos_task = get_task_from_options(args, options)
- _really_run(thermos_task,
- options.root,
- options.sandbox,
- task_id=options.task_id,
- user=options.user,
- prebound_ports=options.prebound_ports,
- chroot=options.chroot,
- daemon=options.daemon)
-
-
-def inspect_unwrap(obj):
- if isinstance(obj, frozendict):
- return dict((key, inspect_unwrap(val)) for (key, val) in obj.items())
- if isinstance(obj, (list, tuple, set)):
- return tuple(inspect_unwrap(val) for val in obj)
- return obj
-
-
-@app.command
-@app.command_option("--task", metavar="TASKNAME", default=None, dest='task',
- help="The thermos task within the config that should be inspected. Only "
- "required if there are multiple tasks exported from the thermos "
- "configuration.")
-@app.command_option("--json", default=False, action='store_true', dest='json',
- help="Read the source file in json format instead of pystachio.")
-@app.command_option("-P", "--port", type="string", nargs=1, action="callback",
- callback=add_port_to('prebound_ports'), dest="prebound_ports", default=[],
- metavar="NAME:PORT", help="bind named PORT to NAME.")
-@app.command_option("-E", "--environment", type="string", nargs=1, action="callback",
- callback=add_binding_to('bindings'), default=[], dest="bindings",
- metavar="NAME=VALUE",
- help="bind the configuration environment variable NAME to VALUE.")
-def inspect(args, options):
- """Inspect a thermos config and display the evaluated task
-
- Usage: thermos inspect [options] config
- Options:
- --task=TASKNAME the thermos task within the config that should be inspected. Only
- required if there are multiple tasks exported from the thermos
- configuration.
- --json specify that the config is in json format instead of pystachio
- -P/--port=NAME:PORT bind the named port NAME to port number PORT (may be specified
- multiple times to bind multiple names.)
- -E/--environment=NAME=VALUE bind the configuration environment variable NAME to
- VALUE.
- """
- thermos_task = get_task_from_options(args, options)
- ti, _ = thermos_task.task().interpolate()
- pprint.pprint(inspect_unwrap(ti.get()), indent=4)
-
-
-@app.command
-@app.command_option("--user", metavar="USER", default=getpass.getuser(), dest='user',
- help="run as this user. if not $USER, must have setuid privilege.")
-@app.command_option("--name", metavar="STRING", default='simple', dest='name',
- help="The name to give this task.")
-@app.command_option("--task_id", metavar="STRING", default=None, dest='task_id',
- help="The id to which this task should be bound, synthesized from the task "
- "name if none provided.")
-@app.command_option("-P", "--port", type="string", nargs=1, action="callback",
- callback=add_port_to('prebound_ports'), dest="prebound_ports", default=[],
- metavar="NAME:PORT", help="bind named PORT to NAME.")
-@app.command_option("-E", "--environment", type="string", nargs=1, action="callback",
- callback=add_binding_to('bindings'), default=[], dest="bindings",
- metavar="NAME=VALUE",
- help="bind the configuration environment variable NAME to VALUE.")
-@app.command_option("--daemon", default=False, action='store_true', dest='daemon',
- help="fork and daemonize the thermos runner.")
-def simplerun(args, options):
- """Run a simple command line as a thermos task.
-
- Usage: thermos simplerun [options] [--] commandline
- Options:
- --user=USER run as this user. if not $USER, must have setuid privilege.
- --name=STRING the name to give this task. ('simple' by default)
- --task_id=STRING the id to which this task should be bound, synthesized from the
- task name if none provided.
- -P/--port=NAME:PORT bind the named port NAME to port number PORT (may be specified
- multiple times to bind multiple names.)
- -E/--environment=NAME=VALUE bind the configuration environment variable NAME to
- VALUE.
- --daemon Fork and daemonize the task.
- """
- try:
- cutoff = args.index('--')
- cmdline = ' '.join(args[cutoff+1:])
- except ValueError:
- cmdline = ' '.join(args)
-
- print("Running command: '%s'" % cmdline)
-
- thermos_task = ThermosTaskWrapper(Task(
- name = options.name,
- resources = Resources(cpu = 1.0, ram = 256 * 1024 * 1024, disk = 0),
- processes = [Process(name = options.name, cmdline = cmdline)]))
-
- _really_run(thermos_task,
- options.root,
- None,
- task_id=options.task_id,
- user=options.user,
- prebound_ports=options.prebound_ports,
- chroot=False,
- daemon=options.daemon)
-
-
-@app.command
-@app.command_option("--simple", default=False, dest='simple', action='store_true',
- help="Only print the checkpoint records, do not replay them.")
-def read(args, options):
- """Replay a thermos checkpoint.
-
- Usage: thermos read [options] checkpoint_filename
- Options:
- --simple Do not replay the full task state machine. Only print out the contents of
- each checkpoint log message.
- """
- if len(args) != 1:
- app.error('Expected one checkpoint file, got %s' % len(args))
- if not os.path.exists(args[0]):
- app.error('Could not find %s' % args[0])
-
- dispatcher = CheckpointDispatcher()
- state = RunnerState(processes={})
- with open(args[0], 'r') as fp:
- try:
- for record in ThriftRecordReader(fp, RunnerCkpt):
- if not options.simple:
- dispatcher.dispatch(state, record)
- else:
- print('CKPT: %s' % record)
- except RecordIO.Error as err:
- print("Failed to recover from %s: %s" % (fp.name, err))
- return
-
- if not options.simple:
- if state is None or state.header is None:
- print('Checkpoint stream CORRUPT or outdated format')
- return
- print('Recovered Task Header:')
- print(' id: %s' % state.header.task_id)
- print(' user: %s' % state.header.user)
- print(' host: %s' % state.header.hostname)
- print(' sandbox: %s' % state.header.sandbox)
- if state.header.ports:
- print(' ports: %s' % ' '.join(
- '%s->%s' % (name, port) for (name, port) in state.header.ports.items()))
- print('Recovered Task States:')
- for task_status in state.statuses:
- print(' %s [pid: %d] => %s' % (
- time.asctime(time.localtime(task_status.timestamp_ms/1000.0)),
- task_status.runner_pid,
- TaskState._VALUES_TO_NAMES[task_status.state]))
- print('Recovered Processes:')
- for process, process_history in state.processes.items():
- print(' %s runs: %s' % (process, len(process_history)))
- for k in reversed(range(len(process_history))):
- run = process_history[k]
- print(' %2d: pid=%d, rc=%s, finish:%s, state:%s' % (
- k,
- run.pid,
- run.return_code if run.return_code is not None else '',
- time.asctime(time.localtime(run.stop_time)) if run.stop_time else 'None',
- ProcessState._VALUES_TO_NAMES.get(run.state, 'Unknown')))
-
-
-@app.command
-def kill(args, options):
- """Kill task(s)
-
- Usage: thermos kill task_id1 [task_id2 ...]
-
- Regular expressions may be used to match multiple tasks.
- """
- if not args:
- print('Must specify tasks!', file=sys.stderr)
- return
-
- matched_tasks = tasks_from_re(args, options.root, state='active')
-
- if not matched_tasks:
- print('No active tasks matched.')
- return
-
- for task_id in matched_tasks:
- print('Killing %s...' % task_id, end='')
- TaskRunnerHelper.kill(task_id, options.root, force=True)
- print('done.')
-
-
-@app.command
-@app.command_option("--max_age", metavar="AGE", default=None, dest='max_age',
- help="Max age in human readable form, e.g. 2d5h or 7200s")
-@app.command_option("--max_tasks", metavar="NUM", default=None, dest='max_tasks',
- help="Max number of tasks to keep.")
-@app.command_option("--max_space", metavar="SPACE", default=None, dest='max_space',
- help="Max space to allow for tasks, e.g. 20G.")
-@app.command_option("--keep-data", "--delete-data",
- metavar="PATH", default=True,
- action='callback', callback=set_keep, dest='keep_data',
- help="Keep data.")
-@app.command_option("--keep-logs", "--delete-logs",
- metavar="PATH", default=True,
- action='callback', callback=set_keep, dest='keep_logs',
- help="Keep logs.")
-@app.command_option("--keep-metadata", "--delete-metadata",
- metavar="PATH", default=True,
- action='callback', callback=set_keep, dest='keep_metadata',
- help="Keep metadata.")
-@app.command_option("--force", default=False, action='store_true', dest='force',
- help="Perform garbage collection without confirmation")
-@app.command_option("--dryrun", default=False, action='store_true', dest='dryrun',
- help="Don't actually run garbage collection.")
-def gc(args, options):
- """Garbage collect task(s) and task metadata.
-
- Usage: thermos gc [options] [task_id1 task_id2 ...]
-
- If tasks specified, restrict garbage collection to only those tasks,
- otherwise all tasks are considered. The optional constraints are still
- honored.
-
- Options:
- --max_age=AGE Max age in quasi-human readable form, e.g. --max_age=2d5h,
- format *d*h*m*s [default: skip]
- --max_tasks=NUM Max number of tasks to keep [default: skip]
- --max_space=SPACE Max space to allow for tasks [default: skip]
- --[keep/delete-]metadata Garbage collect metadata [default: keep]
- --[keep/delete-]logs Garbage collect logs [default: keep]
- --[keep/delete-]data Garbage collect data [default: keep]
- WARNING: Do NOT do this if your sandbox is $HOME.
- --force Perform garbage collection without confirmation [default: false]
- --dryrun Don't actually run garbage collection [default: false]
- """
- print('Analyzing root at %s' % options.root)
- gc_options = {}
- if options.max_age is not None:
- gc_options['max_age'] = parse_time(options.max_age)
- if options.max_space is not None:
- gc_options['max_space'] = parse_data(options.max_space)
- if options.max_tasks is not None:
- gc_options['max_tasks'] = int(options.max_tasks)
- gc_options.update(include_data = not options.keep_data,
- include_metadata = not options.keep_metadata,
- include_logs = not options.keep_logs,
- verbose = True, logger = print)
- tgc = TaskGarbageCollector(root=options.root)
-
- if args:
- gc_tasks = tasks_from_re(args, options.root, state='finished')
- else:
- print('No task ids specified, using default collector.')
- gc_tasks = [task.task_id for task in DefaultCollector(tgc, **gc_options).run()]
-
- if not gc_tasks:
- print('No tasks to garbage collect. Exiting')
- return
-
- def maybe(function, *args):
- if options.dryrun:
- print(' would run %s%r' % (function.__name__, args))
- else:
- function(*args)
-
- value = 'y'
- if not options.force:
- value = raw_input("Continue [y/N]? ") or 'N'
- if value.lower() == 'y':
- print('Running gc...')
- tgc = TaskGarbageCollector(root=options.root)
- for task in gc_tasks:
- print(' Task %s ' % task, end='')
- print('data (%s) ' % ('keeping' if options.keep_data else 'deleting'), end='')
- print('logs (%s) ' % ('keeping' if options.keep_logs else 'deleting'), end='')
- print('metadata (%s) ' % ('keeping' if options.keep_metadata else 'deleting'))
- if not options.keep_data:
- maybe(tgc.erase_data, task)
- if not options.keep_logs:
- maybe(tgc.erase_logs, task)
- if not options.keep_metadata:
- maybe(tgc.erase_metadata, task)
- print('done.')
- else:
- print('Cancelling gc.')
-
-
-@app.command
-@app.command_option("--verbosity", default=0, dest='verbose', type='int',
- help="Display more verbosity")
-@app.command_option("--only", default=None, dest='only', type='choice',
- choices=('active', 'finished'), help="Display only tasks of this type.")
-def status(args, options):
- """Get the status of task(s).
-
- Usage: thermos status [options] [task_name(s) or task_regexp(s)]
-
- Options:
- --verbosity=LEVEL Verbosity level for logging. [default: 0]
- --only=TYPE Only print tasks of TYPE (options: active finished)
- """
- detector = TaskDetector(root=options.root)
-
- def format_task(task_id):
- checkpoint_filename = detector.get_checkpoint(task_id)
- checkpoint_stat = os.stat(checkpoint_filename)
- try:
- checkpoint_owner = pwd.getpwuid(checkpoint_stat.st_uid).pw_name
-    except KeyError:
- checkpoint_owner = 'uid:%s' % checkpoint_stat.st_uid
- print(' %-20s [owner: %8s]' % (task_id, checkpoint_owner), end='')
- if options.verbose == 0:
- print()
- if options.verbose > 0:
- state = CheckpointDispatcher.from_file(checkpoint_filename)
- if state is None or state.header is None:
- print(' - checkpoint stream CORRUPT or outdated format')
- return
- print(' state: %8s' % TaskState._VALUES_TO_NAMES.get(state.statuses[-1].state, 'Unknown'),
- end='')
- print(' start: %25s' % time.asctime(time.localtime(state.header.launch_time_ms/1000.0)))
- if options.verbose > 1:
- print(' user: %s' % state.header.user, end='')
- if state.header.ports:
- print(' ports: %s' % ' '.join('%s -> %s' % (key, val)
- for key, val in state.header.ports.items()))
- else:
- print(' ports: None')
- print(' sandbox: %s' % state.header.sandbox)
- if options.verbose > 2:
- print(' process table:')
- for process, process_history in state.processes.items():
- print(' - %s runs: %s' % (process, len(process_history)), end='')
- last_run = process_history[-1]
- print(' last: pid=%s, rc=%s, finish:%s, state:%s' % (
- last_run.pid or 'None',
- last_run.return_code if last_run.return_code is not None else '',
- time.asctime(time.localtime(last_run.stop_time)) if last_run.stop_time else 'None',
- ProcessState._VALUES_TO_NAMES.get(last_run.state, 'Unknown')))
- print()
-
- matchers = map(re.compile, args or ['.*'])
- active = [t_id for _, t_id in detector.get_task_ids(state='active')
- if any(pattern.match(t_id) for pattern in matchers)]
- finished = [t_id for _, t_id in detector.get_task_ids(state='finished')
- if any(pattern.match(t_id) for pattern in matchers)]
-
- found = False
- if options.only is None or options.only == 'active':
- if active:
- print('Active tasks:')
- found = True
- for task_id in active:
- format_task(task_id)
- print()
-
- if options.only is None or options.only == 'finished':
- if finished:
- print('Finished tasks:')
- found = True
- for task_id in finished:
- format_task(task_id)
- print()
-
- if not found:
- print('No tasks found in root [%s]' % options.root)
- sys.exit(1)
-
-
-
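Note how status treats its positional arguments: each is compiled as a regular expression and
a task id is kept if any pattern matches it from the start of the string (re.match semantics);
with no arguments, '.*' matches everything. A self-contained sketch of that filter:

    import re

    def filter_task_ids(task_ids, patterns):
      # Compile every pattern; a task id survives if any pattern matches it.
      matchers = [re.compile(p) for p in (patterns or ['.*'])]
      return [t for t in task_ids if any(m.match(t) for m in matchers)]

    print(filter_task_ids(['hello_world-001', 'cron-42'], ['hello.*']))
    # ['hello_world-001']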
-@app.command
-@app.command_option("--stderr", default=False, dest='use_stderr', action='store_true',
- help="Tail stderr instead of stdout")
-def tail(args, options):
- """Tail the logs of a task process.
-
- Usage: thermos tail task_name [process_name]
- """
- if len(args) == 0:
- app.error('Expected a task to tail, got nothing!')
- if len(args) not in (1, 2):
- app.error('Expected at most two arguments (task and optional process), got %d' % len(args))
-
- task_id = args[0]
- detector = TaskDetector(root=options.root)
- checkpoint = CheckpointDispatcher.from_file(detector.get_checkpoint(task_id))
- log_dir = checkpoint.header.log_dir
-  process_runs = list(detector.get_process_runs(task_id, log_dir))
- if len(args) == 2:
- process_runs = [(process, run) for (process, run) in process_runs if process == args[1]]
-
- if len(process_runs) == 0:
- print('ERROR: No processes found.', file=sys.stderr)
- sys.exit(1)
-
- processes = set([process for process, _ in process_runs])
- if len(processes) != 1:
- print('ERROR: More than one process matches query.', file=sys.stderr)
- sys.exit(1)
-
- process = processes.pop()
- run = max([run for _, run in process_runs])
-
-  logdir = TaskPath(root=options.root, task_id=task_id, process=process,
-                    run=run, log_dir=log_dir).getpath('process_logdir')
- logfile = os.path.join(logdir, 'stderr' if options.use_stderr else 'stdout')
-
-  monitor = TaskMonitor(TaskPath(root=options.root), task_id)
- def log_is_active():
- active_processes = monitor.get_active_processes()
- for process_status, process_run in active_processes:
- if process_status.process == process and process_run == run:
- return True
- return False
-
- if not log_is_active():
- print('Tail of terminal log %s' % logfile)
- for line in tail_closed(logfile):
- print(line.rstrip())
- return
-
- now = time.time()
- next_check = now + 5.0
- print('Tail of active log %s' % logfile)
- for line in tail_f(logfile, include_last=True, forever=False):
- print(line.rstrip())
- if time.time() > next_check:
- if not log_is_active():
- break
- else:
- next_check = time.time() + 5.0
-
-
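The active-log loop above is a poll-with-liveness-check: lines are streamed as they appear,
and every five seconds the task monitor is consulted so tailing stops once the process run
terminates. A sketch of the loop in isolation (the line source and liveness predicate are
stand-ins for tail_f and the monitor check):

    import time

    def follow(lines, is_active, check_interval=5.0):
      # Yield stripped lines until the producer is observed to be inactive.
      next_check = time.time() + check_interval
      for line in lines:
        yield line.rstrip()
        if time.time() > next_check:
          if not is_active():
            break
          next_check = time.time() + check_interval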
-@app.command
-def help(args, options):
- """Get help about a specific command.
- """
- if len(args) == 0:
- app.help()
- for (command, doc) in app.get_commands_and_docstrings():
- if args[0] == command:
- print('command %s:' % command)
- print(doc)
- app.quit(0)
- print('unknown command: %s' % args[0], file=sys.stderr)
-
-
-
-
-def generate_usage():
- usage = """
-thermos
-
-commands:
-"""
-
- for (command, doc) in app.get_commands_and_docstrings():
- usage += ' ' + '%-10s' % command + '\t' + doc.split('\n')[0].strip() + '\n'
- app.set_usage(usage)
-
-
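generate_usage builds the top-level usage screen mechanically: the first line of each
command's docstring becomes its one-line summary. A hypothetical illustration of the same
idea:

    def summarize(commands):
      # commands: iterable of (name, docstring) pairs.
      usage = '\nthermos\n\ncommands:\n'
      for command, doc in commands:
        usage += '  %-10s\t%s\n' % (command, doc.split('\n')[0].strip())
      return usage

    print(summarize([('tail', 'Tail the logs of a task process.\n...')]))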
-LogOptions.set_disk_log_level('NONE')
-LogOptions.set_stdout_log_level('INFO')
-generate_usage()
-
-
-proxy_main = app.main
-
-
-proxy_main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/bin/thermos_ckpt.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/bin/thermos_ckpt.py b/src/main/python/twitter/thermos/bin/thermos_ckpt.py
deleted file mode 100755
index 36fd419..0000000
--- a/src/main/python/twitter/thermos/bin/thermos_ckpt.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import os
-import sys
-import pprint
-import time
-
-from gen.twitter.thermos.ttypes import RunnerState, RunnerCkpt, TaskState
-
-from twitter.common import app
-from twitter.common.recordio import RecordIO, ThriftRecordReader
-from twitter.thermos.common.ckpt import CheckpointDispatcher
-
-app.add_option("--checkpoint", dest = "ckpt", metavar = "CKPT",
- help = "read checkpoint from CKPT")
-app.add_option("--assemble", dest = "assemble", metavar = "CKPT", default=True,
- help = "read checkpoint from CKPT")
-
-def main(args):
- values = app.get_options()
-
- if len(args) > 0:
- print >> sys.stderr, "ERROR: unrecognized arguments: %s\n" % (" ".join(args))
- app.help()
- sys.exit(1)
-
- if not values.ckpt:
- print >> sys.stderr, "ERROR: must supply --checkpoint"
- app.help()
- sys.exit(1)
-
- fp = file(values.ckpt, "r")
- rr = ThriftRecordReader(fp, RunnerCkpt)
- wrs = RunnerState(processes = {})
- dispatcher = CheckpointDispatcher()
- try:
- for wts in rr:
- print 'Recovering: ', wts
-      if values.assemble:
- dispatcher.dispatch(wrs, wts)
- except RecordIO.Error as err:
- print 'Error recovering checkpoint stream: %s' % err
- return
- print '\n\n\n'
- if values.assemble:
- print 'Recovered Task Header'
- pprint.pprint(wrs.header, indent=4)
-
- print '\nRecovered Task States'
- for task_status in wrs.statuses:
- print ' %s [pid: %d] => %s' % (time.asctime(time.localtime(task_status.timestamp_ms/1000.0)),
- task_status.runner_pid, TaskState._VALUES_TO_NAMES[task_status.state])
-
- print '\nRecovered Processes'
- pprint.pprint(wrs.processes, indent=4)
-
-app.main()
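The script is a straight replay loop over a thrift RecordIO stream: each RunnerCkpt record is
folded into an accumulating RunnerState by the dispatcher. The same idiom as a small function
(imports as in the deleted module; the path argument is whatever checkpoint file is on hand):

    from gen.twitter.thermos.ttypes import RunnerCkpt, RunnerState
    from twitter.common.recordio import ThriftRecordReader
    from twitter.thermos.common.ckpt import CheckpointDispatcher

    def replay(path):
      # Fold every checkpoint record into a fresh RunnerState.
      state = RunnerState(processes={})
      dispatcher = CheckpointDispatcher()
      with open(path) as fp:
        for record in ThriftRecordReader(fp, RunnerCkpt):
          dispatcher.dispatch(state, record)
      return state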
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/common/BUILD b/src/main/python/twitter/thermos/common/BUILD
deleted file mode 100644
index d97c771..0000000
--- a/src/main/python/twitter/thermos/common/BUILD
+++ /dev/null
@@ -1,45 +0,0 @@
-import os
-
-python_library(
- name = 'ckpt',
- sources = ['ckpt.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/recordio:recordio-thrift'),
- pants('src/main/thrift/com/twitter/thermos:py-thrift')
- ]
-)
-
-python_library(
- name = 'path',
- sources = ['path.py'],
-)
-
-python_library(
- name = 'planner',
- sources = ['planner.py'],
-)
-
-python_library(
- name = 'options',
- sources = ['options.py'],
- dependencies = [
- pants('src/main/python/twitter/thermos:pystachio'),
- pants('src/main/thrift/com/twitter/thermos:py-thrift'),
- ]
-)
-
-python_library(
- name = 'common',
- dependencies = [
- pants(':ckpt'),
- pants(':options'),
- pants(':path'),
- pants(':planner'),
- ],
- provides = setup_py(
- name = 'twitter.thermos.common',
- version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
- description = 'Thermos common libraries.',
- )
-)
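The aggregating ':common' target exists so downstream code can pull in all four libraries at
once. A hypothetical consumer target (the path and target name are illustrative, not from the
repository):

    python_library(
      name = 'my_tool',
      sources = ['my_tool.py'],
      dependencies = [
        pants('src/main/python/twitter/thermos/common'),
      ]
    )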
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/common/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/common/__init__.py b/src/main/python/twitter/thermos/common/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/common/ckpt.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/common/ckpt.py b/src/main/python/twitter/thermos/common/ckpt.py
deleted file mode 100644
index 9c5a949..0000000
--- a/src/main/python/twitter/thermos/common/ckpt.py
+++ /dev/null
@@ -1,375 +0,0 @@
-"""Read checkpoint streams for the Thermos runner, and dispatch events on state transitions
-
-This module contains the CheckpointDispatcher, which reconstructs checkpoint streams containing the
-state of the Thermos runner and its constituent processes.
-
-It also defines several Handler interfaces to define behaviour on transitions in the Process and
-Task state machines.
-
-"""
-
-import os
-
-from twitter.common import log
-from twitter.common.recordio import RecordIO, ThriftRecordReader
-from gen.twitter.thermos.ttypes import (
- ProcessState,
- ProcessStatus,
- RunnerCkpt,
- RunnerState,
- TaskState,
-)
-
-
-class UniversalStateHandler(object):
- """
-  Generic interface for a handler called on any process or task state transition, and at
-  task initialization.
- """
- def on_process_transition(self, state, process_update):
- pass
-
- def on_task_transition(self, state, task_update):
- pass
-
- def on_initialization(self, header):
- pass
-
-
-class ProcessStateHandler(object):
- """
- Interface for handlers for the Process state machine, called on process transitions
-
- () - starting state, [] - terminal state
-
- [FAILED]
- ^
- |
- (WAITING) ----> FORKED ----> RUNNING -----> [KILLED]
- | | |
- v | `---> [SUCCESS]
- [LOST] <------'
- """
- def on_waiting(self, process_update):
- pass
-
- def on_forked(self, process_update):
- pass
-
- def on_running(self, process_update):
- pass
-
- def on_success(self, process_update):
- pass
-
- def on_failed(self, process_update):
- pass
-
- def on_lost(self, process_update):
- pass
-
- def on_killed(self, process_update):
- pass
-
-
-class TaskStateHandler(object):
- """
- Interface for handlers for the Task state machine, called on task transitions
-
- () - starting state, [] - terminal state
-
- .--------------------------------------------+----.
- | | |
- | .----------> [SUCCESS] | |
- | | | |
- | | .--------> [FAILED] | |
- | | | | |
- (ACTIVE) FINALIZING ---> [KILLED] <---' |
- | ^ | .------^ |
- | | | | |
- `---> CLEANING ---' `----)--> [LOST] <---------'
- | | | ^
- | `----------------' |
- `-------------------------'
-
- ACTIVE -> KILLED/LOST only happens under garbage collection situations.
- Ordinary task preemption/kill still goes through CLEANING/FINALIZING before
- reaching a terminal state.
- """
-
- def on_active(self, task_update):
- pass
-
- def on_cleaning(self, task_update):
- pass
-
- def on_finalizing(self, task_update):
- pass
-
- def on_success(self, task_update):
- pass
-
- def on_failed(self, task_update):
- pass
-
- def on_killed(self, task_update):
- pass
-
- def on_lost(self, task_update):
- pass
-
-
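Handlers are plain subclasses: override only the callbacks you care about and hand an
instance to CheckpointDispatcher.register_handler (defined below). A minimal sketch:

    class LoggingProcessHandler(ProcessStateHandler):
      # Log the two transitions we care about; all other callbacks stay no-ops.
      def on_running(self, process_update):
        print('process %s running, pid=%s' % (
            process_update.process, process_update.pid))

      def on_failed(self, process_update):
        print('process %s failed, rc=%s' % (
            process_update.process, process_update.return_code))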
-def assert_nonempty(state, fields):
- for field in fields:
- assert getattr(state, field, None) is not None, "Missing field %s from %s!" % (field, state)
-
-
-def copy_fields(state, state_update, fields):
- assert_nonempty(state_update, fields)
- for field in fields:
- setattr(state, field, getattr(state_update, field))
-
-
-class CheckpointDispatcher(object):
- """
- The reconstruction/dispatching mechanism for logic triggered on task/process state transitions.
-
- Most applications should build an event-loop around the CheckpointDispatcher.
- """
-
- class Error(Exception): pass
- class ErrorRecoveringState(Error): pass
- class InvalidSequenceNumber(Error): pass
- class InvalidHandler(Error): pass
-
- @classmethod
- def iter_updates(cls, filename):
- try:
- with open(filename) as fp:
- rr = ThriftRecordReader(fp, RunnerCkpt)
- for update in rr:
- yield update
- except (IOError, OSError, RecordIO.Error) as err:
- raise cls.ErrorRecoveringState(err)
-
- @classmethod
- def iter_statuses(cls, filename):
- for update in cls.iter_updates(filename):
- if update.task_status:
- yield update.task_status
-
- @classmethod
- def from_file(cls, filename, truncate=False):
- """Reconstruct a RunnerState from a checkpoint stream contained in a file
-
- Returns a hydrated RunnerState, or None on any failures.
- """
- state = RunnerState(processes={})
- builder = cls()
- try:
- for update in cls.iter_updates(filename):
- builder.dispatch(state, update, truncate=truncate)
- return state
- except cls.Error as e:
- log.error('Failed to recover from %s: %s' % (filename, e))
-
- def __init__(self):
- self._task_handlers = []
- self._process_handlers = []
- self._universal_handlers = []
-
- def register_handler(self, handler):
- HANDLER_MAP = {
- TaskStateHandler: self._task_handlers,
- ProcessStateHandler: self._process_handlers,
- UniversalStateHandler: self._universal_handlers
- }
-
- for handler_type, handler_list in HANDLER_MAP.items():
- if isinstance(handler, handler_type):
- handler_list.append(handler)
- break
- else:
- raise self.InvalidHandler("Unknown handler type %s" % type(handler))
-
- def _run_process_dispatch(self, state, process_update):
- for handler in self._universal_handlers:
- handler.on_process_transition(state, process_update)
- for handler in self._process_handlers:
- handler_function = 'on_' + ProcessState._VALUES_TO_NAMES[state].lower()
- getattr(handler, handler_function)(process_update)
-
- def _run_task_dispatch(self, state, task_update):
- for handler in self._universal_handlers:
- handler.on_task_transition(state, task_update)
- for handler in self._task_handlers:
- handler_function = 'on_' + TaskState._VALUES_TO_NAMES[state].lower()
- getattr(handler, handler_function)(task_update)
-
- def _run_header_dispatch(self, header):
- for handler in self._universal_handlers:
- handler.on_initialization(header)
-
- @staticmethod
- def is_terminal(process_state_update):
- TERMINAL_STATES = [
- ProcessState.SUCCESS,
- ProcessState.FAILED,
- ProcessState.KILLED,
- ProcessState.LOST]
- return process_state_update.state in TERMINAL_STATES
-
- @classmethod
- def _update_process_state(cls, process_state, process_state_update):
- """
- Apply process_state_update against process_state.
- Raises ErrorRecoveringState on failure.
- """
- def assert_process_state_in(*expected_states):
- if process_state.state not in expected_states:
- raise cls.ErrorRecoveringState(
- 'Detected invalid state transition %s => %s' % (
- ProcessState._VALUES_TO_NAMES.get(process_state.state),
- ProcessState._VALUES_TO_NAMES.get(process_state_update.state)))
-
- # CREATION => WAITING
- if process_state_update.state == ProcessState.WAITING:
- assert_process_state_in(None)
- required_fields = ['seq', 'state', 'process']
- copy_fields(process_state, process_state_update, required_fields)
-
- # WAITING => FORKED
- elif process_state_update.state == ProcessState.FORKED:
- assert_process_state_in(ProcessState.WAITING)
- required_fields = ['seq', 'state', 'fork_time', 'coordinator_pid']
- copy_fields(process_state, process_state_update, required_fields)
-
- # FORKED => RUNNING
- elif process_state_update.state == ProcessState.RUNNING:
- assert_process_state_in(ProcessState.FORKED)
- required_fields = ['seq', 'state', 'start_time', 'pid']
- copy_fields(process_state, process_state_update, required_fields)
-
- # RUNNING => SUCCESS
- elif process_state_update.state == ProcessState.SUCCESS:
- assert_process_state_in(ProcessState.RUNNING)
- required_fields = ['seq', 'state', 'stop_time', 'return_code']
- copy_fields(process_state, process_state_update, required_fields)
-
- # RUNNING => FAILED
- elif process_state_update.state == ProcessState.FAILED:
- assert_process_state_in(ProcessState.RUNNING)
- required_fields = ['seq', 'state', 'stop_time', 'return_code']
- copy_fields(process_state, process_state_update, required_fields)
-
- # {FORKED, RUNNING} => KILLED
- elif process_state_update.state == ProcessState.KILLED:
- assert_process_state_in(ProcessState.FORKED, ProcessState.RUNNING)
- required_fields = ['seq', 'state', 'stop_time', 'return_code']
- copy_fields(process_state, process_state_update, required_fields)
-
- # {FORKED, RUNNING} => LOST
- elif process_state_update.state == ProcessState.LOST:
- assert_process_state_in(ProcessState.FORKED, ProcessState.RUNNING)
- required_fields = ['seq', 'state']
- copy_fields(process_state, process_state_update, required_fields)
-
- else:
- raise cls.ErrorRecoveringState(
- "Unknown state = %s" % process_state_update.state)
-
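The chain of per-state branches above amounts to a transition table plus a per-transition
list of required fields. An equivalent data-driven sketch of just the legality check, using
None for "no prior state":

    ALLOWED_PRIOR = {
      ProcessState.WAITING: (None,),
      ProcessState.FORKED:  (ProcessState.WAITING,),
      ProcessState.RUNNING: (ProcessState.FORKED,),
      ProcessState.SUCCESS: (ProcessState.RUNNING,),
      ProcessState.FAILED:  (ProcessState.RUNNING,),
      ProcessState.KILLED:  (ProcessState.FORKED, ProcessState.RUNNING),
      ProcessState.LOST:    (ProcessState.FORKED, ProcessState.RUNNING),
    }

    def is_legal_transition(current_state, new_state):
      # Unknown target states are always illegal, matching the else-raise above.
      return current_state in ALLOWED_PRIOR.get(new_state, ())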
- def would_update(self, state, runner_ckpt):
- """
-    Given a RunnerCkpt, would applying its ProcessStatus perform a transition and update state?
- """
- process_update = runner_ckpt.process_status
- if process_update is None:
- return False
- process = process_update.process
- if process not in state.processes: # never seen before
- return True
- else:
- # if this sequence number is ahead of the current high water mark, it would
- # produce a transition
- return state.processes[process][-1].seq < process_update.seq
-
- def dispatch(self, state, runner_ckpt, recovery=False, truncate=False):
- """
- Given a RunnerState and a RunnerCkpt to apply to it, determine the appropriate action and
- dispatch to the appropriate handlers.
-
- state = RunnerState to be updated
- runner_ckpt = RunnerCkpt update to apply
- recovery = if true, enable recovery mode (accept out-of-order sequence updates)
- truncate = if true, store only the latest task/process states, instead of
- history for all runs.
-
- Raises ErrorRecoveringState on failure.
- """
- # case 1: runner_header
- # -> Initialization of the task stream.
- if runner_ckpt.runner_header is not None:
- if state.header is not None:
- raise self.ErrorRecoveringState(
- "Attempting to rebind task with different parameters!")
- else:
- log.debug('Initializing TaskRunner header to %s' % runner_ckpt.runner_header)
- state.header = runner_ckpt.runner_header
- self._run_header_dispatch(runner_ckpt.runner_header)
- return
-
- # case 2: task_status
- # -> State transition on the task (ACTIVE, FAILED, SUCCESS, LOST)
- if runner_ckpt.task_status is not None:
- if state.statuses is None:
- state.statuses = []
- old_state = None
- else:
- old_state = state.statuses[-1].state
- if not truncate:
- state.statuses.append(runner_ckpt.task_status)
- else:
- state.statuses = [runner_ckpt.task_status]
- new_state = runner_ckpt.task_status.state
- log.debug('Flipping task state from %s to %s' % (
- TaskState._VALUES_TO_NAMES.get(old_state, '(undefined)'),
- TaskState._VALUES_TO_NAMES.get(new_state, '(undefined)')))
- self._run_task_dispatch(new_state, runner_ckpt.task_status)
- return
-
- # case 3: process_status
- # -> State transition on a process itself
- # (WAITING, FORKED, RUNNING, SUCCESS, KILLED, FAILED, LOST)
- if runner_ckpt.process_status is not None:
- process_update = runner_ckpt.process_status
- name = process_update.process
- current_run = state.processes[name][-1] if name in state.processes else None
- if current_run and process_update.seq != current_run.seq + 1:
- if recovery:
- log.debug('Skipping replayed out-of-order update: %s' % process_update)
- return
- else:
- raise self.InvalidSequenceNumber(
- "Out of order sequence number! %s => %s" % (current_run, process_update))
-
- # One special case for WAITING: Initialize a new target ProcessState.
- if process_update.state == ProcessState.WAITING:
- assert current_run is None or self.is_terminal(current_run)
- if name not in state.processes:
- state.processes[name] = [ProcessStatus(seq=-1)]
- else:
- if not truncate:
- state.processes[name].append(ProcessStatus(seq=current_run.seq))
- else:
- state.processes[name] = [ProcessStatus(seq=current_run.seq)]
-
- # Run the process state machine.
- log.debug('Running state machine for process=%s/seq=%s' % (name, process_update.seq))
- if not state.processes or name not in state.processes:
- raise self.ErrorRecoveringState("Encountered potentially out of order "
- "process update. Are you sure this is a full checkpoint stream?")
- self._update_process_state(state.processes[name][-1], process_update)
- self._run_process_dispatch(process_update.state, process_update)
- return
-
- raise self.ErrorRecoveringState("Empty RunnerCkpt encountered!")
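Typical use of the class, end to end: hydrate a complete RunnerState with from_file, or drive
registered handlers by replaying updates through dispatch. A sketch (the checkpoint path here
is hypothetical):

    checkpoint = '/var/run/thermos/checkpoints/task_1'

    # One-shot hydration; returns None if the stream is corrupt.
    state = CheckpointDispatcher.from_file(checkpoint)

    # Replay with a handler attached; recovery=True tolerates out-of-order
    # duplicates from a replayed stream.
    dispatcher = CheckpointDispatcher()
    dispatcher.register_handler(UniversalStateHandler())  # no-op placeholder
    replayed = RunnerState(processes={})
    for update in CheckpointDispatcher.iter_updates(checkpoint):
      dispatcher.dispatch(replayed, update, recovery=True)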
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/thermos/common/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/thermos/common/options.py b/src/main/python/twitter/thermos/common/options.py
deleted file mode 100644
index 9948d03..0000000
--- a/src/main/python/twitter/thermos/common/options.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from pystachio import Ref
-
-class ParseError(Exception):
- pass
-
-def add_port_to(option_name):
- def add_port_callback(option, opt, value, parser):
- if not getattr(parser.values, option_name, None):
- setattr(parser.values, option_name, {})
- try:
- name, port = value.split(':')
- except (ValueError, TypeError):
- raise ParseError('Invalid value for %s: %s should be of form NAME:PORT' % (
- opt, value))
- try:
- port = int(port)
- except ValueError:
- raise ParseError('Port does not appear to be an integer: %s' % port)
- getattr(parser.values, option_name)[name] = port
- return add_port_callback
-
-def add_binding_to(option_name):
- def add_binding_callback(option, opt, value, parser):
- if not getattr(parser.values, option_name, None):
- setattr(parser.values, option_name, [])
- if len(value.split('=')) != 2:
- raise ParseError('Binding must be of the form NAME=VALUE')
- name, value = value.split('=')
- try:
- ref = Ref.from_address(name)
- except Ref.InvalidRefError as e:
- raise ParseError('Could not parse ref %s: %s' % (name, e))
- getattr(parser.values, option_name).append({ref: value})
- return add_binding_callback
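Both helpers return optparse callbacks, so they plug into an OptionParser as
action='callback' options that accumulate repeated values. A sketch of the intended wiring
(the option names here are illustrative):

    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option('--port', type='string', action='callback',
                      callback=add_port_to('ports'), dest='ports',
                      help='named port as NAME:PORT; may be repeated')
    parser.add_option('--bind', type='string', action='callback',
                      callback=add_binding_to('bindings'), dest='bindings',
                      help='binding as NAME=VALUE; may be repeated')

    options, _ = parser.parse_args(['--port', 'http:8080', '--port', 'health:8081'])
    print(options.ports)  # {'http': 8080, 'health': 8081}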