You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:06 UTC
[13/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/BUILD b/src/main/python/twitter/aurora/common/BUILD
deleted file mode 100644
index 4e839f7..0000000
--- a/src/main/python/twitter/aurora/common/BUILD
+++ /dev/null
@@ -1,63 +0,0 @@
-import os
-
-python_library(
- name = 'aurora_job_key',
- sources = ['aurora_job_key.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
- ]
-)
-
-python_library(
- name = 'cluster',
- sources = ['cluster.py'],
- dependencies = [
- pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
- ]
-)
-
-python_library(
- name = 'clusters',
- sources = ['clusters.py'],
- dependencies = [
- pants(':cluster'),
- pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
- pants('aurora/twitterdeps/src/python/twitter/common/collections'),
- ]
-)
-
-python_library(
- name = 'cluster_option',
- sources = ['cluster_option.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- ]
-)
-
-python_library(
- name = 'http_signaler',
- sources = ['http_signaler.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- ]
-)
-
-python_library(
- name = 'common',
- dependencies = [
- pants(':aurora_job_key'),
- pants(':cluster'),
- pants(':cluster_option'),
- pants(':clusters'),
- pants(':http_signaler'),
- pants('src/main/python/twitter/aurora/common/auth'),
- ],
- provides = setup_py(
- name = 'twitter.aurora.common',
- version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
- description = 'Aurora common libraries.',
- license = 'Apache License, Version 2.0',
- )
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/__init__.py b/src/main/python/twitter/aurora/common/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/aurora_job_key.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/aurora_job_key.py b/src/main/python/twitter/aurora/common/aurora_job_key.py
deleted file mode 100644
index 81a8687..0000000
--- a/src/main/python/twitter/aurora/common/aurora_job_key.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import re
-
-from twitter.common.lang import Compatibility, total_ordering
-
-from gen.twitter.aurora.constants import GOOD_IDENTIFIER_PATTERN_PYTHON
-from gen.twitter.aurora.ttypes import Identity, JobKey, TaskQuery
-
-# TODO(ksweeney): This can just probably just extend namedtuple.
-@total_ordering
-class AuroraJobKey(object):
- """A canonical representation of a key that can identify a job in any of the clusters the client
- is aware of."""
- class Error(Exception): pass
- class TypeError(TypeError, Error): pass
- class InvalidIdentifier(ValueError, Error): pass
- class ParseError(ValueError, Error): pass
-
- VALID_IDENTIFIER = re.compile(GOOD_IDENTIFIER_PATTERN_PYTHON)
-
- def __init__(self, cluster, role, env, name):
- if not isinstance(cluster, Compatibility.string):
- raise self.TypeError("cluster should be a string, got %s" % (cluster.__class__.__name__))
- self._cluster = cluster
- self._role = self._assert_valid_identifier("role", role)
- self._env = self._assert_valid_identifier("env", env)
- self._name = self._assert_valid_identifier("name", name)
-
- @classmethod
- def from_path(cls, path):
- try:
- cluster, role, env, name = path.split('/', 4)
- except ValueError:
- raise cls.ParseError(
- "Invalid path '%s'. path should be a string in the form CLUSTER/ROLE/ENV/NAME" % path)
- return cls(cluster, role, env, name)
-
- @classmethod
- def from_thrift(cls, cluster, job_key):
- if not isinstance(job_key, JobKey):
- raise cls.TypeError("job_key must be a Thrift JobKey struct")
- return cls(cluster, job_key.role, job_key.environment, job_key.name)
-
- @classmethod
- def _assert_valid_identifier(cls, field, identifier):
- if not isinstance(identifier, Compatibility.string):
- raise cls.TypeError("%s must be a string" % field)
- if not cls.VALID_IDENTIFIER.match(identifier):
- raise cls.InvalidIdentifier("Invalid %s '%s'" % (field, identifier))
- return identifier
-
- @property
- def cluster(self):
- return self._cluster
-
- @property
- def role(self):
- return self._role
-
- @property
- def env(self):
- return self._env
-
- @property
- def name(self):
- return self._name
-
- def to_path(self):
- return "%s/%s/%s/%s" % (self.cluster, self.role, self.env, self.name)
-
- def to_thrift(self):
- return JobKey(role=self.role, environment=self.env, name=self.name)
-
- def to_thrift_query(self):
- return TaskQuery(owner=Identity(role=self.role), environment=self.env, jobName=self.name)
-
- def __iter__(self):
- """Support 'cluster, role, env, name = job_key' assignment."""
- return iter((self._cluster, self._role, self._env, self._name))
-
- def __repr__(self):
- return "%s(%r, %r, %r, %r)" % (self.__class__, self._cluster, self._role, self._env, self._name)
-
- def __str__(self):
- return self.to_path()
-
- def __hash__(self):
- return hash(AuroraJobKey) + hash(self.to_path())
-
- def __eq__(self, other):
- if not isinstance(other, AuroraJobKey):
- return NotImplemented
- return self.to_path() == other.to_path()
-
- def __lt__(self, other):
- if not isinstance(other, AuroraJobKey):
- return NotImplemented
- return self.to_path() < other.to_path()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/auth/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/auth/BUILD b/src/main/python/twitter/aurora/common/auth/BUILD
deleted file mode 100644
index fd5b024..0000000
--- a/src/main/python/twitter/aurora/common/auth/BUILD
+++ /dev/null
@@ -1,9 +0,0 @@
-python_library(
- name = 'auth',
- sources = globs('*.py'),
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
- ]
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/auth/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/auth/__init__.py b/src/main/python/twitter/aurora/common/auth/__init__.py
deleted file mode 100644
index 5418228..0000000
--- a/src/main/python/twitter/aurora/common/auth/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-from .auth_module_manager import make_session_key, register_auth_module, SessionKeyError
-from .auth_module import AuthModule, InsecureAuthModule
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/auth/auth_module.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/auth/auth_module.py b/src/main/python/twitter/aurora/common/auth/auth_module.py
deleted file mode 100644
index 7b146a9..0000000
--- a/src/main/python/twitter/aurora/common/auth/auth_module.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from abc import abstractmethod, abstractproperty
-
-import getpass
-import time
-
-from twitter.common.lang import Interface
-
-from gen.twitter.aurora.ttypes import SessionKey
-
-
-class AuthModule(Interface):
- @abstractproperty
- def mechanism(self):
- """Return the mechanism provided by this AuthModule."""
-
- @abstractmethod
- def payload(self):
- """Return the payload generated by the AuthModule."""
-
- def __call__(self):
- return SessionKey(mechanism=self.mechanism, data=self.payload())
-
-
-class InsecureAuthModule(AuthModule):
- @property
- def mechanism(self):
- return 'UNAUTHENTICATED'
-
- def payload(self):
- return 'UNAUTHENTICATED'
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/auth/auth_module_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/auth/auth_module_manager.py b/src/main/python/twitter/aurora/common/auth/auth_module_manager.py
deleted file mode 100644
index 03d51b3..0000000
--- a/src/main/python/twitter/aurora/common/auth/auth_module_manager.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from twitter.common import log
-
-from gen.twitter.aurora.ttypes import SessionKey
-
-from .auth_module import AuthModule, InsecureAuthModule
-
-
-_INSECURE_AUTH_MODULE = InsecureAuthModule()
-_AUTH_MODULES = {
- _INSECURE_AUTH_MODULE.mechanism: _INSECURE_AUTH_MODULE
-}
-
-
-class SessionKeyError(Exception): pass
-
-
-def register_auth_module(auth_module):
- """
- Add an auth module into the registry used by make_session_key. An auth module is discovered
- via its auth mechanism.
-
- args:
- auth_module: A 0-arg callable that should return a SessionKey or raises a SessionKeyError
- and extend AuthModule.
- """
- if not isinstance(auth_module, AuthModule):
- raise TypeError('Given auth module must be a AuthModule subclass, got %s' % type(auth_module))
- if not callable(auth_module):
- raise TypeError('auth_module should be callable.')
- _AUTH_MODULES[auth_module.mechanism] = auth_module
-
-
-def make_session_key(auth_mechanism='UNAUTHENTICATED'):
- """
- Attempts to create a session key by calling the auth module registered to the auth mechanism.
- If an auth module does not exist for an auth mechanism, an InsecureAuthModule will be used.
- """
- if not _AUTH_MODULES:
- raise SessionKeyError('No auth modules have been registered. Please call register_auth_module.')
-
- auth_module = _AUTH_MODULES.get(auth_mechanism) or _INSECURE_AUTH_MODULE
- log.debug('Using auth module: %r' % auth_module)
- session_key = auth_module()
- if not isinstance(session_key, SessionKey):
- raise SessionKeyError('Expected %r but got %r from auth module %r' % (
- SessionKey, session_key.__class__, auth_module))
- return session_key
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/cluster.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/cluster.py b/src/main/python/twitter/aurora/common/cluster.py
deleted file mode 100644
index 21d8ac3..0000000
--- a/src/main/python/twitter/aurora/common/cluster.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from pystachio import Empty, Struct
-from pystachio.composite import Structural
-
-__all__ = ('Cluster',)
-
-
-# TODO(wickman) It seems like some of this Trait/Mixin stuff should be a
-# first-class construct in Pystachio. It could be a solution for extensible
-# Job/Task definitions.
-class Cluster(dict):
- """Cluster encapsulates a set of K/V attributes describing cluster configurations.
-
- Given a cluster, attributes may be accessed directly on them, e.g.
- cluster.name
- cluster.scheduler_zk_path
-
- In order to enforce particular "traits" of Cluster, use Cluster.Trait to construct
- enforceable schemas, e.g.
-
- class ResolverTrait(Cluster.Trait):
- scheduler_zk_ensemble = Required(String)
- scheduler_zk_path = Default(String, '/twitter/service/mesos/prod/scheduler')
-
- cluster = Cluster(name = 'west', scheduler_zk_ensemble = 'zookeeper.west.twttr.net')
-
- # Ensures that scheduler_zk_ensemble is defined in the cluster or it will raise a TypeError
- cluster.with_trait(ResolverTrait).scheduler_zk_ensemble
-
- # Will use the default if none is provided on Cluster.
- cluster.with_trait(ResolverTrait).scheduler_zk_path
- """
- Trait = Struct
-
- def __init__(self, **kwargs):
- self._traits = ()
- super(Cluster, self).__init__(**kwargs)
-
- def get_trait(self, trait):
- """Given a Cluster.Trait, extract that trait."""
- if not issubclass(trait, Structural):
- raise TypeError('provided trait must be a Cluster.Trait subclass, got %s' % type(trait))
- # TODO(wickman) Expose this in pystachio as a non-private or add a load method with strict=
- return trait(trait._filter_against_schema(self))
-
- def check_trait(self, trait):
- """Given a Cluster.Trait, typecheck that trait."""
- trait_check = self.get_trait(trait).check()
- if not trait_check.ok():
- raise TypeError(trait_check.message())
-
- def with_traits(self, *traits):
- """Return a cluster annotated with a set of traits."""
- new_cluster = self.__class__(**self)
- for trait in traits:
- new_cluster.check_trait(trait)
- new_cluster._traits = traits
- return new_cluster
-
- def with_trait(self, trait):
- """Return a cluster annotated with a single trait (helper for self.with_traits)."""
- return self.with_traits(trait)
-
- def __setitem__(self, key, value):
- raise TypeError('Clusters are immutable.')
-
- def __getattr__(self, attribute):
- for trait in self._traits:
- expressed_trait = self.get_trait(trait)
- if hasattr(expressed_trait, attribute):
- value = getattr(expressed_trait, attribute)()
- return None if value is Empty else value.get()
- try:
- return self[attribute]
- except KeyError:
- return self.__getattribute__(attribute)
-
- def __copy__(self):
- return self
-
- def __deepcopy__(self, memo):
- return self
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/cluster_option.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/cluster_option.py b/src/main/python/twitter/aurora/common/cluster_option.py
deleted file mode 100644
index 3e07476..0000000
--- a/src/main/python/twitter/aurora/common/cluster_option.py
+++ /dev/null
@@ -1,82 +0,0 @@
-from copy import copy
-from optparse import (
- NO_DEFAULT,
- OptionValueError,
- Option)
-
-
-def _check_mesos_cluster(option, opt, value):
- cluster_name = value
- if option.clusters and cluster_name in option.clusters:
- return option.clusters[cluster_name]
- elif option.cluster_provider:
- return option.cluster_provider(cluster_name)
-
- cluster_list = ""
- if option.clusters:
- cluster_list = 'Valid options for clusters are %s' % ' '.join(option.clusters)
-
- raise OptionValueError(
- '%s is not a valid cluster for the %s option. %s' % (value, opt, cluster_list))
-
-
-class ClusterOption(Option):
- """A command-line Option that requires a valid cluster name and returns a Cluster object.
-
- Use in an @app.command_option decorator to avoid boilerplate. For example:
-
- CLUSTER_PATH = os.path.expanduser('~/.clusters')
- CLUSTERS = Clusters.from_json(CLUSTER_PATH)
-
- @app.command
- @app.command_option(ClusterOption('--cluster', default='smf1-test', clusters=CLUSTERS))
- def get_health(args, options):
- if options.cluster.zk_server:
- do_something(options.cluster)
-
- @app.command
- @app.command_option(ClusterOption('-s',
- '--source_cluster',
- default='smf1-test',
- clusters=CLUSTERS,
- help='Source cluster to pull metadata from.'))
- @app.command_option(ClusterOption('-d',
- '--dest_cluster',
- clusters=CLUSTERS,
- default='smf1-test'))
- def copy_metadata(args, options):
- if not options.source_cluster:
- print('required option source_cluster missing!')
- metadata_copy(options.source_cluster, options.dest_cluster)
- """
-
- # Needed since we're creating a new type for validation - see optparse docs.
- TYPES = copy(Option.TYPES) + ('mesos_cluster',)
- TYPE_CHECKER = copy(Option.TYPE_CHECKER)
- TYPE_CHECKER['mesos_cluster'] = _check_mesos_cluster
-
- def __init__(self, *opt_str, **attrs):
- """
- *opt_str: Same meaning as in twitter.common.options.Option, at least one is required.
- **attrs: See twitter.common.options.Option, with the following caveats:
-
- Exactly one of the following must be provided:
-
- clusters: A static Clusters object from which to pick clusters.
- cluster_provider: A function that takes a cluster name and returns a Cluster object.
- """
- self.clusters = attrs.pop('clusters', None)
- self.cluster_provider = attrs.pop('cluster_provider', None)
- if not (self.clusters is not None) ^ (self.cluster_provider is not None):
- raise ValueError('Must specify exactly one of clusters and cluster_provider.')
-
- default_attrs = dict(
- default=None,
- action='store',
- type='mesos_cluster',
- help='Mesos cluster to use (Default: %%default)'
- )
-
- combined_attrs = default_attrs
- combined_attrs.update(attrs) # Defensive copy
- Option.__init__(self, *opt_str, **combined_attrs) # old-style superclass
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/clusters.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/clusters.py b/src/main/python/twitter/aurora/common/clusters.py
deleted file mode 100644
index 2777569..0000000
--- a/src/main/python/twitter/aurora/common/clusters.py
+++ /dev/null
@@ -1,135 +0,0 @@
-from __future__ import print_function
-
-from collections import Mapping, namedtuple
-from contextlib import contextmanager
-import itertools
-import json
-import os
-import sys
-
-from twitter.common.collections import maybe_list
-
-from .cluster import Cluster
-
-from pystachio import Required, String
-
-try:
- import yaml
- HAS_YAML = True
-except ImportError:
- HAS_YAML = False
-
-
-__all__ = (
- 'CLUSTERS',
- 'Clusters',
-)
-
-
-class NameTrait(Cluster.Trait):
- name = Required(String)
-
-
-Parser = namedtuple('Parser', 'loader exception')
-
-
-class Clusters(Mapping):
- class Error(Exception): pass
- class ClusterExists(Error): pass
- class ClusterNotFound(KeyError, Error): pass
- class UnknownFormatError(Error): pass
- class ParseError(Error): pass
-
- LOADERS = {'.json': Parser(json.load, ValueError)}
- if HAS_YAML:
- LOADERS['.yml'] = Parser(yaml.load, yaml.parser.ParserError)
-
- @classmethod
- def from_file(cls, filename):
- return cls(list(cls.iter_clusters(filename)))
-
- @classmethod
- def iter_clusters(cls, filename):
- _, ext = os.path.splitext(filename)
- if ext not in cls.LOADERS:
- raise cls.UnknownFormatError('Unknown clusters file extension: %r' % ext)
- with open(filename) as fp:
- loader, exc_type = cls.LOADERS[ext]
- try:
- document = loader(fp)
- except exc_type as e:
- raise cls.ParseError('Unable to parse %s: %s' % (filename, e))
- if isinstance(document, list):
- iterator = document
- elif isinstance(document, dict):
- iterator = document.values()
- else:
- raise cls.ParseError('Unknown layout in %s' % filename)
- for document in iterator:
- if not isinstance(document, dict):
- raise cls.ParseError('Clusters must be maps of key/value pairs, got %s' % type(document))
- # documents not adhering to NameTrait are ignored.
- if 'name' not in document:
- continue
- yield Cluster(**document)
-
- def __init__(self, cluster_list):
- self.replace(cluster_list)
-
- def replace(self, cluster_list):
- self._clusters = {}
- self.update(cluster_list)
-
- def update(self, cluster_list):
- cluster_list = maybe_list(cluster_list, expected_type=Cluster, raise_type=TypeError)
- for cluster in cluster_list:
- self.add(cluster)
-
- def add(self, cluster):
- """Add a cluster to this Clusters map."""
- cluster = Cluster(**cluster)
- cluster.check_trait(NameTrait)
- self._clusters[cluster.name] = cluster
-
- @contextmanager
- def patch(self, cluster_list):
- """Patch this Clusters instance with a new list of clusters in a
- contextmanager. Intended for testing purposes."""
- old_clusters = self._clusters.copy()
- self.replace(cluster_list)
- yield self
- self._clusters = old_clusters
-
- def __iter__(self):
- return iter(self._clusters)
-
- def __len__(self):
- return len(self._clusters)
-
- def __getitem__(self, name):
- try:
- return self._clusters[name]
- except KeyError:
- raise self.ClusterNotFound('Unknown cluster %s, valid clusters: %s' % (
- name, ', '.join(self._clusters.keys())))
-
-
-
-DEFAULT_SEARCH_PATHS = (
- os.environ.get('AURORA_CONFIG_ROOT') or '/etc/aurora',
- os.path.expanduser('~/.aurora')
-)
-
-
-CLUSTERS = Clusters(())
-
-
-def load():
- """(re-)load all clusters from the search path."""
- for search_path, ext in itertools.product(DEFAULT_SEARCH_PATHS, Clusters.LOADERS):
- filename = os.path.join(search_path, 'clusters' + ext)
- if os.path.exists(filename):
- CLUSTERS.update(Clusters.from_file(filename).values())
-
-
-load()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/common/http_signaler.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/common/http_signaler.py b/src/main/python/twitter/aurora/common/http_signaler.py
deleted file mode 100644
index 778ddc4..0000000
--- a/src/main/python/twitter/aurora/common/http_signaler.py
+++ /dev/null
@@ -1,82 +0,0 @@
-import contextlib
-from socket import timeout as SocketTimeout
-import sys
-
-from twitter.common import log
-from twitter.common.lang import Compatibility
-
-if Compatibility.PY3:
- from http.client import HTTPException
- import urllib.request as urllib_request
- from urllib.error import URLError, HTTPError
-else:
- from httplib import HTTPException
- import urllib2 as urllib_request
- from urllib2 import URLError, HTTPError
-
-
-class HttpSignaler(object):
- """Simple HTTP endpoint wrapper to check health or trigger quitquitquit/abortabortabort"""
- TIMEOUT_SECS = 1.0
- FAILURE_REASON_LENGTH = 10
-
- class Error(Exception): pass
- class QueryError(Error): pass
-
- def __init__(self, port, host='localhost', timeout_secs=TIMEOUT_SECS):
- self._host = host
- self._url_base = 'http://%s:%d/' % (host, port)
- self._timeout_secs = timeout_secs
-
- def url(self, endpoint):
- return self._url_base + endpoint
-
- @property
- def opener(self):
- return urllib_request.urlopen
-
- def query(self, endpoint, data=None):
- """Request an HTTP endpoint with a GET request (or POST if data is not None)"""
- url = self.url(endpoint)
- log.debug("%s: %s %s" % (self.__class__.__name__, 'GET' if data is None else 'POST', url))
-
- def raise_error(reason):
- raise self.QueryError('Failed to signal %s: %s' % (self.url(endpoint), reason))
-
- try:
- with contextlib.closing(
- self.opener(url, data, timeout=self._timeout_secs)) as fp:
- return fp.read()
- except (HTTPException, SocketTimeout) as e:
- # the type of an HTTPException is typically more useful than its contents (since for example
- # BadStatusLines are often empty). likewise with socket.timeout.
- raise_error('Error within %s' % e.__class__.__name__)
- except (URLError, HTTPError) as e:
- raise_error(e)
- except Exception as e:
- raise_error('Unexpected error: %s' % e)
-
- def __call__(self, endpoint, use_post_method=False, expected_response=None):
- """Returns a (boolean, string|None) tuple of (call success, failure reason)"""
- try:
- response = self.query(endpoint, '' if use_post_method else None).strip().lower()
- if expected_response is not None and response != expected_response:
- def shorten(string):
- return (string if len(string) < self.FAILURE_REASON_LENGTH
- else "%s..." % string[:self.FAILURE_REASON_LENGTH - 3])
- reason = 'Response differs from expected response (expected "%s", got "%s")'
- log.warning(reason % (expected_response, response))
- return (False, reason % (shorten(str(expected_response)), shorten(str(response))))
- else:
- return (True, None)
- except self.QueryError as e:
- return (False, str(e))
-
- def health(self):
- return self('health', use_post_method=False, expected_response='ok')
-
- def quitquitquit(self):
- return self('quitquitquit', use_post_method=True)
-
- def abortabortabort(self):
- return self('abortabortabort', use_post_method=True)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/BUILD b/src/main/python/twitter/aurora/config/BUILD
deleted file mode 100644
index 91cb3e4..0000000
--- a/src/main/python/twitter/aurora/config/BUILD
+++ /dev/null
@@ -1,43 +0,0 @@
-import os
-
-# Alias for src/main/python/twitter/aurora/config/schema
-python_library(
- name = 'schema',
- dependencies = [
- pants('src/main/python/twitter/aurora/config/schema'),
- ]
-)
-
-python_library(
- name = 'config',
- sources = (
- '__init__.py',
- 'loader.py',
- 'port_resolver.py',
- 'thrift.py',
- ),
- dependencies = [
- pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('src/main/python/twitter/aurora/common'),
- pants('src/main/python/twitter/aurora/config/schema'),
- pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
- ],
-
-)
-
-python_library(
- name = 'config-packaged',
- dependencies = [
- pants(':config'),
-
- # covering dependencies
- pants('src/main/python/twitter/thermos/config'),
- ],
- provides = setup_py(
- name = 'twitter.aurora.config',
- version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
- description = 'Aurora/Thermos Pystachio schemas for describing job configurations.',
- license = 'Apache License, Version 2.0',
- )
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/__init__.py b/src/main/python/twitter/aurora/config/__init__.py
deleted file mode 100644
index a3ba981..0000000
--- a/src/main/python/twitter/aurora/config/__init__.py
+++ /dev/null
@@ -1,271 +0,0 @@
-from __future__ import absolute_import
-
-from collections import defaultdict
-
-from twitter.aurora.common.aurora_job_key import AuroraJobKey
-from twitter.aurora.config.schema.base import MesosContext
-from twitter.thermos.config.loader import PortExtractor, ThermosTaskWrapper
-from twitter.thermos.config.schema import ThermosContext
-
-from .loader import AuroraConfigLoader
-from .port_resolver import PortResolver
-from .thrift import convert as convert_thrift, InvalidConfig as InvalidThriftConfig
-
-from pystachio import Empty, Environment, Ref
-
-__all__ = ('AuroraConfig', 'PortResolver')
-
-
-class AuroraConfig(object):
- class Error(Exception): pass
-
- class InvalidConfig(Error):
- def __str__(self):
- return 'The configuration was invalid: %s' % self.args[0]
-
- @classmethod
- def plugins(cls):
- """A stack of callables to apply to the config on load."""
- return []
-
- @classmethod
- def pick(cls, env, name, bindings, select_cluster=None, select_role=None, select_env=None):
- # TODO(atollenaere): should take a JobKey when non-jobkey interface is deprecated
-
- job_list = env.get('jobs', [])
- if not job_list:
- raise ValueError('No job defined in this config!')
-
- def maybe_bind(j):
- return j.bind(*bindings) if bindings else j
-
- if name is None:
- if len(job_list) > 1:
- raise ValueError('Configuration has multiple jobs but no job name specified!')
- return maybe_bind(job_list[0])
-
- # TODO(wfarner): Rework this and calling code to make name optional as well.
- def match_name(job):
- return str(job.name()) == name
- def match_cluster(job):
- return select_cluster is None or str(job.cluster()) == select_cluster
- def match_env(job):
- return select_env is None or str(job.environment()) == select_env
- def match_role(job):
- return select_role is None or str(job.role()) == select_role
-
- bound_jobs = map(maybe_bind, job_list)
- matches = [j for j in bound_jobs if
- all([match_cluster(j), match_role(j), match_env(j), match_name(j)])]
-
- if len(matches) == 0:
- msg = "Could not find job %s/%s/%s/%s\n" % (
- select_cluster or '*', select_role or '*', select_env or '*', name)
- for j in bound_jobs:
- if j.environment() is Empty:
- msg += "Job %s/%s/%s/%s in configuration file doesn't specify an environment\n" % (
- j.cluster(), j.role(), '{MISSING}', j.name()
- )
- msg += cls._candidate_jobs_str(bound_jobs)
- raise ValueError(msg)
-
- elif len(matches) > 1:
- msg = 'Multiple jobs match, please disambiguate by specifying a job key.\n'
- msg += cls._candidate_jobs_str(bound_jobs)
- raise ValueError(msg)
- else:
- return matches[0]
-
- @staticmethod
- def _candidate_jobs_str(job_list):
- assert(job_list)
- job_list = [" %s/%s/%s/%s" % (
- j.cluster(), j.role(),
- j.environment() if j.environment() is not Empty else "{MISSING}",
- j.name())
- for j in job_list]
- return 'Candidates are:\n' + '\n'.join(job_list)
-
- @classmethod
- def apply_plugins(cls, config, env=None):
- for plugin in cls.plugins():
- if not callable(plugin):
- raise cls.Error('Invalid configuration plugin %r, should be callable!' % plugin)
- plugin(config, env)
- return config
-
- @classmethod
- def load(
- cls, filename, name=None, bindings=None,
- select_cluster=None, select_role=None, select_env=None):
- # TODO(atollenaere): should take a JobKey when non-jobkey interface is deprecated
- env = AuroraConfigLoader.load(filename)
- return cls.apply_plugins(
- cls(cls.pick(env, name, bindings, select_cluster, select_role, select_env)), env)
-
- @classmethod
- def load_json(
- cls, filename, name=None, bindings=None,
- select_cluster=None, select_role=None, select_env=None):
- # TODO(atollenaere): should take a JobKey when non-jobkey interface is deprecated
- job = AuroraConfigLoader.load_json(filename)
- return cls.apply_plugins(cls(job.bind(*bindings) if bindings else job))
-
- @classmethod
- def loads_json(cls, string, name=None, bindings=None, select_cluster=None, select_env=None):
- # TODO(atollenaere): should take a JobKey when non-jobkey interface is deprecated
- job = AuroraConfigLoader.loads_json(string)
- return cls.apply_plugins(cls(job.bind(*bindings) if bindings else job))
-
- @classmethod
- def validate_job(cls, job):
- """
- Validate and sanitize the input job
-
- Currently, the validation stage simply ensures that the job has all required fields.
- self.InvalidConfig is raised if any required fields are not present.
- """
- def has(pystachio_type, thing):
- return getattr(pystachio_type, 'has_%s' % thing)()
- for required in ("cluster", "task", "role"):
- if not has(job, required):
- raise cls.InvalidConfig(
- '%s required for job "%s"' % (required.capitalize(), job.name()))
- if not has(job.task(), 'processes'):
- raise cls.InvalidConfig('Processes required for task on job "%s"' % job.name())
-
- @classmethod
- def standard_bindings(cls, job):
- # Rewrite now-deprecated bindings into their proper form.
- return job.bind({
- Ref.from_address('mesos.role'): '{{role}}',
- Ref.from_address('mesos.cluster'): '{{cluster}}',
- Ref.from_address('thermos.user'): '{{role}}',
- })
-
- def __init__(self, job):
- self.validate_job(job)
- self._job = self.standard_bindings(job)
- self._packages = []
- self.binding_dicts = defaultdict(dict)
- self.hooks = []
-
- def context(self, instance=None):
- context = dict(instance=instance)
- # Filter unspecified values
- return Environment(mesos=MesosContext(dict((k, v) for k, v in context.items() if v)))
-
- def job(self):
- """Typecheck this job and return the result of convert_thrift() for it.
-
- Raises self.InvalidConfig when task_links contain invalid port references, when the
- typecheck fails, or when convert_thrift raises InvalidThriftConfig.
- """
- interpolated_job = self._job % self.context()
-
- # TODO(wickman) Once thermos is onto thrift instead of pystachio, use
- # %%replacements%% instead.
- #
- # Typecheck against the Job, with the following free variables unwrapped at the Task level:
- # - a dummy {{mesos.instance}}
- # - dummy values for the {{thermos.ports}} context, to allow for their use in task_links
- env = dict(mesos=Environment(instance=0))
- if interpolated_job.task_links() is not Empty:
- try:
- dummy_ports = dict(
- (port, 31337) for port in PortExtractor.extract(interpolated_job.task_links()))
- except PortExtractor.InvalidPorts as err:
- raise self.InvalidConfig('Invalid port references in task_links! %s' % err)
- env.update(thermos=ThermosContext(ports=dummy_ports))
- # The dummy environment is only bound for this check; it is not kept on the job.
- typecheck = interpolated_job.bind(Environment(env)).check()
- if not typecheck.ok():
- raise self.InvalidConfig(typecheck.message())
- # Swap in the task_links produced by self.task_links() before converting.
- interpolated_job = interpolated_job(task_links=self.task_links())
- try:
- return convert_thrift(interpolated_job, self._packages, self.ports())
- except InvalidThriftConfig as e:
- raise self.InvalidConfig(str(e))
-
- def bind(self, binding):
- self._job = self._job.bind(binding)
-
- def raw(self):
- return self._job
-
- # This stinks to high heaven
- def update_job(self, new_job):
- self._job = new_job
-
- def instances(self):
- return self._job.instances().get()
-
- def task(self, instance):
- return (self._job % self.context(instance)).task()
-
- def name(self):
- return self._job.name().get()
-
- def role(self):
- return self._job.role().get()
-
- def cluster(self):
- return self._job.cluster().get()
-
- def environment(self):
- return self._job.environment().get()
-
- def job_key(self):
- return AuroraJobKey(self.cluster(), self.role(), self.environment(), self.name())
-
- def ports(self):
- """Return the list of ports that need to be allocated by the scheduler."""
-
- # Strictly speaking this is wrong -- it is possible to do things like
- # {{thermos.ports[instance_{{mesos.instance}}]}}
- # which can only be extracted post-unwrapping. This means that validating
- # the state of the announce configuration could be problematic if people
- # try to do complicated things.
- referenced_ports = ThermosTaskWrapper(self._job.task(), strict=False).ports()
- resolved_portmap = PortResolver.resolve(self._job.announce().portmap().get()
- if self._job.has_announce() else {})
-
- # values of the portmap that are not integers => unallocated
- unallocated = set(port for port in resolved_portmap.values() if not isinstance(port, int))
-
- # find referenced {{thermos.portmap[ports]}} that are not resolved by the portmap
- unresolved_references = set(
- port for port in (resolved_portmap.get(port_ref, port_ref) for port_ref in referenced_ports)
- if not isinstance(port, int))
-
- return unallocated | unresolved_references
-
- def has_health_port(self):
- return "health" in ThermosTaskWrapper(self._job.task(), strict=False).ports()
-
- def task_links(self):
- # {{mesos.instance}} --> %shard_id%
- # {{thermos.ports[foo]}} --> %port:foo%
- task_links = self._job.task_links()
- if task_links is Empty:
- return task_links
- _, uninterp = task_links.interpolate()
- substitutions = {
- Ref.from_address('mesos.instance'): '%shard_id%'
- }
- port_scope = Ref.from_address('thermos.ports')
- for ref in uninterp:
- subscope = port_scope.scoped_to(ref)
- if subscope:
- substitutions[ref] = '%%port:%s%%' % subscope.action().value
- return task_links.bind(substitutions)
-
- def update_config(self):
- return self._job.update_config()
-
- def add_package(self, package):
- self._packages.append(package)
-
- # TODO(wickman) Kill package() once MESOS-3191 is in.
- def package(self):
- pass
-
- def is_dedicated(self):
- return self._job.has_constraints() and 'dedicated' in self._job.constraints()
-
- def __repr__(self):
- return '%s(%r)' % (self.__class__.__name__, self._job)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/loader.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/loader.py b/src/main/python/twitter/aurora/config/loader.py
deleted file mode 100644
index e3e5559..0000000
--- a/src/main/python/twitter/aurora/config/loader.py
+++ /dev/null
@@ -1,63 +0,0 @@
-import json
-import pkgutil
-import textwrap
-
-from twitter.aurora.config.schema import base as base_schema
-
-from pystachio.config import Config as PystachioConfig
-
-
-class AuroraConfigLoader(PystachioConfig):
- SCHEMA_MODULES = []
-
- @classmethod
- def assembled_schema(cls, schema_modules):
- default_schema = [super(AuroraConfigLoader, cls).DEFAULT_SCHEMA]
- default_schema.extend('from %s import *' % module.__name__ for module in schema_modules)
- return '\n'.join(default_schema)
-
- @classmethod
- def register_schema(cls, schema_module):
- """Register the schema defined in schema_module, equivalent to doing
-
- from schema_module.__name__ import *
-
- before all pystachio configurations are evaluated.
- """
- cls.SCHEMA_MODULES.append(schema_module)
- cls.DEFAULT_SCHEMA = cls.assembled_schema(cls.SCHEMA_MODULES)
-
- @classmethod
- def register_schemas_from(cls, package):
- """Register schemas from all modules in a particular package."""
- for _, submodule, is_package in pkgutil.iter_modules(package.__path__):
- if is_package:
- continue
- cls.register_schema(
- __import__('%s.%s' % (package.__name__, submodule), fromlist=[package.__name__]))
-
- @classmethod
- def flush_schemas(cls):
- """Flush all schemas from AuroraConfigLoader. Intended for test use only."""
- cls.SCHEMA_MODULES = []
- cls.register_schema(base_schema)
-
- @classmethod
- def load(cls, loadable):
- return cls.load_raw(loadable).environment
-
- @classmethod
- def load_raw(cls, loadable):
- return cls(loadable)
-
- @classmethod
- def load_json(cls, filename):
- with open(filename) as fp:
- return base_schema.Job.json_load(fp)
-
- @classmethod
- def loads_json(cls, string):
- return base_schema.Job(json.loads(string))
-
-
-AuroraConfigLoader.flush_schemas()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/port_resolver.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/port_resolver.py b/src/main/python/twitter/aurora/config/port_resolver.py
deleted file mode 100644
index 486095f..0000000
--- a/src/main/python/twitter/aurora/config/port_resolver.py
+++ /dev/null
@@ -1,45 +0,0 @@
-from twitter.common.lang import Compatibility
-
-
-class PortResolver(object):
- class CycleException(Exception): pass
-
- @classmethod
- def resolve(cls, portmap):
- """
- Given an announce-style portmap, return a fully dereferenced portmap.
-
- For example, given the portmap:
- {
- 'http': 80,
- 'aurora: 'http',
- 'https': 'aurora',
- 'thrift': 'service'
- }
-
- Returns {'http': 80, 'aurora': 80, 'https': 80, 'thrift': 'service'}
- """
- for (name, port) in portmap.items():
- if not isinstance(name, Compatibility.string):
- raise ValueError('All portmap keys must be strings!')
- if not isinstance(port, (int, Compatibility.string)):
- raise ValueError('All portmap values must be strings or integers!')
-
- portmap = portmap.copy()
- for port in list(portmap):
- try:
- portmap[port] = int(portmap[port])
- except ValueError:
- continue
-
- def resolve_one(static_port):
- visited = set()
- root = portmap[static_port]
- while root in portmap:
- visited.add(root)
- if portmap[root] in visited:
- raise cls.CycleException('Found cycle in portmap!')
- root = portmap[root]
- return root
-
- return dict((name, resolve_one(name)) for name in portmap)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/recipes.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/recipes.py b/src/main/python/twitter/aurora/config/recipes.py
deleted file mode 100644
index b2dc23c..0000000
--- a/src/main/python/twitter/aurora/config/recipes.py
+++ /dev/null
@@ -1,44 +0,0 @@
-import os
-
-from .loader import AuroraConfigLoader
-
-import pkg_resources
-
-
-class Recipes(object):
- """
- Encapsulate a registry of Recipes (i.e. tasks to mutate the behavior of other tasks.)
- """
- REGISTRY = {}
- RECIPE_EXTENSION = '.aurora_recipe'
-
- class Error(Exception): pass
- class UnknownRecipe(Error): pass
-
- @classmethod
- def get(cls, name):
- if name not in cls.REGISTRY:
- raise cls.UnknownRecipe('Could not find recipe %s!' % name)
- return cls.REGISTRY[name]
-
- @classmethod
- def include_one(cls, filename):
- recipe_env = AuroraConfigLoader.load(filename)
- cls.REGISTRY.update(recipe_env.get('recipes', {}))
-
- @classmethod
- def include_module(cls, module):
- for filename in pkg_resources.resource_listdir(module, ''):
- if filename.endswith(cls.RECIPE_EXTENSION):
- cls.include_one(os.path.join(module.replace('.', os.sep), filename))
-
- @classmethod
- def include(cls, path):
- if os.path.isfile(path):
- cls.include_one(path)
- elif os.path.isdir(path):
- for filename in os.listdir(path):
- if filename.endswith(cls.RECIPE_EXTENSION):
- cls.include_one(os.path.join(path, filename))
- else:
- raise ValueError('Could not find %s' % path)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/repl.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/repl.py b/src/main/python/twitter/aurora/config/repl.py
deleted file mode 100644
index d26f94d..0000000
--- a/src/main/python/twitter/aurora/config/repl.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from twitter.aurora.config.loader import AuroraConfigLoader
-from twitter.common.lang import Compatibility
-
-
-
-import code
-# Start an interactive console with the banner 'Mesos Config REPL'.  Its local namespace is
-# whatever Compatibility.exec_function returns when given the loader's default schema and
-# this module's globals.
-code.interact('Mesos Config REPL',
- local=Compatibility.exec_function(AuroraConfigLoader.DEFAULT_SCHEMA, globals()))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/schema/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/schema/BUILD b/src/main/python/twitter/aurora/config/schema/BUILD
deleted file mode 100644
index fd08ec7..0000000
--- a/src/main/python/twitter/aurora/config/schema/BUILD
+++ /dev/null
@@ -1,9 +0,0 @@
-python_library(
- name = 'schema',
- sources = ['base.py'],
- dependencies = [
- pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
- pants('src/main/python/twitter/thermos/config:schema'),
- pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
- ]
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/schema/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/schema/__init__.py b/src/main/python/twitter/aurora/config/schema/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/schema/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/schema/base.py b/src/main/python/twitter/aurora/config/schema/base.py
deleted file mode 100644
index b3d437f..0000000
--- a/src/main/python/twitter/aurora/config/schema/base.py
+++ /dev/null
@@ -1,122 +0,0 @@
-from twitter.thermos.config.schema import *
-
-from gen.twitter.aurora.constants import DEFAULT_ENVIRONMENT
-
-
-# TODO(wickman) Bind {{mesos.instance}} to %shard_id%
-class MesosContext(Struct):
- # Struct with a single field, the required Integer `instance`.
- # The instance id (i.e. replica id, shard id) in the context of a task
- instance = Required(Integer)
-
-
-# AppApp layout setup
-class AppPackage(Struct):
- # A package reference: a required name plus a version that defaults to 'latest'.
- name = Required(String)
- version = Default(String, 'latest')
-
-class AppLayout(Struct):
- # Holds a list of AppPackage entries; the list is empty by default.
- packages = Default(List(AppPackage), [])
-
-
-# The object bound into the {{packer}} namespace.
-# Referenced by
-# {{packer[role][name][version]}}
-#
-# Where version =
-# number (integer)
-# 'live' (live package)
-# 'latest' (highest version number)
-#
-# For example if you'd like to create a copy process for a particular
-# package,
-# copy_latest = Process(
-# name = 'copy-{{package_name}}',
-# cmdline = '{{packer[{{role}}][{{package_name}}][latest].copy_command}}')
-# processes = [
-# copy_latest.bind(package_name = 'labrat'),
-# copy_latest.bind(package_name = 'packer')
-# ]
-class PackerObject(Struct):
- # Three String fields, each declared without a Required or Default wrapper.
- package = String
- package_uri = String
- copy_command = String
-
-
-class UpdateConfig(Struct):
- # All fields are Integers with the defaults shown; MesosJob.update_config defaults to an
- # UpdateConfig() built from them.
- # NOTE(review): restart_threshold and watch_secs look like durations in seconds -- the
- # unit is not stated in this file, confirm against the code that consumes them.
- batch_size = Default(Integer, 1)
- restart_threshold = Default(Integer, 60)
- watch_secs = Default(Integer, 30)
- max_per_shard_failures = Default(Integer, 0)
- max_total_failures = Default(Integer, 0)
-
-
-class HealthCheckConfig(Struct):
- # Three Float fields whose names end in _secs plus an Integer failure limit, all with
- # the defaults shown.
- # NOTE(review): how max_consecutive_failures = 0 is interpreted is decided by the health
- # checker, not here -- confirm before relying on it.
- initial_interval_secs = Default(Float, 60.0)
- interval_secs = Default(Float, 30.0)
- timeout_secs = Default(Float, 1.0)
- max_consecutive_failures = Default(Integer, 0)
-
-
-class Announcer(Struct):
- # Name of the primary port; defaults to 'http'.
- primary_port = Default(String, 'http')
-
- # Portmap can either alias two ports together, e.g.
- # aurora <= http
- # Or it can be used to alias static ports to endpoints, e.g.
- # http <= 80
- # https <= 443
- # aurora <= https
- # By default 'aurora' maps to the {{primary_port}} reference, i.e. to whatever
- # primary_port is set to.
- portmap = Default(Map(String, String), {
- 'aurora': '{{primary_port}}'
- })
-
-
-# The executorConfig populated inside of TaskConfig.
-class MesosTaskInstance(Struct):
- # task, instance and role are required; layout and announce are declared without a
- # Required or Default wrapper; the remaining fields carry the defaults shown.
- task = Required(Task)
- layout = AppLayout
- instance = Required(Integer)
- role = Required(String)
- announce = Announcer
- environment = Default(String, DEFAULT_ENVIRONMENT)
- health_check_interval_secs = Default(Integer, 30) # DEPRECATED (MESOS-2649)
- health_check_config = Default(HealthCheckConfig, HealthCheckConfig())
-
-
-class MesosJob(Struct):
- # Job-level schema.  role, cluster, environment and task are required; name defaults to
- # the {{task.name}} reference, i.e. to the name of the job's task.
- name = Default(String, '{{task.name}}')
- role = Required(String)
- contact = String
- cluster = Required(String)
- environment = Required(String)
- instances = Default(Integer, 1)
- task = Required(Task)
- recipes = List(String)
- announce = Announcer
-
- cron_schedule = String
- cron_policy = String # these two are aliases of each other. default is KILL_EXISTING
- cron_collision_policy = String # if unspecified.
- # cron_policy is DEPRECATED (MESOS-2491) in favor of
- # cron_collision_policy.
-
- update_config = Default(UpdateConfig, UpdateConfig())
-
- constraints = Map(String, String)
- daemon = Boolean # daemon and service are aliased together.
- service = Boolean # daemon is DEPRECATED (MESOS-2492) in favor of
- # service. by default, service is False.
- max_task_failures = Default(Integer, 1)
- production = Default(Boolean, False)
- priority = Default(Integer, 0)
- health_check_interval_secs = Integer # DEPRECATED in favor of health_check_config (MESOS-2649).
- health_check_config = HealthCheckConfig
- task_links = Map(String, String)
-
- layout = AppLayout # DEPRECATED in favor of directory sandboxes
-
- enable_hooks = Default(Boolean, False) # enable client API hooks; from env python-list 'hooks'
-
-
-# Job is an alias for the MesosJob class.
-Job = MesosJob
-# Service is a Job object (not a class) created with service set to True.
-Service = Job(service = True)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/config/thrift.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/config/thrift.py b/src/main/python/twitter/aurora/config/thrift.py
deleted file mode 100644
index 8f6a5ce..0000000
--- a/src/main/python/twitter/aurora/config/thrift.py
+++ /dev/null
@@ -1,259 +0,0 @@
-import getpass
-import re
-
-from twitter.aurora.config.schema.base import (
- HealthCheckConfig,
- MesosContext,
- MesosTaskInstance,
-)
-from twitter.common.lang import Compatibility
-from twitter.thermos.config.loader import ThermosTaskValidator
-
-from gen.twitter.aurora.constants import GOOD_IDENTIFIER_PATTERN_PYTHON, AURORA_EXECUTOR_NAME
-from gen.twitter.aurora.ttypes import (
- Constraint,
- CronCollisionPolicy,
- ExecutorConfig,
- Identity,
- JobConfiguration,
- JobKey,
- LimitConstraint,
- Package,
- TaskConfig,
- TaskConstraint,
- ValueConstraint,
-)
-
-from pystachio import Empty, Ref
-
-__all__ = (
- 'InvalidConfig',
- 'convert'
-)
-
-
-class InvalidConfig(ValueError):
- pass
-
-
-def constraints_to_thrift(constraints):
- """Convert a python dictionary to a set of Constraint thrift objects."""
- result = set()
- for attribute, constraint_value in constraints.items():
- assert isinstance(attribute, Compatibility.string) and (
- isinstance(constraint_value, Compatibility.string)), (
- "Both attribute name and value in constraints must be string")
- constraint = Constraint()
- constraint.name = attribute
- task_constraint = TaskConstraint()
- if constraint_value.startswith('limit:'):
- task_constraint.limit = LimitConstraint()
- try:
- task_constraint.limit.limit = int(constraint_value.replace('limit:', '', 1))
- except ValueError:
- print('%s is not a valid limit value, must be integer' % constraint_value)
- raise
- else:
- # Strip off the leading negation if present.
- negated = constraint_value.startswith('!')
- if negated:
- constraint_value = constraint_value[1:]
- task_constraint.value = ValueConstraint(negated, set(constraint_value.split(',')))
- constraint.constraint = task_constraint
- result.add(constraint)
- return result
-
-
-def task_instance_from_job(job, instance):
- instance_context = MesosContext(instance=instance)
- # TODO(Sathya): Remove health_check_interval_secs references after deprecation cycle is complete.
- health_check_config = HealthCheckConfig()
- if job.has_health_check_interval_secs():
- health_check_config = HealthCheckConfig(interval_secs=job.health_check_interval_secs().get())
- elif job.has_health_check_config():
- health_check_config = job.health_check_config()
- ti = MesosTaskInstance(task=job.task(),
- layout=job.layout(),
- role=job.role(),
- health_check_interval_secs=health_check_config.interval_secs().get(),
- health_check_config=health_check_config,
- instance=instance)
- if job.has_announce():
- ti = ti(announce=job.announce())
- if job.has_environment():
- ti = ti(environment=job.environment())
- return ti.bind(mesos=instance_context).interpolate()
-
-
-def translate_cron_policy(policy):
- cron_policy = CronCollisionPolicy._NAMES_TO_VALUES.get(policy.get())
- if cron_policy is None:
- raise InvalidConfig('Invalid cron policy: %s' % policy.get())
- return cron_policy
-
-
-def fully_interpolated(pystachio_object, coerce_fn=lambda i: i):
- # Extract a fully-interpolated unwrapped object from pystachio_object or raise InvalidConfig.
- #
- # TODO(ksweeney): Remove this once Pystachio 1.0 changes the behavior of interpolate() to return
- # unwrapped objects and fail when there are any unbound refs.
- if not pystachio_object.check().ok():
- raise InvalidConfig(pystachio_object.check().message())
-
- # If an object type-checks it's okay to use the raw value from the wrapped object returned by
- # interpolate. Without the previous check value.get() could return a string with mustaches
- # instead of an object of the expected type.
- value, _ = pystachio_object.interpolate()
- return coerce_fn(value.get())
-
-
-def select_cron_policy(cron_policy, cron_collision_policy):
- if cron_policy is Empty and cron_collision_policy is Empty:
- return CronCollisionPolicy.KILL_EXISTING
- elif cron_policy is not Empty and cron_collision_policy is Empty:
- return translate_cron_policy(cron_policy)
- elif cron_policy is Empty and cron_collision_policy is not Empty:
- return translate_cron_policy(cron_collision_policy)
- else:
- raise InvalidConfig('Specified both cron_policy and cron_collision_policy!')
-
-
-def select_service_bit(job):
- if not job.has_daemon() and not job.has_service():
- return False
- elif job.has_daemon() and not job.has_service():
- return fully_interpolated(job.daemon(), bool)
- elif not job.has_daemon() and job.has_service():
- return fully_interpolated(job.service(), bool)
- else:
- raise InvalidConfig('Specified both daemon and service bits!')
-
-
-# TODO(wickman) Due to MESOS-2718 we should revert to using the MesosTaskInstance.
-#
-# Using the MesosJob instead of the MesosTaskInstance was to allow for
-# planned future use of fields such as 'cluster' and to allow for conversion
-# from Job=>Task to be done entirely on the executor, but instead this had
-# made it impossible to run idempotent updates.
-#
-# In the meantime, we are erasing fields of the Job that are controversial.
-# This achieves roughly the same effect as using the MesosTaskInstance.
-# The future work is tracked at MESOS-2727.
-ALIASED_FIELDS = (
- 'cron_policy',
- 'cron_collision_policy',
- 'update_config',
- 'daemon',
- 'service',
- 'instances'
-)
-
-
-def filter_aliased_fields(job):
- return job(**dict((key, Empty) for key in ALIASED_FIELDS))
-
-
-def assert_valid_field(field, identifier):
- VALID_IDENTIFIER = re.compile(GOOD_IDENTIFIER_PATTERN_PYTHON)
- if not isinstance(identifier, Compatibility.string):
- raise InvalidConfig("%s must be a string" % field)
- if not VALID_IDENTIFIER.match(identifier):
- raise InvalidConfig("Invalid %s '%s'" % (field, identifier))
- return identifier
-
-
-MESOS_INSTANCE_REF = Ref.from_address('mesos.instance')
-THERMOS_PORT_SCOPE_REF = Ref.from_address('thermos.ports')
-THERMOS_TASK_ID_REF = Ref.from_address('thermos.task_id')
-
-
-# TODO(wickman) Make this a method directly on an AuroraConfig so that we don't
-# need the packages/ports shenanigans.
-def convert(job, packages=frozenset(), ports=frozenset()):
- """Convert a Pystachio MesosJob to an Aurora Thrift JobConfiguration."""
-
- owner = Identity(role=fully_interpolated(job.role()), user=getpass.getuser())
- key = JobKey(
- role=assert_valid_field('role', fully_interpolated(job.role())),
- environment=assert_valid_field('environment', fully_interpolated(job.environment())),
- name=assert_valid_field('name', fully_interpolated(job.name())))
-
- task_raw = job.task()
-
- MB = 1024 * 1024
- task = TaskConfig()
-
- def not_empty_or(item, default):
- return default if item is Empty else fully_interpolated(item)
-
- # job components
- task.jobName = fully_interpolated(job.name())
- task.environment = fully_interpolated(job.environment())
- task.production = fully_interpolated(job.production(), bool)
- task.isService = select_service_bit(job)
- task.maxTaskFailures = fully_interpolated(job.max_task_failures())
- task.priority = fully_interpolated(job.priority())
- task.contactEmail = not_empty_or(job.contact(), None)
-
- # Add package tuples to a task, to display in the scheduler UI.
- task.packages = frozenset(
- Package(role=str(role), name=str(package_name), version=int(version))
- for role, package_name, version in packages)
-
- # task components
- if not task_raw.has_resources():
- raise InvalidConfig('Task must specify resources!')
-
- if (fully_interpolated(task_raw.resources().ram()) == 0
- or fully_interpolated(task_raw.resources().disk()) == 0):
- raise InvalidConfig('Must specify ram and disk resources, got ram:%r disk:%r' % (
- fully_interpolated(task_raw.resources().ram()),
- fully_interpolated(task_raw.resources().disk())))
-
- task.numCpus = fully_interpolated(task_raw.resources().cpu())
- task.ramMb = fully_interpolated(task_raw.resources().ram()) / MB
- task.diskMb = fully_interpolated(task_raw.resources().disk()) / MB
- if task.numCpus <= 0 or task.ramMb <= 0 or task.diskMb <= 0:
- raise InvalidConfig('Task has invalid resources. cpu/ramMb/diskMb must all be positive: '
- 'cpu:%r ramMb:%r diskMb:%r' % (task.numCpus, task.ramMb, task.diskMb))
-
- task.owner = owner
- task.requestedPorts = ports
- task.taskLinks = not_empty_or(job.task_links(), {})
- task.constraints = constraints_to_thrift(not_empty_or(job.constraints(), {}))
-
- underlying, refs = job.interpolate()
-
- # need to fake an instance id for the sake of schema checking
- underlying_checked = underlying.bind(mesos = {'instance': 31337})
- try:
- ThermosTaskValidator.assert_valid_task(underlying_checked.task())
- except ThermosTaskValidator.InvalidTaskError as e:
- raise InvalidConfig('Task is invalid: %s' % e)
- if not underlying_checked.check().ok():
- raise InvalidConfig('Job not fully specified: %s' % underlying.check().message())
-
- unbound = []
- for ref in refs:
- if ref == THERMOS_TASK_ID_REF or ref == MESOS_INSTANCE_REF or (
- Ref.subscope(THERMOS_PORT_SCOPE_REF, ref)):
- continue
- unbound.append(ref)
-
- if unbound:
- raise InvalidConfig('Config contains unbound variables: %s' % ' '.join(map(str, unbound)))
-
- cron_schedule = not_empty_or(job.cron_schedule(), '')
- cron_policy = select_cron_policy(job.cron_policy(), job.cron_collision_policy())
-
- task.executorConfig = ExecutorConfig(
- name=AURORA_EXECUTOR_NAME,
- data=filter_aliased_fields(underlying).json_dumps())
-
- return JobConfiguration(
- key=key,
- owner=owner,
- cronSchedule=cron_schedule,
- cronCollisionPolicy=cron_policy,
- taskConfig=task,
- instanceCount=fully_interpolated(job.instances()))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/BUILD b/src/main/python/twitter/aurora/executor/BUILD
deleted file mode 100644
index 01701e9..0000000
--- a/src/main/python/twitter/aurora/executor/BUILD
+++ /dev/null
@@ -1,139 +0,0 @@
-import os
-
-python_library(
- name = 'thermos_task_runner',
- sources = ['thermos_task_runner.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- pants('src/main/python/twitter/thermos/common'),
- pants('src/main/python/twitter/thermos/config:schema'),
- pants('src/main/python/twitter/thermos/core'),
- pants('src/main/python/twitter/thermos/monitoring:monitor'),
- pants('src/main/python/twitter/aurora/common:http_signaler'),
- pants('src/main/python/twitter/aurora/executor/common:status_checker'),
- pants('src/main/python/twitter/aurora/executor/common:task_info'),
- pants('src/main/python/twitter/aurora/executor/common:task_runner'),
- ]
-)
-
-python_library(
- name = 'executor_detector',
- sources = ['executor_detector.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/string'),
- ]
-)
-
-python_library(
- name = 'executor_vars',
- sources = ['executor_vars.py'],
- dependencies = [
- pants('src/main/python/twitter/aurora/BUILD.thirdparty:psutil'),
- pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
- pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
- pants('aurora/twitterdeps/src/python/twitter/common/python'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- pants('aurora/twitterdeps/src/python/twitter/common/string'),
- ]
-)
-
-python_library(
- name = 'status_manager',
- sources = ['status_manager.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- pants('src/main/python/twitter/aurora/executor/common:status_checker'),
- ]
-)
-
-python_library(
- name = 'thermos_executor_base',
- sources = ['executor_base.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('src/main/thrift/com/twitter/thermos:py-thrift'),
- pants('src/main/python/twitter/aurora:mesos-core'),
- pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
- ]
-)
-
-python_library(
- name = 'thermos_executor',
- sources = ['thermos_executor.py'],
- dependencies = [
- pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
- pants(':status_manager'),
- pants(':thermos_executor_base'),
- pants('aurora/twitterdeps/src/python/twitter/common/app'),
- pants('aurora/twitterdeps/src/python/twitter/common/concurrent'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- pants('src/main/python/twitter/aurora/executor/common:kill_manager'),
- pants('src/main/python/twitter/aurora/executor/common:sandbox'),
- pants('src/main/python/twitter/aurora/executor/common:task_info'),
- pants('src/main/python/twitter/aurora/executor/common:task_runner'),
- pants('src/main/python/twitter/aurora:mesos-core'),
- ]
-)
-
-python_library(
- name = 'thermos_runner',
- sources = ['thermos_runner.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/app'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('src/main/python/twitter/thermos/common:planner'),
- pants('src/main/python/twitter/thermos/config:schema'),
- pants('src/main/python/twitter/thermos/core'),
- ],
-)
-
-python_library(
- name = 'gc_executor',
- sources = ['gc_executor.py'],
- dependencies = [
- pants('src/main/python/twitter/aurora/BUILD.thirdparty:psutil'),
- pants(':executor_detector'),
- pants(':thermos_executor_base'),
- pants('aurora/twitterdeps/src/python/twitter/common/collections'),
- pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- pants('src/main/python/twitter/thermos/common:ckpt'),
- pants('src/main/python/twitter/thermos/common:path'),
- pants('src/main/python/twitter/thermos/core:helper'),
- pants('src/main/python/twitter/thermos/core:inspector'),
- pants('src/main/python/twitter/thermos/monitoring:detector'),
- pants('src/main/python/twitter/thermos/monitoring:garbage'),
- pants('src/main/python/twitter/aurora/config:schema'),
- pants('src/main/python/twitter/aurora/executor/common:sandbox'),
- pants('src/main/python/twitter/aurora:mesos-core'),
- pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
- ]
-)
-
-python_library(
- name = 'executor-packaged',
- dependencies = [
- # Covering dependencies
- pants('src/main/python/twitter/aurora/common'),
- pants('src/main/python/twitter/aurora/config'),
- pants('src/main/python/twitter/thermos/common'),
- pants('src/main/python/twitter/thermos/config'),
- pants('src/main/python/twitter/thermos/core'),
- pants('src/main/python/twitter/thermos/monitoring'),
- ],
- provides = setup_py(
- name = 'twitter.aurora.executor',
- version = open(os.path.join(get_buildroot(), '.auroraversion')).read().strip().lower(),
- ).with_binaries(
- gc_executor = pants('src/main/python/twitter/aurora/executor/bin:gc_executor'),
- thermos_executor = pants('src/main/python/twitter/aurora/executor/bin:thermos_executor'),
- thermos_runner = pants('src/main/python/twitter/aurora/executor/bin:thermos_runner'),
- )
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/__init__.py b/src/main/python/twitter/aurora/executor/__init__.py
deleted file mode 100644
index b0d6433..0000000
--- a/src/main/python/twitter/aurora/executor/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/bin/BUILD b/src/main/python/twitter/aurora/executor/bin/BUILD
deleted file mode 100644
index 5e9cab1..0000000
--- a/src/main/python/twitter/aurora/executor/bin/BUILD
+++ /dev/null
@@ -1,47 +0,0 @@
-python_binary(
- name = 'thermos_executor',
- source = 'thermos_executor_main.py',
- entry_point = 'twitter.aurora.executor.bin.thermos_executor_main:proxy_main',
- ignore_errors = True,
- always_write_cache = True,
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/app'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
- pants('src/main/python/twitter/aurora/executor/common:executor_timeout'),
- pants('src/main/python/twitter/aurora/executor/common:health_checker'),
- pants('src/main/python/twitter/aurora/executor/common:sandbox'),
- pants('src/main/python/twitter/aurora/executor:executor_detector'),
- pants('src/main/python/twitter/aurora/executor:executor_vars'),
- pants('src/main/python/twitter/aurora/executor:thermos_executor'),
- pants('src/main/python/twitter/aurora/executor:thermos_task_runner'),
- ]
-)
-
-python_binary(
- name = 'gc_executor',
- source = 'gc_executor_main.py',
- entry_point = 'twitter.aurora.executor.bin.gc_executor_main:proxy_main',
- ignore_errors = True,
- always_write_cache = True,
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/app'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
- pants('src/main/python/twitter/thermos/common:path'),
- pants('src/main/python/twitter/aurora/executor:executor_detector'),
- pants('src/main/python/twitter/aurora/executor:executor_vars'),
- pants('src/main/python/twitter/aurora/executor:gc_executor'),
- ]
-)
-
-python_binary(
- name = 'thermos_runner',
- source = 'thermos_runner_main.py',
- entry_point = 'twitter.aurora.executor.bin.thermos_runner_main:proxy_main',
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/app'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('src/main/python/twitter/aurora/executor:thermos_runner'),
- ],
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/bin/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/bin/__init__.py b/src/main/python/twitter/aurora/executor/bin/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/bin/gc_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/bin/gc_executor_main.py b/src/main/python/twitter/aurora/executor/bin/gc_executor_main.py
deleted file mode 100644
index fad20e6..0000000
--- a/src/main/python/twitter/aurora/executor/bin/gc_executor_main.py
+++ /dev/null
@@ -1,45 +0,0 @@
-"""Command-line entry point to the Thermos GC executor
-
-This module wraps the Thermos GC executor into an executable suitable for launching by a Mesos
-slave.
-
-"""
-
-from twitter.aurora.executor.executor_detector import ExecutorDetector
-from twitter.aurora.executor.gc_executor import ThermosGCExecutor
-from twitter.common import app, log
-from twitter.common.log.options import LogOptions
-from twitter.common.metrics.sampler import DiskMetricWriter
-from twitter.thermos.common.path import TaskPath
-
-import mesos
-
-
-app.configure(debug=True)
-
-
-# locate logs locally in executor sandbox
-LogOptions.set_simple(True)
-LogOptions.set_disk_log_level('DEBUG')
-LogOptions.set_log_dir(ExecutorDetector.LOG_PATH)
-
-
-def proxy_main():
- def main():
- # Create executor stub
- thermos_gc_executor = ThermosGCExecutor(checkpoint_root=TaskPath.DEFAULT_CHECKPOINT_ROOT)
- thermos_gc_executor.start()
-
- # Start metrics collection
- metric_writer = DiskMetricWriter(thermos_gc_executor.metrics, ExecutorDetector.VARS_PATH)
- metric_writer.start()
-
- # Create driver stub
- driver = mesos.MesosExecutorDriver(thermos_gc_executor)
-
- # Start GC executor
- driver.run()
-
- log.info('MesosExecutorDriver.run() has finished.')
-
- app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/bin/thermos_executor_main.py b/src/main/python/twitter/aurora/executor/bin/thermos_executor_main.py
deleted file mode 100644
index 31e718e..0000000
--- a/src/main/python/twitter/aurora/executor/bin/thermos_executor_main.py
+++ /dev/null
@@ -1,67 +0,0 @@
-"""Command-line entry point to the Thermos Executor
-
-This module wraps the Thermos Executor into an executable suitable for launching by a Mesos
-slave.
-
-"""
-
-import os
-
-from twitter.common import app, log
-from twitter.common.log.options import LogOptions
-
-from twitter.aurora.executor.common.executor_timeout import ExecutorTimeout
-from twitter.aurora.executor.common.health_checker import HealthCheckerProvider
-from twitter.aurora.executor.thermos_executor import ThermosExecutor
-from twitter.aurora.executor.thermos_task_runner import DefaultThermosTaskRunnerProvider
-
-import mesos
-
-
-app.configure(debug=True)
-LogOptions.set_simple(True)
-LogOptions.set_disk_log_level('DEBUG')
-LogOptions.set_log_dir('.')
-
-
-# TODO(wickman) Consider just having the OSS version require pip installed
-# thermos_runner binaries on every machine and instead of embedding the pex
-# as a resource, shell out to one on the PATH.
-def dump_runner_pex():
- import pkg_resources
- import twitter.aurora.executor.resources
- pex_name = 'thermos_runner.pex'
- runner_pex = os.path.join(os.path.realpath('.'), pex_name)
- with open(runner_pex, 'w') as fp:
- # TODO(wickman) Use shutil.copyfileobj to reduce memory footprint here.
- fp.write(pkg_resources.resource_stream(
- twitter.aurora.executor.resources.__name__, pex_name).read())
- return runner_pex
-
-
-def proxy_main():
- def main():
- runner_provider = DefaultThermosTaskRunnerProvider(
- dump_runner_pex(),
- artifact_dir=os.path.realpath('.'),
- )
-
- # Create executor stub
- thermos_executor = ThermosExecutor(
- runner_provider=runner_provider,
- status_providers=(HealthCheckerProvider(),),
- )
-
- # Create driver stub
- driver = mesos.MesosExecutorDriver(thermos_executor)
-
- # This is an ephemeral executor -- shutdown if we receive no tasks within a certain
- # time period
- ExecutorTimeout(thermos_executor.launched, driver).start()
-
- # Start executor
- driver.run()
-
- log.info('MesosExecutorDriver.run() has finished.')
-
- app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/bin/thermos_runner_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/bin/thermos_runner_main.py b/src/main/python/twitter/aurora/executor/bin/thermos_runner_main.py
deleted file mode 100644
index 602111e..0000000
--- a/src/main/python/twitter/aurora/executor/bin/thermos_runner_main.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from twitter.common import app
-from twitter.common.log.options import LogOptions
-from twitter.aurora.executor.thermos_runner import proxy_main as runner_proxy_main
-
-
-LogOptions.set_simple(True)
-
-
-def proxy_main():
- main = runner_proxy_main
-
- app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/BUILD b/src/main/python/twitter/aurora/executor/common/BUILD
deleted file mode 100644
index 9c932d1..0000000
--- a/src/main/python/twitter/aurora/executor/common/BUILD
+++ /dev/null
@@ -1,69 +0,0 @@
-python_library(
- name = 'status_checker',
- sources = ['status_checker.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/metrics'),
- ]
-)
-
-python_library(
- name = 'task_runner',
- sources = ['task_runner.py'],
- dependencies = [
- pants(':status_checker'),
- pants('aurora/twitterdeps/src/python/twitter/common/lang'),
- ]
-)
-
-python_library(
- name = 'health_checker',
- sources = ['health_checker.py'],
- dependencies = [
- pants(':status_checker'),
- pants(':task_info'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
- pants('src/main/python/twitter/aurora/common:http_signaler'),
- ]
-)
-
-python_library(
- name = 'executor_timeout',
- sources = ['executor_timeout.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/exceptions'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
- ]
-)
-
-python_library(
- name = 'kill_manager',
- sources = ['kill_manager.py'],
- dependencies = [
- pants(':status_checker'),
- ]
-)
-
-python_library(
- name = 'sandbox',
- sources = ['sandbox.py'],
- dependencies = [
- pants('aurora/twitterdeps/src/python/twitter/common/dirutil'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- ]
-)
-
-python_library(
- name = 'task_info',
- sources = ['task_info.py'],
- dependencies = [
- pants('src/main/python/twitter/aurora/BUILD.thirdparty:pystachio'),
- pants('aurora/twitterdeps/src/python/twitter/common/log'),
- pants('src/main/python/twitter/aurora/config'),
- pants('src/main/python/twitter/aurora/config/schema'),
- pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
- ]
-)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/__init__.py b/src/main/python/twitter/aurora/executor/common/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/executor_timeout.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/executor_timeout.py b/src/main/python/twitter/aurora/executor/common/executor_timeout.py
deleted file mode 100644
index 2828973..0000000
--- a/src/main/python/twitter/aurora/executor/common/executor_timeout.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from twitter.common import log
-from twitter.common.quantity import Amount, Time
-from twitter.common.exceptions import ExceptionalThread
-
-
-class ExecutorTimeout(ExceptionalThread):
- DEFAULT_TIMEOUT = Amount(10, Time.SECONDS)
-
- def __init__(self, event, driver, logger=log.error, timeout=DEFAULT_TIMEOUT):
- self._event = event
- self._logger = logger
- self._driver = driver
- self._timeout = timeout
- super(ExecutorTimeout, self).__init__()
- self.daemon = True
-
- def run(self):
- self._event.wait(self._timeout.as_(Time.SECONDS))
- if not self._event.is_set():
- self._logger('Executor timing out.')
- self._driver.stop()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/health_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/health_checker.py b/src/main/python/twitter/aurora/executor/common/health_checker.py
deleted file mode 100644
index 3b25f0e..0000000
--- a/src/main/python/twitter/aurora/executor/common/health_checker.py
+++ /dev/null
@@ -1,99 +0,0 @@
-import threading
-import time
-
-from twitter.aurora.common.http_signaler import HttpSignaler
-from twitter.common import log
-from twitter.common.exceptions import ExceptionalThread
-
-from .status_checker import (
- ExitState,
- StatusChecker,
- StatusCheckerProvider,
- StatusResult,
-)
-from .task_info import mesos_task_instance_from_assigned_task, resolve_ports
-
-
-class HealthCheckerThread(StatusChecker, ExceptionalThread):
- """Generic, StatusChecker-conforming thread for arbitrary periodic health checks
-
- health_checker should be a callable returning a tuple of (boolean, reason), indicating
- respectively the health of the service and the reason for its failure (or None if the service is
- still healthy).
- """
- def __init__(self,
- health_checker,
- interval_secs=30,
- initial_interval_secs=None,
- max_consecutive_failures=0,
- clock=time):
- self._checker = health_checker
- self._interval = interval_secs
- if initial_interval_secs is not None:
- self._initial_interval = initial_interval_secs
- else:
- self._initial_interval = interval_secs * 2
- self._current_consecutive_failures = 0
- self._max_consecutive_failures = max_consecutive_failures
- self._dead = threading.Event()
- if self._initial_interval > 0:
- self._healthy, self._reason = True, None
- else:
- self._healthy, self._reason = self._checker()
- self._clock = clock
- super(HealthCheckerThread, self).__init__()
- self.daemon = True
-
- @property
- def status(self):
- if not self._healthy:
- return StatusResult('Failed health check! %s' % self._reason, ExitState.FAILED)
-
- def run(self):
- log.debug('Health checker thread started.')
- self._clock.sleep(self._initial_interval)
- log.debug('Initial interval expired.')
- while not self._dead.is_set():
- self._maybe_update_failure_count(*self._checker())
- self._clock.sleep(self._interval)
-
- def _maybe_update_failure_count(self, is_healthy, reason):
- if not is_healthy:
- log.warning('Health check failure: %s' % reason)
- self._current_consecutive_failures += 1
- if self._current_consecutive_failures > self._max_consecutive_failures:
- log.warning('Reached consecutive failure limit.')
- self._healthy = False
- self._reason = reason
- else:
- if self._current_consecutive_failures > 0:
- log.debug('Reset consecutive failures counter.')
- self._current_consecutive_failures = 0
-
- def start(self):
- StatusChecker.start(self)
- ExceptionalThread.start(self)
-
- def stop(self):
- log.debug('Health checker thread stopped.')
- self._dead.set()
-
-
-class HealthCheckerProvider(StatusCheckerProvider):
- def from_assigned_task(self, assigned_task, _):
- mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
- portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)
-
- if 'health' not in portmap:
- return None
-
- health_check_config = mesos_task.health_check_config().get()
- http_signaler = HttpSignaler(
- portmap['health'],
- timeout_secs=health_check_config.get('timeout_secs'))
- health_checker = HealthCheckerThread(
- http_signaler.health,
- interval_secs=health_check_config.get('interval_secs'),
- initial_interval_secs=health_check_config.get('initial_interval_secs'),
- max_consecutive_failures=health_check_config.get('max_consecutive_failures'))
- return health_checker
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/kill_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/kill_manager.py b/src/main/python/twitter/aurora/executor/common/kill_manager.py
deleted file mode 100644
index 70ab733..0000000
--- a/src/main/python/twitter/aurora/executor/common/kill_manager.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from .status_checker import ExitState, StatusChecker, StatusResult
-
-
-class KillManager(StatusChecker):
- """
- A health interface that provides a kill-switch for a task monitored by the status manager.
- """
- def __init__(self):
- self._killed = False
- self._reason = None
-
- @property
- def status(self):
- if self._killed:
- return StatusResult(self._reason, ExitState.KILLED)
-
- def kill(self, reason):
- self._reason = reason
- self._killed = True
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/twitter/aurora/executor/common/sandbox.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/executor/common/sandbox.py b/src/main/python/twitter/aurora/executor/common/sandbox.py
deleted file mode 100644
index 551d360..0000000
--- a/src/main/python/twitter/aurora/executor/common/sandbox.py
+++ /dev/null
@@ -1,72 +0,0 @@
-from abc import abstractmethod, abstractproperty
-import getpass
-import grp
-import os
-import pwd
-
-from twitter.common import log
-from twitter.common.dirutil import safe_mkdir, safe_rmtree
-from twitter.common.lang import Interface
-
-
-class SandboxInterface(Interface):
- class Error(Exception): pass
- class CreationError(Error): pass
- class DeletionError(Error): pass
-
- @abstractproperty
- def root(self):
- """Return the root path of the sandbox."""
-
- @abstractproperty
- def chrooted(self):
- """Returns whether or not the sandbox is a chroot."""
-
- @abstractmethod
- def exists(self):
- """Returns true if the sandbox appears to exist."""
-
- @abstractmethod
- def create(self, *args, **kw):
- """Create the sandbox."""
-
- @abstractmethod
- def destroy(self, *args, **kw):
- """Destroy the sandbox."""
-
-
-class SandboxProvider(Interface):
- @abstractmethod
- def from_assigned_task(self, assigned_task):
- """Return the appropriate Sandbox implementation from an AssignedTask."""
-
-
-class DirectorySandbox(SandboxInterface):
- """ Basic sandbox implementation using a directory on the filesystem """
- def __init__(self, root, user=getpass.getuser()):
- self._root = root
- self._user = user
-
- @property
- def root(self):
- return self._root
-
- @property
- def chrooted(self):
- return False
-
- def exists(self):
- return os.path.exists(self.root)
-
- def create(self):
- log.debug('DirectorySandbox: mkdir %s' % self.root)
- safe_mkdir(self.root)
- pwent = pwd.getpwnam(self._user)
- grent = grp.getgrgid(pwent.pw_gid)
- log.debug('DirectorySandbox: chown %s:%s %s' % (self._user, grent.gr_name, self.root))
- os.chown(self.root, pwent.pw_uid, pwent.pw_gid)
- log.debug('DirectorySandbox: chmod 700 %s' % self.root)
- os.chmod(self.root, 0700)
-
- def destroy(self):
- safe_rmtree(self.root)