Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:18 UTC
[25/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/hooks/hooked_api.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/hooks/hooked_api.py b/src/main/python/apache/aurora/client/hooks/hooked_api.py
new file mode 100644
index 0000000..cc4d3db
--- /dev/null
+++ b/src/main/python/apache/aurora/client/hooks/hooked_api.py
@@ -0,0 +1,167 @@
+import functools
+import traceback
+
+from twitter.common import log
+from twitter.aurora.client.api import AuroraClientAPI
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+
+from gen.twitter.aurora.ttypes import ResponseCode
+
+
+def _partial(function, *args, **kw):
+  """Returns a partial function with __name__ inherited from the parent function."""
+ partial = functools.partial(function, *args, **kw)
+ return functools.update_wrapper(partial, function)
+
+
+class HookConfig(object):
+ def __init__(self, config, job_key):
+ self.config = config
+ self.job_key = job_key or (config.job_key() if config is not None else None)
+
+ def __iter__(self):
+ yield self.config
+ yield self.job_key
+
+
+class NonHookedAuroraClientAPI(AuroraClientAPI):
+ """
+  This wraps the AuroraClientAPI methods that do not otherwise take an AuroraConfig 'config'
+  param, so that each accepts an optional 'config' param which:
+ * contains the configured hooks (config.hooks)
+ * is dropped before the call is proxied to AuroraClientAPI
+ * is thus available to API methods in subclasses
+ """
+
+ def cancel_update(self, job_key, config=None):
+ return super(NonHookedAuroraClientAPI, self).cancel_update(job_key)
+
+ def kill_job(self, job_key, instances=None, lock=None, config=None):
+ return super(NonHookedAuroraClientAPI, self).kill_job(job_key, instances=instances, lock=lock)
+
+ def restart(self, job_key, shards, updater_config, health_check_interval_seconds, config=None):
+ return super(NonHookedAuroraClientAPI, self).restart(job_key, shards, updater_config,
+ health_check_interval_seconds)
+
+ def start_cronjob(self, job_key, config=None):
+ return super(NonHookedAuroraClientAPI, self).start_cronjob(job_key)
+
+
+class HookedAuroraClientAPI(NonHookedAuroraClientAPI):
+ """
+ Adds a hooking aspect/behaviour to the lifecycle of Mesos Client API methods
+ by injecting hooks (instances of twitter.aurora.client.hooks.Hooks)
+
+ * Hooks are available in the 'config' (AuroraConfig) param that each API call receives
+ * Each Hook is run around each API call:
+ * 'pre' hook before the call
+ * 'post' hook if the call succeeds
+ * 'err' hook if the call fails
+ * If the hook itself fails, then it is treated as a WARN rather than an ERROR
+ """
+
+ class Error(Exception): pass
+ class PreHooksStoppedCall(Error): pass
+ class APIError(Error):
+ def __init__(self, response):
+ self.response = response
+
+ def __str__(self):
+ return '%s: %s: %s' % (self.__class__.__name__,
+ ResponseCode._VALUES_TO_NAMES.get(self.response.responseCode, 'UNKNOWN'),
+ self.response.message)
+
+ @classmethod
+ def _meta_hook(cls, hook, hook_method):
+ def callback():
+ if hook_method is None:
+ return True
+ log.debug('Running %s in %s' % (hook_method.__name__, hook.__class__.__name__))
+ hook_result = False
+ try:
+ hook_result = hook_method()
+ if not hook_result:
+ log.debug('%s in %s returned False' % (hook_method.__name__,
+ hook.__class__.__name__))
+ except Exception:
+ log.warn('Error in %s in %s' %
+ (hook_method.__name__, hook.__class__.__name__))
+ log.warn(traceback.format_exc())
+ return hook_result
+ return callback
+
+ @classmethod
+ def _generate_method(cls, hook, config, job_key, event, method, extra_argument=None):
+ method_name, args, kw = method.__name__, method.args, method.keywords
+ kw = kw or {}
+ hook_method = getattr(hook, '%s_%s' % (event, method_name), None)
+ if callable(hook_method):
+ if extra_argument is not None:
+ hook_method = _partial(hook_method, extra_argument)
+ return _partial(hook_method, *args, **kw)
+ else:
+ hook_method = getattr(hook, 'generic_hook', None)
+ if hook_method is None:
+ return None
+ hook_method = _partial(hook_method, HookConfig(config, job_key),
+ event, method_name, extra_argument)
+ return _partial(hook_method, args, kw)
+
+ @classmethod
+ def _yield_hooks(cls, event, config, job_key, api_call, extra_argument=None):
+ hooks = config.hooks if config and config.raw().enable_hooks().get() else ()
+ for hook in hooks:
+ yield cls._meta_hook(hook,
+ cls._generate_method(hook, config, job_key, event, api_call, extra_argument))
+
+ @classmethod
+ def _invoke_hooks(cls, event, config, job_key, api_call, extra_argument=None):
+ hooks_passed = [hook() for hook in cls._yield_hooks(event, config, job_key, api_call,
+ extra_argument)]
+ return all(hooks_passed)
+
+ def _hooked_call(self, config, job_key, api_call):
+ if not self._invoke_hooks('pre', config, job_key, api_call):
+ raise self.PreHooksStoppedCall('Pre hooks stopped call to %s' % api_call.__name__)
+
+ try:
+ resp = api_call()
+ except Exception as e:
+ self._invoke_hooks('err', config, job_key, api_call, e)
+ raise # propagate since the API method call failed for unknown reasons
+
+ if resp.responseCode != ResponseCode.OK:
+ self._invoke_hooks('err', config, job_key, api_call, self.APIError(resp))
+ else:
+ self._invoke_hooks('post', config, job_key, api_call, resp)
+
+ return resp
+
+ def create_job(self, config, lock=None):
+ return self._hooked_call(config, None,
+ _partial(super(HookedAuroraClientAPI, self).create_job, config, lock))
+
+ def cancel_update(self, job_key, config=None):
+ return self._hooked_call(config, job_key,
+ _partial(super(HookedAuroraClientAPI, self).cancel_update,
+ job_key, config=config))
+
+ def kill_job(self, job_key, instances=None, lock=None, config=None):
+ return self._hooked_call(config, job_key,
+ _partial(super(HookedAuroraClientAPI, self).kill_job,
+ job_key, instances=instances, lock=lock, config=config))
+
+ def restart(self, job_key, shards, updater_config, health_check_interval_seconds, config=None):
+ return self._hooked_call(config, job_key,
+ _partial(super(HookedAuroraClientAPI, self).restart,
+ job_key, shards, updater_config, health_check_interval_seconds, config=config))
+
+ def start_cronjob(self, job_key, config=None):
+ return self._hooked_call(config, job_key,
+ _partial(super(HookedAuroraClientAPI, self).start_cronjob,
+ job_key, config=config))
+
+ def update_job(self, config, health_check_interval_seconds=3, instances=None):
+ return self._hooked_call(config, None,
+ _partial(super(HookedAuroraClientAPI, self).update_job,
+ config, health_check_interval_seconds=health_check_interval_seconds, instances=instances))
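
For orientation, the dispatch above looks up hook methods named '<event>_<api_method>' on each
registered hook (falling back to 'generic_hook'), passing the API call's own arguments plus, for
'post' and 'err' events, the response or exception as an extra leading argument. A minimal hook
compatible with that convention might look like the sketch below; the class name and print
statements are illustrative, and the Hooks base class itself is not part of this diff:

  class LoggingHook(object):
    def pre_create_job(self, config, lock):
      # Receives the same arguments as the create_job call; returning a falsy
      # value makes HookedAuroraClientAPI raise PreHooksStoppedCall.
      print('About to create job %s' % config.name())
      return True

    def post_create_job(self, resp, config, lock):
      # 'post' hooks receive the successful response as an extra leading argument.
      print('create_job finished with code %s' % resp.responseCode)
      return True

    def err_create_job(self, exc, config, lock):
      # 'err' hooks receive the exception (or APIError) as an extra leading argument.
      print('create_job failed: %s' % exc)
      return True
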
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/options.py b/src/main/python/apache/aurora/client/options.py
new file mode 100644
index 0000000..7ced961
--- /dev/null
+++ b/src/main/python/apache/aurora/client/options.py
@@ -0,0 +1,199 @@
+import optparse
+
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+from twitter.thermos.common.options import add_binding_to
+
+
+__all__ = (
+ 'CLUSTER_CONFIG_OPTION',
+ 'CLUSTER_INVOKE_OPTION',
+ 'ENVIRONMENT_BIND_OPTION',
+ 'ENV_CONFIG_OPTION',
+ 'EXECUTOR_SANDBOX_OPTION',
+ 'FROM_JOBKEY_OPTION',
+ 'HEALTH_CHECK_INTERVAL_SECONDS_OPTION',
+ 'JSON_OPTION',
+ 'OPEN_BROWSER_OPTION',
+ 'SHARDS_OPTION',
+ 'SSH_USER_OPTION',
+ 'WAIT_UNTIL_OPTION',
+)
+
+
+def add_verbosity_options():
+ from twitter.common import app
+ from twitter.common.log.options import LogOptions
+
+ def set_quiet(option, _1, _2, parser):
+ setattr(parser.values, option.dest, 'quiet')
+ LogOptions.set_stderr_log_level('NONE')
+
+ def set_verbose(option, _1, _2, parser):
+ setattr(parser.values, option.dest, 'verbose')
+ LogOptions.set_stderr_log_level('DEBUG')
+
+ app.add_option('-v',
+ dest='verbosity',
+ default='normal',
+ action='callback',
+ callback=set_verbose,
+ help='Verbose logging. (default: %default)')
+
+ app.add_option('-q',
+ dest='verbosity',
+ default='normal',
+ action='callback',
+ callback=set_quiet,
+ help='Quiet logging. (default: %default)')
+
+
+def parse_shards_into(option, opt, value, parser):
+  """Parse a list of shard ids and/or shard ranges into a sorted list of shard ids.
+
+ Examples:
+ 0-2
+ 0,1-3,5
+ 1,3,5
+ """
+ def shard_range_parser(shards):
+ result = set()
+ for part in shards.split(','):
+ x = part.split('-')
+ result.update(range(int(x[0]), int(x[-1]) + 1))
+ return sorted(result)
+
+ try:
+ setattr(parser.values, option.dest, shard_range_parser(value))
+ except ValueError as e:
+ raise optparse.OptionValueError('Failed to parse: %s' % e)
+
+
+def parse_aurora_job_key_into(option, opt, value, parser):
+ try:
+ setattr(parser.values, option.dest, AuroraJobKey.from_path(value))
+ except AuroraJobKey.Error as e:
+ raise optparse.OptionValueError('Failed to parse: %s' % e)
+
+
+def make_env_option(explanation):
+ return optparse.Option(
+ '--env',
+ dest='env',
+ default=None,
+ help=explanation)
+
+
+OPEN_BROWSER_OPTION = optparse.Option(
+ '-o',
+ '--open_browser',
+ dest='open_browser',
+ action='store_true',
+ default=False,
+ help='Open a browser window to the job page after a job mutation.')
+
+
+SHARDS_OPTION = optparse.Option(
+ '--shards',
+ type='string',
+ dest='shards',
+ default=None,
+ action='callback',
+ callback=parse_shards_into,
+ help='A list of shard ids to act on. Can either be a comma-separated list (e.g. 0,1,2) '
+ 'or a range (e.g. 0-2) or any combination of the two (e.g. 0-2,5,7-9). If not set, '
+ 'all shards will be acted on.')
+
+
+FROM_JOBKEY_OPTION = optparse.Option('--from', dest='rename_from', type='string', default=None,
+ metavar='CLUSTER/ROLE/ENV/JOB', action='callback', callback=parse_aurora_job_key_into,
+ help='Job key to diff against.')
+
+
+JSON_OPTION = optparse.Option(
+ '-j',
+ '--json',
+ dest='json',
+ default=False,
+ action='store_true',
+ help='If specified, configuration is read in JSON format.')
+
+
+CLUSTER_CONFIG_OPTION = optparse.Option(
+ '--cluster',
+ dest='cluster',
+ default=None,
+ type='string',
+ help='Cluster to match when selecting a job from a configuration. Optional if only one job '
+ 'matching the given job name exists in the config.')
+
+
+CLUSTER_INVOKE_OPTION = optparse.Option(
+ '--cluster',
+ dest='cluster',
+ default=None,
+ type='string',
+ help='Cluster to invoke this command against. Deprecated in favor of the CLUSTER/ROLE/ENV/NAME '
+ 'syntax.')
+
+
+ENV_CONFIG_OPTION = make_env_option(
+ 'Environment to match when selecting a job from a configuration.')
+
+
+# This is for binding arbitrary points in the Thermos namespace to specific strings, e.g.
+# if a Thermos configuration has {{jvm.version}}, it can be bound explicitly from the
+# command-line with, for example, -E jvm.version=7
+ENVIRONMENT_BIND_OPTION = optparse.Option(
+ '-E',
+ type='string',
+ nargs=1,
+ action='callback',
+ default=[],
+ metavar='NAME=VALUE',
+ callback=add_binding_to('bindings'),
+ dest='bindings',
+ help='Bind a thermos mustache variable name to a value. '
+ 'Multiple flags may be used to specify multiple values.')
+
+
+EXECUTOR_SANDBOX_OPTION = optparse.Option(
+ '-e',
+ '--executor_sandbox',
+ action='store_true',
+ default=False,
+ dest='executor_sandbox',
+ help='Run the command in the executor sandbox instead of the task sandbox.')
+
+
+SSH_USER_OPTION = optparse.Option(
+ '--user',
+ dest='ssh_user',
+ default=None,
+ help="ssh as this user instead of the role.")
+
+
+CREATE_STATES = (
+ 'PENDING',
+ 'RUNNING',
+ 'FINISHED'
+)
+
+
+WAIT_UNTIL_OPTION = optparse.Option(
+ '--wait_until',
+ default='PENDING',
+ type='choice',
+ choices=('PENDING', 'RUNNING', 'FINISHED'),
+ metavar='STATE',
+ dest='wait_until',
+ help='Block the client until all the tasks have transitioned into the '
+ 'requested state. Options: %s. Default: %%default' % (', '.join(CREATE_STATES)))
+
+
+HEALTH_CHECK_INTERVAL_SECONDS_OPTION = optparse.Option(
+ '--updater_health_check_interval_seconds',
+ dest='health_check_interval_seconds',
+ type=int,
+ default=3,
+ help='Time interval between subsequent shard status checks.'
+)
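
A quick sketch of how the SHARDS_OPTION callback above behaves when wired into an optparse
parser, assuming SHARDS_OPTION is imported from this module (the argument values are
illustrative):

  import optparse

  parser = optparse.OptionParser()
  parser.add_option(SHARDS_OPTION)
  options, _ = parser.parse_args(['--shards', '0-2,5,7-9'])
  print(options.shards)  # [0, 1, 2, 5, 7, 8, 9]
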
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/BUILD b/src/main/python/apache/aurora/common/BUILD
new file mode 100644
index 0000000..4e839f7
--- /dev/null
+++ b/src/main/python/apache/aurora/common/BUILD
@@ -0,0 +1,63 @@
+import os
+
+python_library(
+ name = 'aurora_job_key',
+ sources = ['aurora_job_key.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ]
+)
+
+python_library(
+ name = 'cluster',
+ sources = ['cluster.py'],
+ dependencies = [
+ pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
+ ]
+)
+
+python_library(
+ name = 'clusters',
+ sources = ['clusters.py'],
+ dependencies = [
+ pants(':cluster'),
+ pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
+ pants('aurora/twitterdeps/src/python/twitter/common/collections'),
+ ]
+)
+
+python_library(
+ name = 'cluster_option',
+ sources = ['cluster_option.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+ ]
+)
+
+python_library(
+ name = 'http_signaler',
+ sources = ['http_signaler.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ ]
+)
+
+python_library(
+ name = 'common',
+ dependencies = [
+ pants(':aurora_job_key'),
+ pants(':cluster'),
+ pants(':cluster_option'),
+ pants(':clusters'),
+ pants(':http_signaler'),
+ pants('src/main/python/twitter/aurora/common/auth'),
+ ],
+ provides = setup_py(
+ name = 'twitter.aurora.common',
+ version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+ description = 'Aurora common libraries.',
+ license = 'Apache License, Version 2.0',
+ )
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/__init__.py b/src/main/python/apache/aurora/common/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/aurora_job_key.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/aurora_job_key.py b/src/main/python/apache/aurora/common/aurora_job_key.py
new file mode 100644
index 0000000..81a8687
--- /dev/null
+++ b/src/main/python/apache/aurora/common/aurora_job_key.py
@@ -0,0 +1,97 @@
+import re
+
+from twitter.common.lang import Compatibility, total_ordering
+
+from gen.twitter.aurora.constants import GOOD_IDENTIFIER_PATTERN_PYTHON
+from gen.twitter.aurora.ttypes import Identity, JobKey, TaskQuery
+
+# TODO(ksweeney): This can probably just extend namedtuple.
+@total_ordering
+class AuroraJobKey(object):
+ """A canonical representation of a key that can identify a job in any of the clusters the client
+ is aware of."""
+ class Error(Exception): pass
+ class TypeError(TypeError, Error): pass
+ class InvalidIdentifier(ValueError, Error): pass
+ class ParseError(ValueError, Error): pass
+
+ VALID_IDENTIFIER = re.compile(GOOD_IDENTIFIER_PATTERN_PYTHON)
+
+ def __init__(self, cluster, role, env, name):
+ if not isinstance(cluster, Compatibility.string):
+ raise self.TypeError("cluster should be a string, got %s" % (cluster.__class__.__name__))
+ self._cluster = cluster
+ self._role = self._assert_valid_identifier("role", role)
+ self._env = self._assert_valid_identifier("env", env)
+ self._name = self._assert_valid_identifier("name", name)
+
+ @classmethod
+ def from_path(cls, path):
+ try:
+ cluster, role, env, name = path.split('/', 4)
+ except ValueError:
+ raise cls.ParseError(
+ "Invalid path '%s'. path should be a string in the form CLUSTER/ROLE/ENV/NAME" % path)
+ return cls(cluster, role, env, name)
+
+ @classmethod
+ def from_thrift(cls, cluster, job_key):
+ if not isinstance(job_key, JobKey):
+ raise cls.TypeError("job_key must be a Thrift JobKey struct")
+ return cls(cluster, job_key.role, job_key.environment, job_key.name)
+
+ @classmethod
+ def _assert_valid_identifier(cls, field, identifier):
+ if not isinstance(identifier, Compatibility.string):
+ raise cls.TypeError("%s must be a string" % field)
+ if not cls.VALID_IDENTIFIER.match(identifier):
+ raise cls.InvalidIdentifier("Invalid %s '%s'" % (field, identifier))
+ return identifier
+
+ @property
+ def cluster(self):
+ return self._cluster
+
+ @property
+ def role(self):
+ return self._role
+
+ @property
+ def env(self):
+ return self._env
+
+ @property
+ def name(self):
+ return self._name
+
+ def to_path(self):
+ return "%s/%s/%s/%s" % (self.cluster, self.role, self.env, self.name)
+
+ def to_thrift(self):
+ return JobKey(role=self.role, environment=self.env, name=self.name)
+
+ def to_thrift_query(self):
+ return TaskQuery(owner=Identity(role=self.role), environment=self.env, jobName=self.name)
+
+ def __iter__(self):
+ """Support 'cluster, role, env, name = job_key' assignment."""
+ return iter((self._cluster, self._role, self._env, self._name))
+
+ def __repr__(self):
+ return "%s(%r, %r, %r, %r)" % (self.__class__, self._cluster, self._role, self._env, self._name)
+
+ def __str__(self):
+ return self.to_path()
+
+ def __hash__(self):
+ return hash(AuroraJobKey) + hash(self.to_path())
+
+ def __eq__(self, other):
+ if not isinstance(other, AuroraJobKey):
+ return NotImplemented
+ return self.to_path() == other.to_path()
+
+ def __lt__(self, other):
+ if not isinstance(other, AuroraJobKey):
+ return NotImplemented
+ return self.to_path() < other.to_path()
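
AuroraJobKey is a thin wrapper around the four-part CLUSTER/ROLE/ENV/NAME path; a small usage
sketch (the path components are illustrative):

  from twitter.aurora.common.aurora_job_key import AuroraJobKey

  key = AuroraJobKey.from_path('example/alice/devel/hello_world')
  cluster, role, env, name = key       # __iter__ supports tuple unpacking
  assert key.to_path() == 'example/alice/devel/hello_world'
  query = key.to_thrift_query()        # TaskQuery scoped to this role/env/name
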
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/auth/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/auth/BUILD b/src/main/python/apache/aurora/common/auth/BUILD
new file mode 100644
index 0000000..fd5b024
--- /dev/null
+++ b/src/main/python/apache/aurora/common/auth/BUILD
@@ -0,0 +1,9 @@
+python_library(
+ name = 'auth',
+ sources = globs('*.py'),
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ]
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/auth/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/auth/__init__.py b/src/main/python/apache/aurora/common/auth/__init__.py
new file mode 100644
index 0000000..5418228
--- /dev/null
+++ b/src/main/python/apache/aurora/common/auth/__init__.py
@@ -0,0 +1,2 @@
+from .auth_module_manager import make_session_key, register_auth_module, SessionKeyError
+from .auth_module import AuthModule, InsecureAuthModule
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/auth/auth_module.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/auth/auth_module.py b/src/main/python/apache/aurora/common/auth/auth_module.py
new file mode 100644
index 0000000..7b146a9
--- /dev/null
+++ b/src/main/python/apache/aurora/common/auth/auth_module.py
@@ -0,0 +1,30 @@
+from abc import abstractmethod, abstractproperty
+
+import getpass
+import time
+
+from twitter.common.lang import Interface
+
+from gen.twitter.aurora.ttypes import SessionKey
+
+
+class AuthModule(Interface):
+ @abstractproperty
+ def mechanism(self):
+ """Return the mechanism provided by this AuthModule."""
+
+ @abstractmethod
+ def payload(self):
+ """Return the payload generated by the AuthModule."""
+
+ def __call__(self):
+ return SessionKey(mechanism=self.mechanism, data=self.payload())
+
+
+class InsecureAuthModule(AuthModule):
+ @property
+ def mechanism(self):
+ return 'UNAUTHENTICATED'
+
+ def payload(self):
+ return 'UNAUTHENTICATED'
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/auth/auth_module_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/auth/auth_module_manager.py b/src/main/python/apache/aurora/common/auth/auth_module_manager.py
new file mode 100644
index 0000000..03d51b3
--- /dev/null
+++ b/src/main/python/apache/aurora/common/auth/auth_module_manager.py
@@ -0,0 +1,47 @@
+from twitter.common import log
+
+from gen.twitter.aurora.ttypes import SessionKey
+
+from .auth_module import AuthModule, InsecureAuthModule
+
+
+_INSECURE_AUTH_MODULE = InsecureAuthModule()
+_AUTH_MODULES = {
+ _INSECURE_AUTH_MODULE.mechanism: _INSECURE_AUTH_MODULE
+}
+
+
+class SessionKeyError(Exception): pass
+
+
+def register_auth_module(auth_module):
+ """
+ Add an auth module into the registry used by make_session_key. An auth module is discovered
+ via its auth mechanism.
+
+ args:
+    auth_module: An AuthModule subclass instance, i.e. a 0-arg callable that returns a
+      SessionKey or raises a SessionKeyError.
+ """
+ if not isinstance(auth_module, AuthModule):
+    raise TypeError('Given auth module must be an AuthModule subclass, got %s' % type(auth_module))
+ if not callable(auth_module):
+ raise TypeError('auth_module should be callable.')
+ _AUTH_MODULES[auth_module.mechanism] = auth_module
+
+
+def make_session_key(auth_mechanism='UNAUTHENTICATED'):
+ """
+ Attempts to create a session key by calling the auth module registered to the auth mechanism.
+ If an auth module does not exist for an auth mechanism, an InsecureAuthModule will be used.
+ """
+ if not _AUTH_MODULES:
+ raise SessionKeyError('No auth modules have been registered. Please call register_auth_module.')
+
+ auth_module = _AUTH_MODULES.get(auth_mechanism) or _INSECURE_AUTH_MODULE
+ log.debug('Using auth module: %r' % auth_module)
+ session_key = auth_module()
+ if not isinstance(session_key, SessionKey):
+ raise SessionKeyError('Expected %r but got %r from auth module %r' % (
+ SessionKey, session_key.__class__, auth_module))
+ return session_key
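
Custom authentication plugs in by subclassing AuthModule and registering an instance; a hedged
sketch (the KerberosAuthModule name and the fetch_ticket() helper are hypothetical):

  from twitter.aurora.common.auth import AuthModule, make_session_key, register_auth_module

  class KerberosAuthModule(AuthModule):
    @property
    def mechanism(self):
      return 'KERBEROS'

    def payload(self):
      return fetch_ticket()  # hypothetical helper returning the credential blob

  register_auth_module(KerberosAuthModule())
  session_key = make_session_key('KERBEROS')  # unregistered mechanisms fall back to InsecureAuthModule
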
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/cluster.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/cluster.py b/src/main/python/apache/aurora/common/cluster.py
new file mode 100644
index 0000000..21d8ac3
--- /dev/null
+++ b/src/main/python/apache/aurora/common/cluster.py
@@ -0,0 +1,81 @@
+from pystachio import Empty, Struct
+from pystachio.composite import Structural
+
+__all__ = ('Cluster',)
+
+
+# TODO(wickman) It seems like some of this Trait/Mixin stuff should be a
+# first-class construct in Pystachio. It could be a solution for extensible
+# Job/Task definitions.
+class Cluster(dict):
+ """Cluster encapsulates a set of K/V attributes describing cluster configurations.
+
+  Given a cluster, attributes may be accessed directly on it, e.g.
+ cluster.name
+ cluster.scheduler_zk_path
+
+ In order to enforce particular "traits" of Cluster, use Cluster.Trait to construct
+ enforceable schemas, e.g.
+
+ class ResolverTrait(Cluster.Trait):
+ scheduler_zk_ensemble = Required(String)
+ scheduler_zk_path = Default(String, '/twitter/service/mesos/prod/scheduler')
+
+ cluster = Cluster(name = 'west', scheduler_zk_ensemble = 'zookeeper.west.twttr.net')
+
+ # Ensures that scheduler_zk_ensemble is defined in the cluster or it will raise a TypeError
+ cluster.with_trait(ResolverTrait).scheduler_zk_ensemble
+
+ # Will use the default if none is provided on Cluster.
+ cluster.with_trait(ResolverTrait).scheduler_zk_path
+ """
+ Trait = Struct
+
+ def __init__(self, **kwargs):
+ self._traits = ()
+ super(Cluster, self).__init__(**kwargs)
+
+ def get_trait(self, trait):
+ """Given a Cluster.Trait, extract that trait."""
+ if not issubclass(trait, Structural):
+ raise TypeError('provided trait must be a Cluster.Trait subclass, got %s' % type(trait))
+ # TODO(wickman) Expose this in pystachio as a non-private or add a load method with strict=
+ return trait(trait._filter_against_schema(self))
+
+ def check_trait(self, trait):
+ """Given a Cluster.Trait, typecheck that trait."""
+ trait_check = self.get_trait(trait).check()
+ if not trait_check.ok():
+ raise TypeError(trait_check.message())
+
+ def with_traits(self, *traits):
+ """Return a cluster annotated with a set of traits."""
+ new_cluster = self.__class__(**self)
+ for trait in traits:
+ new_cluster.check_trait(trait)
+ new_cluster._traits = traits
+ return new_cluster
+
+ def with_trait(self, trait):
+ """Return a cluster annotated with a single trait (helper for self.with_traits)."""
+ return self.with_traits(trait)
+
+ def __setitem__(self, key, value):
+ raise TypeError('Clusters are immutable.')
+
+ def __getattr__(self, attribute):
+ for trait in self._traits:
+ expressed_trait = self.get_trait(trait)
+ if hasattr(expressed_trait, attribute):
+ value = getattr(expressed_trait, attribute)()
+ return None if value is Empty else value.get()
+ try:
+ return self[attribute]
+ except KeyError:
+ return self.__getattribute__(attribute)
+
+ def __copy__(self):
+ return self
+
+ def __deepcopy__(self, memo):
+ return self
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/cluster_option.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/cluster_option.py b/src/main/python/apache/aurora/common/cluster_option.py
new file mode 100644
index 0000000..3e07476
--- /dev/null
+++ b/src/main/python/apache/aurora/common/cluster_option.py
@@ -0,0 +1,82 @@
+from copy import copy
+from optparse import (
+ NO_DEFAULT,
+ OptionValueError,
+ Option)
+
+
+def _check_mesos_cluster(option, opt, value):
+ cluster_name = value
+ if option.clusters and cluster_name in option.clusters:
+ return option.clusters[cluster_name]
+ elif option.cluster_provider:
+ return option.cluster_provider(cluster_name)
+
+ cluster_list = ""
+ if option.clusters:
+ cluster_list = 'Valid options for clusters are %s' % ' '.join(option.clusters)
+
+ raise OptionValueError(
+ '%s is not a valid cluster for the %s option. %s' % (value, opt, cluster_list))
+
+
+class ClusterOption(Option):
+ """A command-line Option that requires a valid cluster name and returns a Cluster object.
+
+ Use in an @app.command_option decorator to avoid boilerplate. For example:
+
+ CLUSTER_PATH = os.path.expanduser('~/.clusters')
+ CLUSTERS = Clusters.from_json(CLUSTER_PATH)
+
+ @app.command
+ @app.command_option(ClusterOption('--cluster', default='smf1-test', clusters=CLUSTERS))
+ def get_health(args, options):
+ if options.cluster.zk_server:
+ do_something(options.cluster)
+
+ @app.command
+ @app.command_option(ClusterOption('-s',
+ '--source_cluster',
+ default='smf1-test',
+ clusters=CLUSTERS,
+ help='Source cluster to pull metadata from.'))
+ @app.command_option(ClusterOption('-d',
+ '--dest_cluster',
+ clusters=CLUSTERS,
+ default='smf1-test'))
+ def copy_metadata(args, options):
+ if not options.source_cluster:
+ print('required option source_cluster missing!')
+ metadata_copy(options.source_cluster, options.dest_cluster)
+ """
+
+ # Needed since we're creating a new type for validation - see optparse docs.
+ TYPES = copy(Option.TYPES) + ('mesos_cluster',)
+ TYPE_CHECKER = copy(Option.TYPE_CHECKER)
+ TYPE_CHECKER['mesos_cluster'] = _check_mesos_cluster
+
+ def __init__(self, *opt_str, **attrs):
+ """
+ *opt_str: Same meaning as in twitter.common.options.Option, at least one is required.
+ **attrs: See twitter.common.options.Option, with the following caveats:
+
+ Exactly one of the following must be provided:
+
+ clusters: A static Clusters object from which to pick clusters.
+ cluster_provider: A function that takes a cluster name and returns a Cluster object.
+ """
+ self.clusters = attrs.pop('clusters', None)
+ self.cluster_provider = attrs.pop('cluster_provider', None)
+ if not (self.clusters is not None) ^ (self.cluster_provider is not None):
+ raise ValueError('Must specify exactly one of clusters and cluster_provider.')
+
+ default_attrs = dict(
+ default=None,
+ action='store',
+ type='mesos_cluster',
+      help='Mesos cluster to use (Default: %default)'
+ )
+
+ combined_attrs = default_attrs
+ combined_attrs.update(attrs) # Defensive copy
+ Option.__init__(self, *opt_str, **combined_attrs) # old-style superclass
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/clusters.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/clusters.py b/src/main/python/apache/aurora/common/clusters.py
new file mode 100644
index 0000000..2777569
--- /dev/null
+++ b/src/main/python/apache/aurora/common/clusters.py
@@ -0,0 +1,135 @@
+from __future__ import print_function
+
+from collections import Mapping, namedtuple
+from contextlib import contextmanager
+import itertools
+import json
+import os
+import sys
+
+from twitter.common.collections import maybe_list
+
+from .cluster import Cluster
+
+from pystachio import Required, String
+
+try:
+ import yaml
+ HAS_YAML = True
+except ImportError:
+ HAS_YAML = False
+
+
+__all__ = (
+ 'CLUSTERS',
+ 'Clusters',
+)
+
+
+class NameTrait(Cluster.Trait):
+ name = Required(String)
+
+
+Parser = namedtuple('Parser', 'loader exception')
+
+
+class Clusters(Mapping):
+ class Error(Exception): pass
+ class ClusterExists(Error): pass
+ class ClusterNotFound(KeyError, Error): pass
+ class UnknownFormatError(Error): pass
+ class ParseError(Error): pass
+
+ LOADERS = {'.json': Parser(json.load, ValueError)}
+ if HAS_YAML:
+ LOADERS['.yml'] = Parser(yaml.load, yaml.parser.ParserError)
+
+ @classmethod
+ def from_file(cls, filename):
+ return cls(list(cls.iter_clusters(filename)))
+
+ @classmethod
+ def iter_clusters(cls, filename):
+ _, ext = os.path.splitext(filename)
+ if ext not in cls.LOADERS:
+ raise cls.UnknownFormatError('Unknown clusters file extension: %r' % ext)
+ with open(filename) as fp:
+ loader, exc_type = cls.LOADERS[ext]
+ try:
+ document = loader(fp)
+ except exc_type as e:
+ raise cls.ParseError('Unable to parse %s: %s' % (filename, e))
+ if isinstance(document, list):
+ iterator = document
+ elif isinstance(document, dict):
+ iterator = document.values()
+ else:
+ raise cls.ParseError('Unknown layout in %s' % filename)
+ for document in iterator:
+ if not isinstance(document, dict):
+ raise cls.ParseError('Clusters must be maps of key/value pairs, got %s' % type(document))
+ # documents not adhering to NameTrait are ignored.
+ if 'name' not in document:
+ continue
+ yield Cluster(**document)
+
+ def __init__(self, cluster_list):
+ self.replace(cluster_list)
+
+ def replace(self, cluster_list):
+ self._clusters = {}
+ self.update(cluster_list)
+
+ def update(self, cluster_list):
+ cluster_list = maybe_list(cluster_list, expected_type=Cluster, raise_type=TypeError)
+ for cluster in cluster_list:
+ self.add(cluster)
+
+ def add(self, cluster):
+ """Add a cluster to this Clusters map."""
+ cluster = Cluster(**cluster)
+ cluster.check_trait(NameTrait)
+ self._clusters[cluster.name] = cluster
+
+ @contextmanager
+ def patch(self, cluster_list):
+ """Patch this Clusters instance with a new list of clusters in a
+ contextmanager. Intended for testing purposes."""
+ old_clusters = self._clusters.copy()
+ self.replace(cluster_list)
+ yield self
+ self._clusters = old_clusters
+
+ def __iter__(self):
+ return iter(self._clusters)
+
+ def __len__(self):
+ return len(self._clusters)
+
+ def __getitem__(self, name):
+ try:
+ return self._clusters[name]
+ except KeyError:
+ raise self.ClusterNotFound('Unknown cluster %s, valid clusters: %s' % (
+ name, ', '.join(self._clusters.keys())))
+
+
+
+DEFAULT_SEARCH_PATHS = (
+ os.environ.get('AURORA_CONFIG_ROOT') or '/etc/aurora',
+ os.path.expanduser('~/.aurora')
+)
+
+
+CLUSTERS = Clusters(())
+
+
+def load():
+ """(re-)load all clusters from the search path."""
+ for search_path, ext in itertools.product(DEFAULT_SEARCH_PATHS, Clusters.LOADERS):
+ filename = os.path.join(search_path, 'clusters' + ext)
+ if os.path.exists(filename):
+ CLUSTERS.update(Clusters.from_file(filename).values())
+
+
+load()
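
A sketch of how this is consumed: a clusters.json (or clusters.yml) under one of the search
paths holds a list of maps, each carrying at least the 'name' required by NameTrait. The file
location and extra keys below are illustrative:

  from twitter.aurora.common.clusters import Clusters

  # e.g. /etc/aurora/clusters.json containing:
  #   [{"name": "example", "zk": "zookeeper.example.com", "scheduler_zk_path": "/aurora/scheduler"}]
  clusters = Clusters.from_file('/etc/aurora/clusters.json')
  cluster = clusters['example']
  print(cluster.zk)  # Cluster proxies attribute access to the underlying dict entries
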
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/common/http_signaler.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/http_signaler.py b/src/main/python/apache/aurora/common/http_signaler.py
new file mode 100644
index 0000000..778ddc4
--- /dev/null
+++ b/src/main/python/apache/aurora/common/http_signaler.py
@@ -0,0 +1,82 @@
+import contextlib
+from socket import timeout as SocketTimeout
+import sys
+
+from twitter.common import log
+from twitter.common.lang import Compatibility
+
+if Compatibility.PY3:
+ from http.client import HTTPException
+ import urllib.request as urllib_request
+ from urllib.error import URLError, HTTPError
+else:
+ from httplib import HTTPException
+ import urllib2 as urllib_request
+ from urllib2 import URLError, HTTPError
+
+
+class HttpSignaler(object):
+ """Simple HTTP endpoint wrapper to check health or trigger quitquitquit/abortabortabort"""
+ TIMEOUT_SECS = 1.0
+ FAILURE_REASON_LENGTH = 10
+
+ class Error(Exception): pass
+ class QueryError(Error): pass
+
+ def __init__(self, port, host='localhost', timeout_secs=TIMEOUT_SECS):
+ self._host = host
+ self._url_base = 'http://%s:%d/' % (host, port)
+ self._timeout_secs = timeout_secs
+
+ def url(self, endpoint):
+ return self._url_base + endpoint
+
+ @property
+ def opener(self):
+ return urllib_request.urlopen
+
+ def query(self, endpoint, data=None):
+ """Request an HTTP endpoint with a GET request (or POST if data is not None)"""
+ url = self.url(endpoint)
+ log.debug("%s: %s %s" % (self.__class__.__name__, 'GET' if data is None else 'POST', url))
+
+ def raise_error(reason):
+ raise self.QueryError('Failed to signal %s: %s' % (self.url(endpoint), reason))
+
+ try:
+ with contextlib.closing(
+ self.opener(url, data, timeout=self._timeout_secs)) as fp:
+ return fp.read()
+ except (HTTPException, SocketTimeout) as e:
+      # The type of an HTTPException is typically more useful than its contents (since, for
+      # example, BadStatusLines are often empty). Likewise with socket.timeout.
+ raise_error('Error within %s' % e.__class__.__name__)
+ except (URLError, HTTPError) as e:
+ raise_error(e)
+ except Exception as e:
+ raise_error('Unexpected error: %s' % e)
+
+ def __call__(self, endpoint, use_post_method=False, expected_response=None):
+ """Returns a (boolean, string|None) tuple of (call success, failure reason)"""
+ try:
+ response = self.query(endpoint, '' if use_post_method else None).strip().lower()
+ if expected_response is not None and response != expected_response:
+ def shorten(string):
+ return (string if len(string) < self.FAILURE_REASON_LENGTH
+ else "%s..." % string[:self.FAILURE_REASON_LENGTH - 3])
+ reason = 'Response differs from expected response (expected "%s", got "%s")'
+ log.warning(reason % (expected_response, response))
+ return (False, reason % (shorten(str(expected_response)), shorten(str(response))))
+ else:
+ return (True, None)
+ except self.QueryError as e:
+ return (False, str(e))
+
+ def health(self):
+ return self('health', use_post_method=False, expected_response='ok')
+
+ def quitquitquit(self):
+ return self('quitquitquit', use_post_method=True)
+
+ def abortabortabort(self):
+ return self('abortabortabort', use_post_method=True)
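
A small usage sketch of the signaler, assuming a task exposes its admin endpoints on port 8081:

  from twitter.aurora.common.http_signaler import HttpSignaler

  signaler = HttpSignaler(8081)         # talks to http://localhost:8081/
  healthy, reason = signaler.health()   # GET /health, expecting the body 'ok'
  if not healthy:
    print('Task unhealthy: %s' % reason)
    signaler.quitquitquit()             # POST /quitquitquit
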
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/BUILD b/src/main/python/apache/aurora/config/BUILD
new file mode 100644
index 0000000..91cb3e4
--- /dev/null
+++ b/src/main/python/apache/aurora/config/BUILD
@@ -0,0 +1,43 @@
+import os
+
+# Alias for src/main/python/twitter/aurora/config/schema
+python_library(
+ name = 'schema',
+ dependencies = [
+ pants('src/main/python/twitter/aurora/config/schema'),
+ ]
+)
+
+python_library(
+ name = 'config',
+ sources = (
+ '__init__.py',
+ 'loader.py',
+ 'port_resolver.py',
+ 'thrift.py',
+ ),
+ dependencies = [
+ pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
+ pants('aurora/twitterdeps/src/python/twitter/common/lang'),
+ pants('src/main/python/twitter/aurora/common'),
+ pants('src/main/python/twitter/aurora/config/schema'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ],
+
+)
+
+python_library(
+ name = 'config-packaged',
+ dependencies = [
+ pants(':config'),
+
+ # covering dependencies
+ pants('src/main/python/twitter/thermos/config'),
+ ],
+ provides = setup_py(
+ name = 'twitter.aurora.config',
+ version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+ description = 'Aurora/Thermos Pystachio schemas for describing job configurations.',
+ license = 'Apache License, Version 2.0',
+ )
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/__init__.py b/src/main/python/apache/aurora/config/__init__.py
new file mode 100644
index 0000000..a3ba981
--- /dev/null
+++ b/src/main/python/apache/aurora/config/__init__.py
@@ -0,0 +1,271 @@
+from __future__ import absolute_import
+
+from collections import defaultdict
+
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+from twitter.aurora.config.schema.base import MesosContext
+from twitter.thermos.config.loader import PortExtractor, ThermosTaskWrapper
+from twitter.thermos.config.schema import ThermosContext
+
+from .loader import AuroraConfigLoader
+from .port_resolver import PortResolver
+from .thrift import convert as convert_thrift, InvalidConfig as InvalidThriftConfig
+
+from pystachio import Empty, Environment, Ref
+
+__all__ = ('AuroraConfig', 'PortResolver')
+
+
+class AuroraConfig(object):
+ class Error(Exception): pass
+
+ class InvalidConfig(Error):
+ def __str__(self):
+ return 'The configuration was invalid: %s' % self.args[0]
+
+ @classmethod
+ def plugins(cls):
+ """A stack of callables to apply to the config on load."""
+ return []
+
+ @classmethod
+ def pick(cls, env, name, bindings, select_cluster=None, select_role=None, select_env=None):
+ # TODO(atollenaere): should take a JobKey when non-jobkey interface is deprecated
+
+ job_list = env.get('jobs', [])
+ if not job_list:
+ raise ValueError('No job defined in this config!')
+
+ def maybe_bind(j):
+ return j.bind(*bindings) if bindings else j
+
+ if name is None:
+ if len(job_list) > 1:
+ raise ValueError('Configuration has multiple jobs but no job name specified!')
+ return maybe_bind(job_list[0])
+
+ # TODO(wfarner): Rework this and calling code to make name optional as well.
+ def match_name(job):
+ return str(job.name()) == name
+ def match_cluster(job):
+ return select_cluster is None or str(job.cluster()) == select_cluster
+ def match_env(job):
+ return select_env is None or str(job.environment()) == select_env
+ def match_role(job):
+ return select_role is None or str(job.role()) == select_role
+
+ bound_jobs = map(maybe_bind, job_list)
+ matches = [j for j in bound_jobs if
+ all([match_cluster(j), match_role(j), match_env(j), match_name(j)])]
+
+ if len(matches) == 0:
+ msg = "Could not find job %s/%s/%s/%s\n" % (
+ select_cluster or '*', select_role or '*', select_env or '*', name)
+ for j in bound_jobs:
+ if j.environment() is Empty:
+ msg += "Job %s/%s/%s/%s in configuration file doesn't specify an environment\n" % (
+ j.cluster(), j.role(), '{MISSING}', j.name()
+ )
+ msg += cls._candidate_jobs_str(bound_jobs)
+ raise ValueError(msg)
+
+ elif len(matches) > 1:
+ msg = 'Multiple jobs match, please disambiguate by specifying a job key.\n'
+ msg += cls._candidate_jobs_str(bound_jobs)
+ raise ValueError(msg)
+ else:
+ return matches[0]
+
+ @staticmethod
+ def _candidate_jobs_str(job_list):
+ assert(job_list)
+ job_list = [" %s/%s/%s/%s" % (
+ j.cluster(), j.role(),
+ j.environment() if j.environment() is not Empty else "{MISSING}",
+ j.name())
+ for j in job_list]
+ return 'Candidates are:\n' + '\n'.join(job_list)
+
+ @classmethod
+ def apply_plugins(cls, config, env=None):
+ for plugin in cls.plugins():
+ if not callable(plugin):
+ raise cls.Error('Invalid configuration plugin %r, should be callable!' % plugin)
+ plugin(config, env)
+ return config
+
+ @classmethod
+ def load(
+ cls, filename, name=None, bindings=None,
+ select_cluster=None, select_role=None, select_env=None):
+ # TODO(atollenaere): should take a JobKey when non-jobkey interface is deprecated
+ env = AuroraConfigLoader.load(filename)
+ return cls.apply_plugins(
+ cls(cls.pick(env, name, bindings, select_cluster, select_role, select_env)), env)
+
+ @classmethod
+ def load_json(
+ cls, filename, name=None, bindings=None,
+ select_cluster=None, select_role=None, select_env=None):
+ # TODO(atollenaere): should take a JobKey when non-jobkey interface is deprecated
+ job = AuroraConfigLoader.load_json(filename)
+ return cls.apply_plugins(cls(job.bind(*bindings) if bindings else job))
+
+ @classmethod
+ def loads_json(cls, string, name=None, bindings=None, select_cluster=None, select_env=None):
+ # TODO(atollenaere): should take a JobKey when non-jobkey interface is deprecated
+ job = AuroraConfigLoader.loads_json(string)
+ return cls.apply_plugins(cls(job.bind(*bindings) if bindings else job))
+
+ @classmethod
+ def validate_job(cls, job):
+ """
+ Validate and sanitize the input job
+
+ Currently, the validation stage simply ensures that the job has all required fields.
+ self.InvalidConfig is raised if any required fields are not present.
+ """
+ def has(pystachio_type, thing):
+ return getattr(pystachio_type, 'has_%s' % thing)()
+ for required in ("cluster", "task", "role"):
+ if not has(job, required):
+ raise cls.InvalidConfig(
+ '%s required for job "%s"' % (required.capitalize(), job.name()))
+ if not has(job.task(), 'processes'):
+ raise cls.InvalidConfig('Processes required for task on job "%s"' % job.name())
+
+ @classmethod
+ def standard_bindings(cls, job):
+ # Rewrite now-deprecated bindings into their proper form.
+ return job.bind({
+ Ref.from_address('mesos.role'): '{{role}}',
+ Ref.from_address('mesos.cluster'): '{{cluster}}',
+ Ref.from_address('thermos.user'): '{{role}}',
+ })
+
+ def __init__(self, job):
+ self.validate_job(job)
+ self._job = self.standard_bindings(job)
+ self._packages = []
+ self.binding_dicts = defaultdict(dict)
+ self.hooks = []
+
+ def context(self, instance=None):
+ context = dict(instance=instance)
+ # Filter unspecified values
+ return Environment(mesos=MesosContext(dict((k, v) for k, v in context.items() if v)))
+
+ def job(self):
+ interpolated_job = self._job % self.context()
+
+ # TODO(wickman) Once thermos is onto thrift instead of pystachio, use
+ # %%replacements%% instead.
+ #
+ # Typecheck against the Job, with the following free variables unwrapped at the Task level:
+ # - a dummy {{mesos.instance}}
+ # - dummy values for the {{thermos.ports}} context, to allow for their use in task_links
+ env = dict(mesos=Environment(instance=0))
+ if interpolated_job.task_links() is not Empty:
+ try:
+ dummy_ports = dict(
+ (port, 31337) for port in PortExtractor.extract(interpolated_job.task_links()))
+ except PortExtractor.InvalidPorts as err:
+ raise self.InvalidConfig('Invalid port references in task_links! %s' % err)
+ env.update(thermos=ThermosContext(ports=dummy_ports))
+ typecheck = interpolated_job.bind(Environment(env)).check()
+ if not typecheck.ok():
+ raise self.InvalidConfig(typecheck.message())
+ interpolated_job = interpolated_job(task_links=self.task_links())
+ try:
+ return convert_thrift(interpolated_job, self._packages, self.ports())
+ except InvalidThriftConfig as e:
+ raise self.InvalidConfig(str(e))
+
+ def bind(self, binding):
+ self._job = self._job.bind(binding)
+
+ def raw(self):
+ return self._job
+
+ # This stinks to high heaven
+ def update_job(self, new_job):
+ self._job = new_job
+
+ def instances(self):
+ return self._job.instances().get()
+
+ def task(self, instance):
+ return (self._job % self.context(instance)).task()
+
+ def name(self):
+ return self._job.name().get()
+
+ def role(self):
+ return self._job.role().get()
+
+ def cluster(self):
+ return self._job.cluster().get()
+
+ def environment(self):
+ return self._job.environment().get()
+
+ def job_key(self):
+ return AuroraJobKey(self.cluster(), self.role(), self.environment(), self.name())
+
+ def ports(self):
+ """Return the list of ports that need to be allocated by the scheduler."""
+
+ # Strictly speaking this is wrong -- it is possible to do things like
+ # {{thermos.ports[instance_{{mesos.instance}}]}}
+ # which can only be extracted post-unwrapping. This means that validating
+ # the state of the announce configuration could be problematic if people
+ # try to do complicated things.
+ referenced_ports = ThermosTaskWrapper(self._job.task(), strict=False).ports()
+ resolved_portmap = PortResolver.resolve(self._job.announce().portmap().get()
+ if self._job.has_announce() else {})
+
+ # values of the portmap that are not integers => unallocated
+ unallocated = set(port for port in resolved_portmap.values() if not isinstance(port, int))
+
+    # find referenced {{thermos.ports[...]}} entries that are not resolved by the portmap
+ unresolved_references = set(
+ port for port in (resolved_portmap.get(port_ref, port_ref) for port_ref in referenced_ports)
+ if not isinstance(port, int))
+
+ return unallocated | unresolved_references
+
+ def has_health_port(self):
+ return "health" in ThermosTaskWrapper(self._job.task(), strict=False).ports()
+
+ def task_links(self):
+ # {{mesos.instance}} --> %shard_id%
+ # {{thermos.ports[foo]}} --> %port:foo%
+ task_links = self._job.task_links()
+ if task_links is Empty:
+ return task_links
+ _, uninterp = task_links.interpolate()
+ substitutions = {
+ Ref.from_address('mesos.instance'): '%shard_id%'
+ }
+ port_scope = Ref.from_address('thermos.ports')
+ for ref in uninterp:
+ subscope = port_scope.scoped_to(ref)
+ if subscope:
+ substitutions[ref] = '%%port:%s%%' % subscope.action().value
+ return task_links.bind(substitutions)
+
+ def update_config(self):
+ return self._job.update_config()
+
+ def add_package(self, package):
+ self._packages.append(package)
+
+ # TODO(wickman) Kill package() once MESOS-3191 is in.
+ def package(self):
+ pass
+
+ def is_dedicated(self):
+ return self._job.has_constraints() and 'dedicated' in self._job.constraints()
+
+ def __repr__(self):
+ return '%s(%r)' % (self.__class__.__name__, self._job)
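
End to end, client commands funnel through AuroraConfig.load(); a hedged sketch (the file name
and job coordinates are illustrative):

  from twitter.aurora.config import AuroraConfig

  config = AuroraConfig.load('hello_world.aurora', name='hello_world',
                             select_cluster='example', select_env='devel')
  print(config.job_key())           # e.g. example/<role>/devel/hello_world
  print(config.ports())             # ports the scheduler still needs to allocate
  job_configuration = config.job()  # thrift JobConfiguration handed to the scheduler
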
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/loader.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/loader.py b/src/main/python/apache/aurora/config/loader.py
new file mode 100644
index 0000000..e3e5559
--- /dev/null
+++ b/src/main/python/apache/aurora/config/loader.py
@@ -0,0 +1,63 @@
+import json
+import pkgutil
+import textwrap
+
+from twitter.aurora.config.schema import base as base_schema
+
+from pystachio.config import Config as PystachioConfig
+
+
+class AuroraConfigLoader(PystachioConfig):
+ SCHEMA_MODULES = []
+
+ @classmethod
+ def assembled_schema(cls, schema_modules):
+ default_schema = [super(AuroraConfigLoader, cls).DEFAULT_SCHEMA]
+ default_schema.extend('from %s import *' % module.__name__ for module in schema_modules)
+ return '\n'.join(default_schema)
+
+ @classmethod
+ def register_schema(cls, schema_module):
+ """Register the schema defined in schema_module, equivalent to doing
+
+ from schema_module.__name__ import *
+
+ before all pystachio configurations are evaluated.
+ """
+ cls.SCHEMA_MODULES.append(schema_module)
+ cls.DEFAULT_SCHEMA = cls.assembled_schema(cls.SCHEMA_MODULES)
+
+ @classmethod
+ def register_schemas_from(cls, package):
+ """Register schemas from all modules in a particular package."""
+ for _, submodule, is_package in pkgutil.iter_modules(package.__path__):
+ if is_package:
+ continue
+ cls.register_schema(
+ __import__('%s.%s' % (package.__name__, submodule), fromlist=[package.__name__]))
+
+ @classmethod
+ def flush_schemas(cls):
+ """Flush all schemas from AuroraConfigLoader. Intended for test use only."""
+ cls.SCHEMA_MODULES = []
+ cls.register_schema(base_schema)
+
+ @classmethod
+ def load(cls, loadable):
+ return cls.load_raw(loadable).environment
+
+ @classmethod
+ def load_raw(cls, loadable):
+ return cls(loadable)
+
+ @classmethod
+ def load_json(cls, filename):
+ with open(filename) as fp:
+ return base_schema.Job.json_load(fp)
+
+ @classmethod
+ def loads_json(cls, string):
+ return base_schema.Job(json.loads(string))
+
+
+AuroraConfigLoader.flush_schemas()
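
register_schema / register_schemas_from let deployments extend the DSL visible to .aurora files;
a sketch assuming a hypothetical module of extra pystachio Structs:

  from twitter.aurora.config.loader import AuroraConfigLoader

  import mycompany.aurora_extras as extras      # hypothetical module exporting extra Structs

  AuroraConfigLoader.register_schema(extras)    # .aurora files can now use its names directly
  env = AuroraConfigLoader.load('hello_world.aurora')
  jobs = env.get('jobs', [])                    # AuroraConfig.pick() reads the job list from here
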
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/port_resolver.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/port_resolver.py b/src/main/python/apache/aurora/config/port_resolver.py
new file mode 100644
index 0000000..486095f
--- /dev/null
+++ b/src/main/python/apache/aurora/config/port_resolver.py
@@ -0,0 +1,45 @@
+from twitter.common.lang import Compatibility
+
+
+class PortResolver(object):
+ class CycleException(Exception): pass
+
+ @classmethod
+ def resolve(cls, portmap):
+ """
+ Given an announce-style portmap, return a fully dereferenced portmap.
+
+ For example, given the portmap:
+ {
+ 'http': 80,
+        'aurora': 'http',
+ 'https': 'aurora',
+ 'thrift': 'service'
+ }
+
+ Returns {'http': 80, 'aurora': 80, 'https': 80, 'thrift': 'service'}
+ """
+ for (name, port) in portmap.items():
+ if not isinstance(name, Compatibility.string):
+ raise ValueError('All portmap keys must be strings!')
+ if not isinstance(port, (int, Compatibility.string)):
+ raise ValueError('All portmap values must be strings or integers!')
+
+ portmap = portmap.copy()
+ for port in list(portmap):
+ try:
+ portmap[port] = int(portmap[port])
+ except ValueError:
+ continue
+
+ def resolve_one(static_port):
+ visited = set()
+ root = portmap[static_port]
+ while root in portmap:
+ visited.add(root)
+ if portmap[root] in visited:
+ raise cls.CycleException('Found cycle in portmap!')
+ root = portmap[root]
+ return root
+
+ return dict((name, resolve_one(name)) for name in portmap)
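
The docstring example above, as a runnable sketch:

  from twitter.aurora.config.port_resolver import PortResolver

  portmap = {'http': 80, 'aurora': 'http', 'https': 'aurora', 'thrift': 'service'}
  print(PortResolver.resolve(portmap))
  # {'http': 80, 'aurora': 80, 'https': 80, 'thrift': 'service'}

  PortResolver.resolve({'a': 'b', 'b': 'a'})  # raises PortResolver.CycleException
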
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/recipes.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/recipes.py b/src/main/python/apache/aurora/config/recipes.py
new file mode 100644
index 0000000..b2dc23c
--- /dev/null
+++ b/src/main/python/apache/aurora/config/recipes.py
@@ -0,0 +1,44 @@
+import os
+
+from .loader import AuroraConfigLoader
+
+import pkg_resources
+
+
+class Recipes(object):
+ """
+  Encapsulates a registry of Recipes (i.e. tasks that mutate the behavior of other tasks).
+ """
+ REGISTRY = {}
+ RECIPE_EXTENSION = '.aurora_recipe'
+
+ class Error(Exception): pass
+ class UnknownRecipe(Error): pass
+
+ @classmethod
+ def get(cls, name):
+ if name not in cls.REGISTRY:
+ raise cls.UnknownRecipe('Could not find recipe %s!' % name)
+ return cls.REGISTRY[name]
+
+ @classmethod
+ def include_one(cls, filename):
+ recipe_env = AuroraConfigLoader.load(filename)
+ cls.REGISTRY.update(recipe_env.get('recipes', {}))
+
+ @classmethod
+ def include_module(cls, module):
+ for filename in pkg_resources.resource_listdir(module, ''):
+ if filename.endswith(cls.RECIPE_EXTENSION):
+ cls.include_one(os.path.join(module.replace('.', os.sep), filename))
+
+ @classmethod
+ def include(cls, path):
+ if os.path.isfile(path):
+ cls.include_one(path)
+ elif os.path.isdir(path):
+ for filename in os.listdir(path):
+ if filename.endswith(cls.RECIPE_EXTENSION):
+ cls.include_one(os.path.join(path, filename))
+ else:
+ raise ValueError('Could not find %s' % path)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/repl.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/repl.py b/src/main/python/apache/aurora/config/repl.py
new file mode 100644
index 0000000..d26f94d
--- /dev/null
+++ b/src/main/python/apache/aurora/config/repl.py
@@ -0,0 +1,8 @@
+from twitter.aurora.config.loader import AuroraConfigLoader
+from twitter.common.lang import Compatibility
+
+
+
+import code
+code.interact('Mesos Config REPL',
+ local=Compatibility.exec_function(AuroraConfigLoader.DEFAULT_SCHEMA, globals()))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/schema/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/schema/BUILD b/src/main/python/apache/aurora/config/schema/BUILD
new file mode 100644
index 0000000..fd08ec7
--- /dev/null
+++ b/src/main/python/apache/aurora/config/schema/BUILD
@@ -0,0 +1,9 @@
+python_library(
+ name = 'schema',
+ sources = ['base.py'],
+ dependencies = [
+ pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
+ pants('src/main/python/twitter/thermos/config:schema'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ]
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/schema/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/schema/__init__.py b/src/main/python/apache/aurora/config/schema/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/schema/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/schema/base.py b/src/main/python/apache/aurora/config/schema/base.py
new file mode 100644
index 0000000..b3d437f
--- /dev/null
+++ b/src/main/python/apache/aurora/config/schema/base.py
@@ -0,0 +1,122 @@
+from twitter.thermos.config.schema import *
+
+from gen.twitter.aurora.constants import DEFAULT_ENVIRONMENT
+
+
+# TODO(wickman) Bind {{mesos.instance}} to %shard_id%
+class MesosContext(Struct):
+ # The instance id (i.e. replica id, shard id) in the context of a task
+ instance = Required(Integer)
+
+
+# AppApp layout setup
+class AppPackage(Struct):
+ name = Required(String)
+ version = Default(String, 'latest')
+
+class AppLayout(Struct):
+ packages = Default(List(AppPackage), [])
+
+
+# The object bound into the {{packer}} namespace.
+# Referenced by
+# {{packer[role][name][version]}}
+#
+# Where version =
+# number (integer)
+# 'live' (live package)
+# 'latest' (highest version number)
+#
+# For example if you'd like to create a copy process for a particular
+# package,
+# copy_latest = Process(
+# name = 'copy-{{package_name}}',
+# cmdline = '{{packer[{{role}}][{{package_name}}][latest].copy_command}}')
+# processes = [
+# copy_latest.bind(package_name = 'labrat'),
+# copy_latest.bind(package_name = 'packer')
+# ]
+class PackerObject(Struct):
+ package = String
+ package_uri = String
+ copy_command = String
+
+
+class UpdateConfig(Struct):
+ batch_size = Default(Integer, 1)
+ restart_threshold = Default(Integer, 60)
+ watch_secs = Default(Integer, 30)
+ max_per_shard_failures = Default(Integer, 0)
+ max_total_failures = Default(Integer, 0)
+
+
+class HealthCheckConfig(Struct):
+ initial_interval_secs = Default(Float, 60.0)
+ interval_secs = Default(Float, 30.0)
+ timeout_secs = Default(Float, 1.0)
+ max_consecutive_failures = Default(Integer, 0)
+
+
+class Announcer(Struct):
+ primary_port = Default(String, 'http')
+
+ # Portmap can either alias two ports together, e.g.
+ # aurora <= http
+ # Or it can be used to alias static ports to endpoints, e.g.
+ # http <= 80
+ # https <= 443
+ # aurora <= https
+ portmap = Default(Map(String, String), {
+ 'aurora': '{{primary_port}}'
+ })
+
+
+# The executorConfig populated inside of TaskConfig.
+class MesosTaskInstance(Struct):
+ task = Required(Task)
+ layout = AppLayout
+ instance = Required(Integer)
+ role = Required(String)
+ announce = Announcer
+ environment = Default(String, DEFAULT_ENVIRONMENT)
+ health_check_interval_secs = Default(Integer, 30) # DEPRECATED (MESOS-2649)
+ health_check_config = Default(HealthCheckConfig, HealthCheckConfig())
+
+
+class MesosJob(Struct):
+ name = Default(String, '{{task.name}}')
+ role = Required(String)
+ contact = String
+ cluster = Required(String)
+ environment = Required(String)
+ instances = Default(Integer, 1)
+ task = Required(Task)
+ recipes = List(String)
+ announce = Announcer
+
+ cron_schedule = String
+  cron_policy = String           # these two are aliases of each other. default is KILL_EXISTING
+  cron_collision_policy = String # if unspecified.
+                                 # cron_policy is DEPRECATED (MESOS-2491) in favor of
+                                 # cron_collision_policy.
+
+ update_config = Default(UpdateConfig, UpdateConfig())
+
+ constraints = Map(String, String)
+  daemon = Boolean               # daemon and service are aliased together.
+  service = Boolean              # daemon is DEPRECATED (MESOS-2492) in favor of
+                                 # service. by default, service is False.
+ max_task_failures = Default(Integer, 1)
+ production = Default(Boolean, False)
+ priority = Default(Integer, 0)
+ health_check_interval_secs = Integer # DEPRECATED in favor of health_check_config (MESOS-2649).
+ health_check_config = HealthCheckConfig
+ task_links = Map(String, String)
+
+ layout = AppLayout # DEPRECATED in favor of directory sandboxes
+
+ enable_hooks = Default(Boolean, False) # enable client API hooks; from env python-list 'hooks'
+
+
+Job = MesosJob
+Service = Job(service = True)
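
An illustrative sketch, not part of the commit, of a config written against this schema; the role, cluster and command line are invented, and the Announcer shows the default portmap aliasing described in the comments above. The trailing 'jobs' list follows the usual Aurora config convention:

    hello = Process(name = 'hello', cmdline = 'echo hello world && sleep 60')

    hello_task = Task(
      name = 'hello',
      processes = [hello],
      resources = Resources(cpu = 0.25, ram = 64 * 1048576, disk = 64 * 1048576))

    hello_service = Service(   # Service is just Job(service = True), per the alias above
      name = 'hello',
      role = 'www-data',       # invented role
      cluster = 'example',     # invented cluster name
      environment = 'devel',
      instances = 2,
      task = hello_task,
      announce = Announcer(primary_port = 'http'),  # portmap aliases 'aurora' to 'http' by default
      update_config = UpdateConfig(batch_size = 1, watch_secs = 45),
      health_check_config = HealthCheckConfig(initial_interval_secs = 30.0))

    jobs = [hello_service]
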
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/config/thrift.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/thrift.py b/src/main/python/apache/aurora/config/thrift.py
new file mode 100644
index 0000000..8f6a5ce
--- /dev/null
+++ b/src/main/python/apache/aurora/config/thrift.py
@@ -0,0 +1,259 @@
+import getpass
+import re
+
+from twitter.aurora.config.schema.base import (
+ HealthCheckConfig,
+ MesosContext,
+ MesosTaskInstance,
+)
+from twitter.common.lang import Compatibility
+from twitter.thermos.config.loader import ThermosTaskValidator
+
+from gen.twitter.aurora.constants import GOOD_IDENTIFIER_PATTERN_PYTHON, AURORA_EXECUTOR_NAME
+from gen.twitter.aurora.ttypes import (
+ Constraint,
+ CronCollisionPolicy,
+ ExecutorConfig,
+ Identity,
+ JobConfiguration,
+ JobKey,
+ LimitConstraint,
+ Package,
+ TaskConfig,
+ TaskConstraint,
+ ValueConstraint,
+)
+
+from pystachio import Empty, Ref
+
+__all__ = (
+ 'InvalidConfig',
+ 'convert'
+)
+
+
+class InvalidConfig(ValueError):
+ pass
+
+
+def constraints_to_thrift(constraints):
+ """Convert a python dictionary to a set of Constraint thrift objects."""
+ result = set()
+ for attribute, constraint_value in constraints.items():
+ assert isinstance(attribute, Compatibility.string) and (
+ isinstance(constraint_value, Compatibility.string)), (
+ "Both attribute name and value in constraints must be string")
+ constraint = Constraint()
+ constraint.name = attribute
+ task_constraint = TaskConstraint()
+ if constraint_value.startswith('limit:'):
+ task_constraint.limit = LimitConstraint()
+ try:
+ task_constraint.limit.limit = int(constraint_value.replace('limit:', '', 1))
+ except ValueError:
+ print('%s is not a valid limit value, must be integer' % constraint_value)
+ raise
+ else:
+ # Strip off the leading negation if present.
+ negated = constraint_value.startswith('!')
+ if negated:
+ constraint_value = constraint_value[1:]
+ task_constraint.value = ValueConstraint(negated, set(constraint_value.split(',')))
+ constraint.constraint = task_constraint
+ result.add(constraint)
+ return result
+
+
+def task_instance_from_job(job, instance):
+ instance_context = MesosContext(instance=instance)
+ # TODO(Sathya): Remove health_check_interval_secs references after deprecation cycle is complete.
+ health_check_config = HealthCheckConfig()
+ if job.has_health_check_interval_secs():
+ health_check_config = HealthCheckConfig(interval_secs=job.health_check_interval_secs().get())
+ elif job.has_health_check_config():
+ health_check_config = job.health_check_config()
+ ti = MesosTaskInstance(task=job.task(),
+ layout=job.layout(),
+ role=job.role(),
+ health_check_interval_secs=health_check_config.interval_secs().get(),
+ health_check_config=health_check_config,
+ instance=instance)
+ if job.has_announce():
+ ti = ti(announce=job.announce())
+ if job.has_environment():
+ ti = ti(environment=job.environment())
+ return ti.bind(mesos=instance_context).interpolate()
+
+
+def translate_cron_policy(policy):
+ cron_policy = CronCollisionPolicy._NAMES_TO_VALUES.get(policy.get())
+ if cron_policy is None:
+ raise InvalidConfig('Invalid cron policy: %s' % policy.get())
+ return cron_policy
+
+
+def fully_interpolated(pystachio_object, coerce_fn=lambda i: i):
+ # Extract a fully-interpolated unwrapped object from pystachio_object or raise InvalidConfig.
+ #
+ # TODO(ksweeney): Remove this once Pystachio 1.0 changes the behavior of interpolate() to return
+ # unwrapped objects and fail when there are any unbound refs.
+ if not pystachio_object.check().ok():
+ raise InvalidConfig(pystachio_object.check().message())
+
+  # If an object type-checks, it's okay to use the raw value from the wrapped object returned
+  # by interpolate. Without the previous check, value.get() could return a string with
+  # mustaches instead of an object of the expected type.
+ value, _ = pystachio_object.interpolate()
+ return coerce_fn(value.get())
+
+
+def select_cron_policy(cron_policy, cron_collision_policy):
+ if cron_policy is Empty and cron_collision_policy is Empty:
+ return CronCollisionPolicy.KILL_EXISTING
+ elif cron_policy is not Empty and cron_collision_policy is Empty:
+ return translate_cron_policy(cron_policy)
+ elif cron_policy is Empty and cron_collision_policy is not Empty:
+ return translate_cron_policy(cron_collision_policy)
+ else:
+ raise InvalidConfig('Specified both cron_policy and cron_collision_policy!')
+
+
+def select_service_bit(job):
+ if not job.has_daemon() and not job.has_service():
+ return False
+ elif job.has_daemon() and not job.has_service():
+ return fully_interpolated(job.daemon(), bool)
+ elif not job.has_daemon() and job.has_service():
+ return fully_interpolated(job.service(), bool)
+ else:
+ raise InvalidConfig('Specified both daemon and service bits!')
+
+
+# TODO(wickman) Due to MESOS-2718 we should revert to using the MesosTaskInstance.
+#
+# Using the MesosJob instead of the MesosTaskInstance was to allow for
+# planned future use of fields such as 'cluster' and to allow for conversion
+# from Job=>Task to be done entirely on the executor, but instead this had
+# made it impossible to run idempotent updates.
+#
+# In the meantime, we are erasing fields of the Job that are controversial.
+# This achieves roughly the same effect as using the MesosTaskInstance.
+# The future work is tracked at MESOS-2727.
+ALIASED_FIELDS = (
+ 'cron_policy',
+ 'cron_collision_policy',
+ 'update_config',
+ 'daemon',
+ 'service',
+ 'instances'
+)
+
+
+def filter_aliased_fields(job):
+ return job(**dict((key, Empty) for key in ALIASED_FIELDS))
+
+
+def assert_valid_field(field, identifier):
+ VALID_IDENTIFIER = re.compile(GOOD_IDENTIFIER_PATTERN_PYTHON)
+ if not isinstance(identifier, Compatibility.string):
+ raise InvalidConfig("%s must be a string" % field)
+ if not VALID_IDENTIFIER.match(identifier):
+ raise InvalidConfig("Invalid %s '%s'" % (field, identifier))
+ return identifier
+
+
+MESOS_INSTANCE_REF = Ref.from_address('mesos.instance')
+THERMOS_PORT_SCOPE_REF = Ref.from_address('thermos.ports')
+THERMOS_TASK_ID_REF = Ref.from_address('thermos.task_id')
+
+
+# TODO(wickman) Make this a method directly on an AuroraConfig so that we don't
+# need the packages/ports shenanigans.
+def convert(job, packages=frozenset(), ports=frozenset()):
+ """Convert a Pystachio MesosJob to an Aurora Thrift JobConfiguration."""
+
+ owner = Identity(role=fully_interpolated(job.role()), user=getpass.getuser())
+ key = JobKey(
+ role=assert_valid_field('role', fully_interpolated(job.role())),
+ environment=assert_valid_field('environment', fully_interpolated(job.environment())),
+ name=assert_valid_field('name', fully_interpolated(job.name())))
+
+ task_raw = job.task()
+
+ MB = 1024 * 1024
+ task = TaskConfig()
+
+ def not_empty_or(item, default):
+ return default if item is Empty else fully_interpolated(item)
+
+ # job components
+ task.jobName = fully_interpolated(job.name())
+ task.environment = fully_interpolated(job.environment())
+ task.production = fully_interpolated(job.production(), bool)
+ task.isService = select_service_bit(job)
+ task.maxTaskFailures = fully_interpolated(job.max_task_failures())
+ task.priority = fully_interpolated(job.priority())
+ task.contactEmail = not_empty_or(job.contact(), None)
+
+ # Add package tuples to a task, to display in the scheduler UI.
+ task.packages = frozenset(
+ Package(role=str(role), name=str(package_name), version=int(version))
+ for role, package_name, version in packages)
+
+ # task components
+ if not task_raw.has_resources():
+ raise InvalidConfig('Task must specify resources!')
+
+ if (fully_interpolated(task_raw.resources().ram()) == 0
+ or fully_interpolated(task_raw.resources().disk()) == 0):
+ raise InvalidConfig('Must specify ram and disk resources, got ram:%r disk:%r' % (
+ fully_interpolated(task_raw.resources().ram()),
+ fully_interpolated(task_raw.resources().disk())))
+
+ task.numCpus = fully_interpolated(task_raw.resources().cpu())
+ task.ramMb = fully_interpolated(task_raw.resources().ram()) / MB
+ task.diskMb = fully_interpolated(task_raw.resources().disk()) / MB
+ if task.numCpus <= 0 or task.ramMb <= 0 or task.diskMb <= 0:
+ raise InvalidConfig('Task has invalid resources. cpu/ramMb/diskMb must all be positive: '
+ 'cpu:%r ramMb:%r diskMb:%r' % (task.numCpus, task.ramMb, task.diskMb))
+
+ task.owner = owner
+ task.requestedPorts = ports
+ task.taskLinks = not_empty_or(job.task_links(), {})
+ task.constraints = constraints_to_thrift(not_empty_or(job.constraints(), {}))
+
+ underlying, refs = job.interpolate()
+
+ # need to fake an instance id for the sake of schema checking
+ underlying_checked = underlying.bind(mesos = {'instance': 31337})
+ try:
+ ThermosTaskValidator.assert_valid_task(underlying_checked.task())
+ except ThermosTaskValidator.InvalidTaskError as e:
+ raise InvalidConfig('Task is invalid: %s' % e)
+ if not underlying_checked.check().ok():
+ raise InvalidConfig('Job not fully specified: %s' % underlying.check().message())
+
+ unbound = []
+ for ref in refs:
+ if ref == THERMOS_TASK_ID_REF or ref == MESOS_INSTANCE_REF or (
+ Ref.subscope(THERMOS_PORT_SCOPE_REF, ref)):
+ continue
+ unbound.append(ref)
+
+ if unbound:
+ raise InvalidConfig('Config contains unbound variables: %s' % ' '.join(map(str, unbound)))
+
+ cron_schedule = not_empty_or(job.cron_schedule(), '')
+ cron_policy = select_cron_policy(job.cron_policy(), job.cron_collision_policy())
+
+ task.executorConfig = ExecutorConfig(
+ name=AURORA_EXECUTOR_NAME,
+ data=filter_aliased_fields(underlying).json_dumps())
+
+ return JobConfiguration(
+ key=key,
+ owner=owner,
+ cronSchedule=cron_schedule,
+ cronCollisionPolicy=cron_policy,
+ taskConfig=task,
+ instanceCount=fully_interpolated(job.instances()))
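
A hedged usage sketch, not part of the commit: given a fully specified MesosJob such as hello_service from the base.py sketch above, convert() produces the thrift JobConfiguration sent to the scheduler, and constraints_to_thrift() parses a small per-attribute string format ('limit:N' for a LimitConstraint, otherwise a comma-separated value list, with a leading '!' negating it):

    from twitter.aurora.config.thrift import convert, InvalidConfig

    # hello_service is assumed to be the fully specified Service from the base.py sketch.
    constrained = hello_service(constraints = {
        'host': 'limit:1',     # LimitConstraint: at most one instance per host
        'rack': '!bad_rack',   # negated ValueConstraint: never land on the 'bad_rack' value
    })

    try:
      job_config = convert(constrained)   # -> thrift JobConfiguration
      print('%s/%s/%s: %d instances' % (job_config.key.role, job_config.key.environment,
                                        job_config.key.name, job_config.instanceCount))
    except InvalidConfig as e:
      print('Config rejected: %s' % e)
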
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/BUILD b/src/main/python/apache/aurora/executor/BUILD
new file mode 100644
index 0000000..01701e9
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/BUILD
@@ -0,0 +1,139 @@
+import os
+
+python_library(
+ name = 'thermos_task_runner',
+ sources = ['thermos_task_runner.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+ pants('src/main/python/twitter/thermos/common'),
+ pants('src/main/python/twitter/thermos/config:schema'),
+ pants('src/main/python/twitter/thermos/core'),
+ pants('src/main/python/twitter/thermos/monitoring:monitor'),
+ pants('src/main/python/twitter/aurora/common:http_signaler'),
+ pants('src/main/python/twitter/aurora/executor/common:status_checker'),
+ pants('src/main/python/twitter/aurora/executor/common:task_info'),
+ pants('src/main/python/twitter/aurora/executor/common:task_runner'),
+ ]
+)
+
+python_library(
+ name = 'executor_detector',
+ sources = ['executor_detector.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/string'),
+ ]
+)
+
+python_library(
+ name = 'executor_vars',
+ sources = ['executor_vars.py'],
+ dependencies = [
+ pants('src/main/python/twitter/aurora/BUILD.thirdparty:psutil'),
+ pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+ pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
+ pants('aurora/twitterdeps/src/python/twitter/common/python'),
+ pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+ pants('aurora/twitterdeps/src/python/twitter/common/string'),
+ ]
+)
+
+python_library(
+ name = 'status_manager',
+ sources = ['status_manager.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+ pants('src/main/python/twitter/aurora/executor/common:status_checker'),
+ ]
+)
+
+python_library(
+ name = 'thermos_executor_base',
+ sources = ['executor_base.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('src/main/thrift/com/twitter/thermos:py-thrift'),
+ pants('src/main/python/twitter/aurora:mesos-core'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ]
+)
+
+python_library(
+ name = 'thermos_executor',
+ sources = ['thermos_executor.py'],
+ dependencies = [
+ pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
+ pants(':status_manager'),
+ pants(':thermos_executor_base'),
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('aurora/twitterdeps/src/python/twitter/common/concurrent'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+ pants('src/main/python/twitter/aurora/executor/common:kill_manager'),
+ pants('src/main/python/twitter/aurora/executor/common:sandbox'),
+ pants('src/main/python/twitter/aurora/executor/common:task_info'),
+ pants('src/main/python/twitter/aurora/executor/common:task_runner'),
+ pants('src/main/python/twitter/aurora:mesos-core'),
+ ]
+)
+
+python_library(
+ name = 'thermos_runner',
+ sources = ['thermos_runner.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('src/main/python/twitter/thermos/common:planner'),
+ pants('src/main/python/twitter/thermos/config:schema'),
+ pants('src/main/python/twitter/thermos/core'),
+ ],
+)
+
+python_library(
+ name = 'gc_executor',
+ sources = ['gc_executor.py'],
+ dependencies = [
+ pants('src/main/python/twitter/aurora/BUILD.thirdparty:psutil'),
+ pants(':executor_detector'),
+ pants(':thermos_executor_base'),
+ pants('aurora/twitterdeps/src/python/twitter/common/collections'),
+ pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
+ pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+ pants('src/main/python/twitter/thermos/common:ckpt'),
+ pants('src/main/python/twitter/thermos/common:path'),
+ pants('src/main/python/twitter/thermos/core:helper'),
+ pants('src/main/python/twitter/thermos/core:inspector'),
+ pants('src/main/python/twitter/thermos/monitoring:detector'),
+ pants('src/main/python/twitter/thermos/monitoring:garbage'),
+ pants('src/main/python/twitter/aurora/config:schema'),
+ pants('src/main/python/twitter/aurora/executor/common:sandbox'),
+ pants('src/main/python/twitter/aurora:mesos-core'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ]
+)
+
+python_library(
+ name = 'executor-packaged',
+ dependencies = [
+ # Covering dependencies
+ pants('src/main/python/twitter/aurora/common'),
+ pants('src/main/python/twitter/aurora/config'),
+ pants('src/main/python/twitter/thermos/common'),
+ pants('src/main/python/twitter/thermos/config'),
+ pants('src/main/python/twitter/thermos/core'),
+ pants('src/main/python/twitter/thermos/monitoring'),
+ ],
+ provides = setup_py(
+ name = 'twitter.aurora.executor',
+ version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
+ ).with_binaries(
+ gc_executor = pants('src/main/python/twitter/aurora/executor/bin:gc_executor'),
+ thermos_executor = pants('src/main/python/twitter/aurora/executor/bin:thermos_executor'),
+ thermos_runner = pants('src/main/python/twitter/aurora/executor/bin:thermos_runner'),
+ )
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/__init__.py b/src/main/python/apache/aurora/executor/__init__.py
new file mode 100644
index 0000000..b0d6433
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/__init__.py
@@ -0,0 +1 @@
+__import__('pkg_resources').declare_namespace(__name__)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD
new file mode 100644
index 0000000..5e9cab1
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/bin/BUILD
@@ -0,0 +1,47 @@
+python_binary(
+ name = 'thermos_executor',
+ source = 'thermos_executor_main.py',
+ entry_point = 'twitter.aurora.executor.bin.thermos_executor_main:proxy_main',
+ ignore_errors = True,
+ always_write_cache = True,
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
+ pants('src/main/python/twitter/aurora/executor/common:executor_timeout'),
+ pants('src/main/python/twitter/aurora/executor/common:health_checker'),
+ pants('src/main/python/twitter/aurora/executor/common:sandbox'),
+ pants('src/main/python/twitter/aurora/executor:executor_detector'),
+ pants('src/main/python/twitter/aurora/executor:executor_vars'),
+ pants('src/main/python/twitter/aurora/executor:thermos_executor'),
+ pants('src/main/python/twitter/aurora/executor:thermos_task_runner'),
+ ]
+)
+
+python_binary(
+ name = 'gc_executor',
+ source = 'gc_executor_main.py',
+ entry_point = 'twitter.aurora.executor.bin.gc_executor_main:proxy_main',
+ ignore_errors = True,
+ always_write_cache = True,
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
+ pants('src/main/python/twitter/thermos/common:path'),
+ pants('src/main/python/twitter/aurora/executor:executor_detector'),
+ pants('src/main/python/twitter/aurora/executor:executor_vars'),
+ pants('src/main/python/twitter/aurora/executor:gc_executor'),
+ ]
+)
+
+python_binary(
+ name = 'thermos_runner',
+ source = 'thermos_runner_main.py',
+ entry_point = 'twitter.aurora.executor.bin.thermos_runner_main:proxy_main',
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('src/main/python/twitter/aurora/executor:thermos_runner'),
+ ],
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/__init__.py b/src/main/python/apache/aurora/executor/bin/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/gc_executor_main.py b/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
new file mode 100644
index 0000000..fad20e6
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/bin/gc_executor_main.py
@@ -0,0 +1,45 @@
+"""Command-line entry point to the Thermos GC executor
+
+This module wraps the Thermos GC executor into an executable suitable for launching by a Mesos
+slave.
+
+"""
+
+from twitter.aurora.executor.executor_detector import ExecutorDetector
+from twitter.aurora.executor.gc_executor import ThermosGCExecutor
+from twitter.common import app, log
+from twitter.common.log.options import LogOptions
+from twitter.common.metrics.sampler import DiskMetricWriter
+from twitter.thermos.common.path import TaskPath
+
+import mesos
+
+
+app.configure(debug=True)
+
+
+# locate logs locally in executor sandbox
+LogOptions.set_simple(True)
+LogOptions.set_disk_log_level('DEBUG')
+LogOptions.set_log_dir(ExecutorDetector.LOG_PATH)
+
+
+def proxy_main():
+ def main():
+ # Create executor stub
+ thermos_gc_executor = ThermosGCExecutor(checkpoint_root=TaskPath.DEFAULT_CHECKPOINT_ROOT)
+ thermos_gc_executor.start()
+
+ # Start metrics collection
+ metric_writer = DiskMetricWriter(thermos_gc_executor.metrics, ExecutorDetector.VARS_PATH)
+ metric_writer.start()
+
+ # Create driver stub
+ driver = mesos.MesosExecutorDriver(thermos_gc_executor)
+
+ # Start GC executor
+ driver.run()
+
+ log.info('MesosExecutorDriver.run() has finished.')
+
+ app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
new file mode 100644
index 0000000..31e718e
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -0,0 +1,67 @@
+"""Command-line entry point to the Thermos Executor
+
+This module wraps the Thermos Executor into an executable suitable for launching by a Mesos
+slave.
+
+"""
+
+import os
+
+from twitter.common import app, log
+from twitter.common.log.options import LogOptions
+
+from twitter.aurora.executor.common.executor_timeout import ExecutorTimeout
+from twitter.aurora.executor.common.health_checker import HealthCheckerProvider
+from twitter.aurora.executor.thermos_executor import ThermosExecutor
+from twitter.aurora.executor.thermos_task_runner import DefaultThermosTaskRunnerProvider
+
+import mesos
+
+
+app.configure(debug=True)
+LogOptions.set_simple(True)
+LogOptions.set_disk_log_level('DEBUG')
+LogOptions.set_log_dir('.')
+
+
+# TODO(wickman) Consider just having the OSS version require pip-installed
+# thermos_runner binaries on every machine and, instead of embedding the pex
+# as a resource, shell out to one on the PATH.
+def dump_runner_pex():
+ import pkg_resources
+ import twitter.aurora.executor.resources
+ pex_name = 'thermos_runner.pex'
+ runner_pex = os.path.join(os.path.realpath('.'), pex_name)
+ with open(runner_pex, 'w') as fp:
+ # TODO(wickman) Use shutil.copyfileobj to reduce memory footprint here.
+ fp.write(pkg_resources.resource_stream(
+ twitter.aurora.executor.resources.__name__, pex_name).read())
+ return runner_pex
+
+
+def proxy_main():
+ def main():
+ runner_provider = DefaultThermosTaskRunnerProvider(
+ dump_runner_pex(),
+ artifact_dir=os.path.realpath('.'),
+ )
+
+ # Create executor stub
+ thermos_executor = ThermosExecutor(
+ runner_provider=runner_provider,
+ status_providers=(HealthCheckerProvider(),),
+ )
+
+ # Create driver stub
+ driver = mesos.MesosExecutorDriver(thermos_executor)
+
+    # This is an ephemeral executor -- shut down if we receive no tasks within a certain
+    # time period.
+ ExecutorTimeout(thermos_executor.launched, driver).start()
+
+ # Start executor
+ driver.run()
+
+ log.info('MesosExecutorDriver.run() has finished.')
+
+ app.main()
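
The TODO inside dump_runner_pex() suggests shutil.copyfileobj; a minimal sketch of that variant, not part of the commit and under the same assumption that thermos_runner.pex is packaged as a resource in twitter.aurora.executor.resources:

    import os
    import shutil

    def dump_runner_pex():
      import pkg_resources
      import twitter.aurora.executor.resources
      pex_name = 'thermos_runner.pex'
      runner_pex = os.path.join(os.path.realpath('.'), pex_name)
      src = pkg_resources.resource_stream(twitter.aurora.executor.resources.__name__, pex_name)
      # Stream the embedded pex to disk in chunks instead of buffering it all in memory;
      # binary mode is used since the pex is a zip archive.
      with open(runner_pex, 'wb') as dst:
        shutil.copyfileobj(src, dst)
      src.close()
      return runner_pex
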
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/executor/bin/thermos_runner_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_runner_main.py b/src/main/python/apache/aurora/executor/bin/thermos_runner_main.py
new file mode 100644
index 0000000..602111e
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/bin/thermos_runner_main.py
@@ -0,0 +1,12 @@
+from twitter.common import app
+from twitter.common.log.options import LogOptions
+from twitter.aurora.executor.thermos_runner import proxy_main as runner_proxy_main
+
+
+LogOptions.set_simple(True)
+
+
+def proxy_main():
+ main = runner_proxy_main
+
+ app.main()