You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:19 UTC
[26/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/bin/aurora_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/bin/aurora_client.py b/src/main/python/apache/aurora/client/bin/aurora_client.py
new file mode 100644
index 0000000..8228b88
--- /dev/null
+++ b/src/main/python/apache/aurora/client/bin/aurora_client.py
@@ -0,0 +1,33 @@
+from twitter.aurora.client.base import generate_terse_usage
+from twitter.common import app
+from twitter.common.log.options import LogOptions
+
+# These are side-effecting imports in that they register commands via
+# app.command. This is a poor code practice and should be fixed long-term
+# with the creation of twitter.common.cli that allows for argparse-style CLI
+# composition.
+from twitter.aurora.client.commands import (
+ core,
+ help,
+ run,
+ ssh,
+)
+from twitter.aurora.client.options import add_verbosity_options
+
+app.register_commands_from(core, run, ssh)
+app.register_commands_from(help)
+add_verbosity_options()
+
+
+def main():
+ app.help()
+
+
+LogOptions.set_stderr_log_level('INFO')
+LogOptions.disable_disk_logging()
+app.set_name('aurora-client')
+app.set_usage(generate_terse_usage())
+
+
+def proxy_main():
+ app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/binding_helper.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/binding_helper.py b/src/main/python/apache/aurora/client/binding_helper.py
new file mode 100644
index 0000000..fa03b17
--- /dev/null
+++ b/src/main/python/apache/aurora/client/binding_helper.py
@@ -0,0 +1,115 @@
+from abc import abstractmethod, abstractproperty
+import inspect
+import os
+import sys
+
+from twitter.common.lang import Interface
+
+__all__ = (
+ 'BindingHelper',
+ 'CachingBindingHelper',
+ 'apply_all',
+ 'clear_binding_caches',
+ 'unregister_all',
+)
+
+
+# The registry for binding helpers.
+_BINDING_HELPERS = []
+
+
+# TODO(wickman) Update the pydocs to remove references to common_internal components.
+class BindingHelper(Interface):
+ """A component which resolves some set of pseudo-bindings in a config.
+
+ Many bindings are too complex to resolve with bindings using the standard mechanisms,
+ because they require some python computation to determine how to bind them. For example,
+ for references like {{packer[role][pkg][version]}}, we need to talk to the packer to figure
+ out the correct packer call for the desired cluster.
+
+ A BindingHelper is responsible for resolving one of these types of pseudo-bindings.
+ PackerBindingHelper will resolve "packer" bindings; BuildBindingHelper will resolve "build"
+ bindings, JenkinsBindingHelper will resolve "jenkins" bindings, etc.
+
+ A BindingHelper can be registered by calling "BindingHelper.register(Helper)". Instead of
+ explicitly calling "inject" methods in populate_namespaces, it will compute the set of open
+ bindings, and then call the appropriate helpers for each.
+
+ The bindings can be computed either from scratch, or from a binding dictionary. A binding
+ dictionary can be computed from live data, and then passed over an RPC connection, so that
+ the bindings can be recomputed on the server.
+
+ Each helper is responsible for computing its own binding dict. The data in the dict should
+ meet two requirements: it should be enough data to allow it to produce exactly the same
+ result as the scratch binding, and the data should provide information that makes the
+ binding comprehensible for a human debugging a job.
+
+ For example, a packer helper's binding dict should provide enough information to identify
+ the HDFS file that should be used, but also the version number of the binary in packer,
+ (because a human reader wants to know the version of the package, not the meaningless
+ HDFS URL).
+ """
+ @classmethod
+ def register(cls):
+ _BINDING_HELPERS.append(cls())
+
+ def apply(self, config, env=None, binding_dict=None):
+ for match in self.matcher.match(config.raw()):
+ self.bind(config, match, env, binding_dict or config.binding_dicts[self.name])
+
+ @abstractproperty
+ def name(self):
+ """Returns the name of this BindingHelper. Typically it is the first component of
+ the matcher, e.g. if the matcher matches {{git[sha]}}, return "git"."""
+
+ @abstractproperty
+ def matcher(self):
+ """Returns the pystachio matcher for refs that this binding helper binds."""
+
+ @abstractmethod
+ def bind(self, config, match, env, binding_dict):
+ """Resolves a ref, adding a binding to the config."""
+
+
+class CachingBindingHelper(BindingHelper):
+ """A binding helper implementation that caches binding results"""
+ def __init__(self):
+ self.cache = {}
+
+ def flush_cache(self):
+ self.cache = {}
+
+ def bind(self, config, match, env, binding_dict):
+ if match not in self.cache:
+ self.cache[match] = self.uncached_bind(config, match, env, binding_dict)
+ config.bind(self.cache[match])
+
+ @abstractmethod
+ def uncached_bind(self, config, match, env, binding_dict):
+ """Compute the binding for a ref that hasn't been seen before."""
+
+
+def unregister_all():
+ _BINDING_HELPERS[:] = []
+
+
+def apply_all(config, env=None, binding_dict=None):
+ """Computes a set of bindings and applies them to the config.
+
+ :param config: the config whose bindings need to be computed.
+ :param env: the python environment where the configuration was evaluated.
+ :param binding_dict: an optional dictionary containing data to be used to compute the
+ bindings. If this is provided, then data from the dictionary should be used in
+ preference over live data.
+ :return: a binding dictionary with data that can be used to recompute the bindings. The
+ config is updated in-place.
+ """
+ for helper in _BINDING_HELPERS:
+ helper.apply(config, env, binding_dict or config.binding_dicts[helper.name])
+
+
+def clear_binding_caches():
+ """Clear the binding helper's caches for testing."""
+ for helper in _BINDING_HELPERS:
+ if isinstance(helper, CachingBindingHelper):
+ helper.flush_cache()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/cli/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/BUILD b/src/main/python/apache/aurora/client/cli/BUILD
new file mode 100644
index 0000000..0a44a21
--- /dev/null
+++ b/src/main/python/apache/aurora/client/cli/BUILD
@@ -0,0 +1,26 @@
+
+python_binary(
+  name='client',
+  # The CLI package is twitter.aurora.client.cli (its modules import one another
+  # under that name), so the entry point must include the 'client' component.
+  entry_point = 'twitter.aurora.client.cli:main',
+  dependencies = [ pants(':cli') ],
+  )
+
+python_library(
+ name='cli',
+ sources = [ '__init__.py', 'context.py', 'jobs.py', 'options.py' ],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/python'),
+ pants('src/main/python/twitter/aurora/client/api:command_runner'),
+ pants('src/main/python/twitter/aurora/client/api:disambiguator'),
+ pants('src/main/python/twitter/aurora/client/api:job_monitor'),
+ pants('src/main/python/twitter/aurora/client/api:updater'),
+ pants('src/main/python/twitter/aurora/client/hooks'),
+ pants('src/main/python/twitter/aurora/client:base'),
+ pants('src/main/python/twitter/aurora/client:config'),
+ pants('src/main/python/twitter/aurora/client:factory'),
+ pants('src/main/python/twitter/aurora/client:options'),
+ pants('src/main/python/twitter/aurora/common'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ pants('src/main/python/twitter/aurora:argparse')
+ ]
+ )
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/cli/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/__init__.py b/src/main/python/apache/aurora/client/cli/__init__.py
new file mode 100644
index 0000000..2c08cf9
--- /dev/null
+++ b/src/main/python/apache/aurora/client/cli/__init__.py
@@ -0,0 +1,213 @@
+'''Command-line tooling infrastructure for aurora client v2.
+
+This provides a framework for a noun/verb command-line application. The application is structured
+around a collection of basic objects (nouns) that can be manipulated by the command line, where
+each type of object provides a collection of operations (verbs). Every command invocation
+consists of the name of the noun, followed by one of the verbs for that noun, followed by other
+arguments needed by the verb.
+
+For example:
+- To create a job, the noun is "job", the verb is "create":
+ $ aurora job create us-west/www/prod/server server.aurora
+
+- To find out the resource quota for a specific user, the noun is "user" and the verb is
+ "get_quota":
+ $ aurora user get_quota mchucarroll
+'''
+
+from __future__ import print_function
+
+from abc import abstractmethod
+import argparse
+import sys
+
+
+# Constants for standard return codes.
+EXIT_OK = 0
+EXIT_INVALID_CONFIGURATION = 3
+EXIT_COMMAND_FAILURE = 4
+EXIT_INVALID_COMMAND = 5
+EXIT_INVALID_PARAMETER = 6
+EXIT_NETWORK_ERROR = 7
+EXIT_PERMISSION_VIOLATION = 8
+EXIT_TIMEOUT = 9
+EXIT_UNKNOWN_ERROR = 20
+
+
+class Context(object):
+ class Error(Exception): pass
+
+ class ArgumentException(Error): pass
+
+ class CommandError(Error):
+ def __init__(self, code, msg):
+ super(Context.CommandError, self).__init__(msg)
+ self.msg = msg
+ self.code = code
+
+ def set_options(self, options):
+ """Add the options object to a context.
+ This is separated from the constructor to make patching tests easier.
+ """
+ self.options = options
+
+
+class CommandOption(object):
+ """A lightweight encapsulation of an argparse option specification, which can be used to
+ define options that can be reused by multiple commands."""
+
+ def __init__(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+
+ def add_to_parser(self, parser):
+ parser.add_argument(*self.args, **self.kwargs)
+
+
+class AuroraCommand(object):
+ def setup_options_parser(self, argparser):
+ """Set up command line options parsing for this command.
+ This is a thin veneer over the standard python argparse system.
+ :param argparser: the argument parser where this command can add its arguments.
+ """
+ pass
+
+ def add_option(self, argparser, option):
+ """Add a predefined argument encapsulated in a CommandOption to an argument parser."""
+ if not isinstance(option, CommandOption):
+ raise TypeError('Command option object must be an instance of CommandOption')
+ option.add_to_parser(argparser)
+
+ @property
+ def help(self):
+ """The help message for a command that will be used in the argparse help message"""
+
+ @property
+ def name(self):
+ """The command name"""
+
+
+class CommandLine(object):
+ """The top-level object implementing a command-line application."""
+
+ def __init__(self):
+ self.nouns = {}
+ self.parser = None
+
+ def register_noun(self, noun):
+ """Add a noun to the application"""
+ if not isinstance(noun, Noun):
+ raise TypeError('register_noun requires a Noun argument')
+ self.nouns[noun.name] = noun
+
+ def setup_options_parser(self):
+ """ Build the options parsing for the application."""
+ self.parser = argparse.ArgumentParser()
+ subparser = self.parser.add_subparsers(dest='noun')
+ for (name, noun) in self.nouns.items():
+ noun_parser = subparser.add_parser(name, help=noun.help)
+ noun.internal_setup_options_parser(noun_parser)
+
+ def register_nouns(self):
+ """This method should be overridden by applications to register the collection of nouns
+ that they can manipulate.
+ """
+ pass
+
+ def execute(self, args):
+ """Execute a command.
+ :param args: the command-line arguments for the command. This only includes arguments
+ that should be parsed by the application; it does not include sys.argv[0].
+ """
+ self.register_nouns()
+ self.setup_options_parser()
+ options = self.parser.parse_args(args)
+ if options.noun not in self.nouns:
+ raise ValueError('Unknown command: %s' % options.noun)
+ noun = self.nouns[options.noun]
+ context = noun.create_context()
+ context.set_options(options)
+ try:
+ return noun.execute(context)
+ except Context.CommandError as c:
+ print('Error executing command: %s' % c.msg, file=sys.stderr)
+ return c.code
+
+
+class Noun(AuroraCommand):
+  """A type of object manipulated by a command line application.
+
+  A noun owns a collection of verbs, keyed by verb name, and dispatches execution to the
+  verb selected on the command line.
+  """
+  class InvalidVerbException(Exception): pass
+
+  def __init__(self):
+    super(Noun, self).__init__()
+    # Maps verb name -> Verb instance.
+    self.verbs = {}
+
+  def register_verb(self, verb):
+    """Add an operation supported for this noun.
+
+    :param verb: the Verb instance to register under verb.name.
+    :raises TypeError: if verb is not a Verb instance.
+    """
+    if not isinstance(verb, Verb):
+      raise TypeError('register_verb requires a Verb argument')
+    self.verbs[verb.name] = verb
+    verb._register(self)
+
+  def internal_setup_options_parser(self, argparser):
+    """Internal driver for the options processing framework.
+
+    Adds this noun's own options, then one sub-parser per registered verb. The selected
+    verb's name is stored in the 'verb' attribute of the parsed options.
+    """
+    self.setup_options_parser(argparser)
+    subparser = argparser.add_subparsers(dest='verb')
+    for (name, verb) in self.verbs.items():
+      vparser = subparser.add_parser(name, help=verb.help)
+      verb.setup_options_parser(vparser)
+
+  @classmethod
+  def create_context(cls):
+    """Commands access state through a context object. The noun specifies what kind
+    of context should be created for this noun's required state.
+    """
+    pass
+
+  @abstractmethod
+  def setup_options_parser(self, argparser):
+    pass
+
+  def execute(self, context):
+    """Run the verb named by context.options.verb and return its result.
+
+    :param context: a context whose options carry the selected 'verb' name.
+    :return: whatever the selected verb's execute method returns.
+    :raises InvalidVerbException: if no verb with that name is registered on this noun.
+    """
+    if context.options.verb not in self.verbs:
+      raise self.InvalidVerbException('Noun %s does not have a verb %s' %
+          (self.name, context.options.verb))
+    # The result must be handed back: the caller returns this method's value as the
+    # outcome of the command.
+    return self.verbs[context.options.verb].execute(context)
+
+
+class Verb(AuroraCommand):
+ """An operation for a noun. Most application logic will live in verbs."""
+
+ def _register(self, noun):
+ """Create a link from a verb to its noun."""
+ self.noun = noun
+
+ @abstractmethod
+ def setup_options_parser(self, argparser):
+ pass
+
+ def execute(self, context):
+ pass
+
+
+class AuroraCommandLine(CommandLine):
+ """ An example implementation of a command line application using this framework.
+ This should probably eventually get moved in to its own source file.
+ """
+
+ @classmethod
+ def get_description(cls):
+ return 'Aurora client command line'
+
+ def register_nouns(self):
+ from .jobs import Job
+ self.register_noun(Job())
+
+
+def main(args=None):
+  """Run the Aurora command line and return the result of the command.
+
+  :param args: the command-line arguments to parse, not including the program name.
+    Defaults to sys.argv[1:], so callers that invoke main() with no arguments keep
+    working.
+  :return: the value returned by AuroraCommandLine.execute for this invocation.
+  """
+  if args is None:
+    args = sys.argv[1:]
+  cmd = AuroraCommandLine()
+  return cmd.execute(args)
+
+
+if __name__ == '__main__':
+  # main() strips sys.argv[0] itself; its result becomes the process exit status.
+  sys.exit(main())
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/cli/context.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/context.py b/src/main/python/apache/aurora/client/cli/context.py
new file mode 100644
index 0000000..2ae92ec
--- /dev/null
+++ b/src/main/python/apache/aurora/client/cli/context.py
@@ -0,0 +1,49 @@
+
+from twitter.aurora.client.base import synthesize_url
+from twitter.aurora.client.cli import Context, EXIT_NETWORK_ERROR
+from twitter.aurora.client.config import get_config
+from twitter.aurora.client.factory import make_client
+from twitter.common import log
+
+from gen.twitter.aurora.ttypes import ResponseCode
+
+
+class AuroraCommandContext(Context):
+ """A context object used by Aurora commands to manage command processing state
+ and common operations.
+ """
+
+ def get_api(self, cluster):
+ """Creates an API object for a specified cluster"""
+ return make_client(cluster)
+
+ def get_job_config(self, job_key, config_file):
+ """Loads a job configuration from a config file"""
+ jobname = job_key.name
+ return get_config(
+ jobname,
+ config_file,
+ self.options.json,
+ self.options.bindings,
+ select_cluster=job_key.cluster,
+ select_role=job_key.role,
+ select_env=job_key.env)
+
+ def open_page(self, url):
+ import webbrowser
+ webbrowser.open_new_tab(url)
+
+ def open_job_page(self, api, config):
+ self.open_page(synthesize_url(api.scheduler.scheduler().url, config.role(),
+ config.environment(), config.name()))
+
+ def handle_open(self, api):
+ if self.options.open_browser:
+ self.open_page(synthesize_url(api.scheduler.scheduler().url,
+ self.options.jobspec.role, self.options.jobspec.env, self.options.jobspec.name))
+
+ def check_and_log_response(self, resp):
+ log.info('Response from scheduler: %s (message: %s)'
+ % (ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.message))
+ if resp.responseCode != ResponseCode.OK:
+ raise self.CommandError(EXIT_NETWORK_ERROR, resp.message)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py
new file mode 100644
index 0000000..e66f181
--- /dev/null
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -0,0 +1,115 @@
+from twitter.aurora.client.api.job_monitor import JobMonitor
+from twitter.aurora.client.cli import (
+ EXIT_INVALID_CONFIGURATION,
+ Noun,
+ Verb
+)
+from twitter.aurora.client.cli.context import AuroraCommandContext
+from twitter.aurora.client.cli.options import (
+ BIND_OPTION,
+ BROWSER_OPTION,
+ CONFIG_OPTION,
+ JOBSPEC_OPTION,
+ JSON_OPTION
+)
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+
+from pystachio.config import Config
+
+
+def parse_instances(instances):
+  """Parse a list of instances or instance ranges into a sorted list of instance ids.
+
+  Examples:
+    0-2
+    0,1-3,5
+    1,3,5
+
+  :param instances: a comma-separated string of instance ids and inclusive START-END ranges.
+  :return: None if instances is None or the empty string, otherwise the sorted list of
+    distinct instance ids.
+  :raises ValueError: if a part is not an integer or a START-END range with START <= END.
+    argparse reports a ValueError raised by a 'type' callable as a usage error.
+  """
+  if instances is None or instances == '':
+    return None
+  result = set()
+  for part in instances.split(','):
+    bounds = part.split('-')
+    # A part is either a single id ("5") or one range ("1-3"); anything with more
+    # dashes is malformed and must not be silently read as first..last.
+    if len(bounds) > 2:
+      raise ValueError('Invalid instance range: %r' % part)
+    first, last = int(bounds[0]), int(bounds[-1])
+    # A reversed range would expand to nothing and silently select no instances.
+    if first > last:
+      raise ValueError('Invalid instance range (start is greater than end): %r' % part)
+    result.update(range(first, last + 1))
+  return sorted(result)
+
+
+class CreateJobCommand(Verb):
+ @property
+ def name(self):
+ return 'create'
+
+ @property
+ def help(self):
+ return 'Create a job using aurora'
+
+ CREATE_STATES = ('PENDING', 'RUNNING', 'FINISHED')
+
+ def setup_options_parser(self, parser):
+ self.add_option(parser, BIND_OPTION)
+ self.add_option(parser, BROWSER_OPTION)
+ self.add_option(parser, JSON_OPTION)
+ parser.add_argument('--wait_until', choices=self.CREATE_STATES,
+ default='PENDING',
+ help=('Block the client until all the tasks have transitioned into the requested state. '
+ 'Default: PENDING'))
+ self.add_option(parser, JOBSPEC_OPTION)
+ self.add_option(parser, CONFIG_OPTION)
+
+ def execute(self, context):
+ try:
+ config = context.get_job_config(context.options.jobspec, context.options.config_file)
+ except Config.InvalidConfigError as e:
+ raise context.CommandError(EXIT_INVALID_CONFIGURATION,
+ 'Error loading job configuration: %s' % e)
+ api = context.get_api(config.cluster())
+ monitor = JobMonitor(api, config.role(), config.environment(), config.name())
+ resp = api.create_job(config)
+ context.check_and_log_response(resp)
+ if context.options.open_browser:
+ context.open_job_page(api, config)
+ if context.options.wait_until == 'RUNNING':
+ monitor.wait_until(monitor.running_or_finished)
+ elif context.options.wait_until == 'FINISHED':
+ monitor.wait_until(monitor.terminal)
+
+
+class KillJobCommand(Verb):
+ @property
+ def name(self):
+ return 'kill'
+
+ def setup_options_parser(self, parser):
+ self.add_option(parser, BROWSER_OPTION)
+ parser.add_argument('--instances', type=parse_instances, dest='instances', default=None,
+ help='A list of instance ids to act on. Can either be a comma-separated list (e.g. 0,1,2) '
+ 'or a range (e.g. 0-2) or any combination of the two (e.g. 0-2,5,7-9). If not set, '
+ 'all instances will be acted on.')
+ parser.add_argument('--config', type=str, default=None, dest='config',
+ help='Config file for the job, possibly containing hooks')
+ self.add_option(parser, JOBSPEC_OPTION)
+
+ def execute(self, context):
+ api = context.get_api(context.options.jobspec.cluster)
+ resp = api.kill_job(context.options.jobspec, context.options.instances)
+ context.check_and_log_response(resp)
+ context.handle_open(api)
+
+
+class Job(Noun):
+ @property
+ def name(self):
+ return 'job'
+
+ @property
+ def help(self):
+ return "Work with an aurora job"
+
+ @classmethod
+ def create_context(cls):
+ return AuroraCommandContext()
+
+ def __init__(self):
+ super(Job, self).__init__()
+ self.register_verb(CreateJobCommand())
+ self.register_verb(KillJobCommand())
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/cli/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/options.py b/src/main/python/apache/aurora/client/cli/options.py
new file mode 100644
index 0000000..64a58f8
--- /dev/null
+++ b/src/main/python/apache/aurora/client/cli/options.py
@@ -0,0 +1,25 @@
+from twitter.aurora.client.cli import CommandOption
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+
+
+BIND_OPTION = CommandOption('--bind', type=str, default=[], dest='bindings',
+ action='append',
+ help='Bind a thermos mustache variable name to a value. '
+ 'Multiple flags may be used to specify multiple values.')
+
+
+BROWSER_OPTION = CommandOption('--open-browser', default=False, dest='open_browser',
+ action='store_true',
+ help='open browser to view job page after job is created')
+
+
+# Positional argument: argparse derives the destination ('config_file') from the name and
+# rejects an explicit dest for positionals; 'type' must be a callable, not a type name.
+CONFIG_OPTION = CommandOption('config_file', type=str,
+    help='pathname of the aurora configuration file contain the job specification')
+
+
+JOBSPEC_OPTION = CommandOption('jobspec', type=AuroraJobKey.from_path,
+ help='Fully specified job key, in CLUSTER/ROLE/ENV/NAME format')
+
+
+JSON_OPTION = CommandOption('--json', default=False, dest='json', action='store_true',
+ help='Read job configuration in json format')
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/commands/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/BUILD b/src/main/python/apache/aurora/client/commands/BUILD
new file mode 100644
index 0000000..3bcb6fd
--- /dev/null
+++ b/src/main/python/apache/aurora/client/commands/BUILD
@@ -0,0 +1,81 @@
+python_library(
+ name = 'all',
+ dependencies = [
+ pants(':core'),
+ pants(':help'),
+ pants(':run'),
+ pants(':ssh'),
+ ]
+)
+
+python_library(
+ name = 'admin',
+ sources = ['admin.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/quantity'),
+ pants('src/main/python/twitter/aurora/admin:mesos_maintenance'),
+ pants('src/main/python/twitter/aurora/client/api'),
+ pants('src/main/python/twitter/aurora/client:base'),
+ pants('src/main/python/twitter/aurora/common:clusters'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ]
+)
+
+python_library(
+ name = 'core',
+ sources = ['core.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('aurora/twitterdeps/src/python/twitter/common/python'),
+ pants('src/main/python/twitter/aurora/client/api:command_runner'),
+ pants('src/main/python/twitter/aurora/client/api:disambiguator'),
+ pants('src/main/python/twitter/aurora/client/api:job_monitor'),
+ pants('src/main/python/twitter/aurora/client/api:updater'),
+ pants('src/main/python/twitter/aurora/client/hooks'),
+ pants('src/main/python/twitter/aurora/client:base'),
+ pants('src/main/python/twitter/aurora/client:config'),
+ pants('src/main/python/twitter/aurora/client:factory'),
+ pants('src/main/python/twitter/aurora/client:options'),
+ pants('src/main/python/twitter/aurora/common'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ]
+)
+
+python_library(
+ name = 'help',
+ sources = ['help.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('src/main/python/twitter/aurora/client:base'),
+ ]
+)
+
+python_library(
+ name = 'run',
+ sources = ['run.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('src/main/python/twitter/aurora/client/api:command_runner'),
+ pants('src/main/python/twitter/aurora/client:base'),
+ pants('src/main/python/twitter/aurora/client:options'),
+ pants('src/main/python/twitter/aurora/common:aurora_job_key'),
+ pants('src/main/python/twitter/aurora/common:clusters'),
+ ]
+)
+
+python_library(
+ name = 'ssh',
+ sources = ['ssh.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/app'),
+ pants('src/main/python/twitter/aurora/client/api:command_runner'),
+ pants('src/main/python/twitter/aurora/client:base'),
+ pants('src/main/python/twitter/aurora/client:factory'),
+ pants('src/main/python/twitter/aurora/client:options'),
+ pants('src/main/python/twitter/aurora/common:aurora_job_key'),
+ pants('src/main/python/twitter/aurora/common:clusters'),
+ ]
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/commands/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/__init__.py b/src/main/python/apache/aurora/client/commands/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/commands/admin.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/admin.py b/src/main/python/apache/aurora/client/commands/admin.py
new file mode 100644
index 0000000..c1c9c42
--- /dev/null
+++ b/src/main/python/apache/aurora/client/commands/admin.py
@@ -0,0 +1,406 @@
+from __future__ import print_function
+
+"""Command-line client for managing admin-only interactions with the aurora scheduler.
+"""
+
+import os
+import optparse
+import subprocess
+
+from twitter.aurora.admin.mesos_maintenance import MesosMaintenance
+from twitter.aurora.client.api import AuroraClientAPI
+from twitter.aurora.client.base import check_and_log_response, die, requires
+from twitter.aurora.common.clusters import CLUSTERS
+from twitter.common import app, log
+from twitter.common.quantity import Amount, Data
+from twitter.common.quantity.parse_simple import parse_data
+
+from gen.twitter.aurora.constants import ACTIVE_STATES, TERMINAL_STATES
+from gen.twitter.aurora.ttypes import (
+ ResponseCode,
+ ScheduleStatus,
+ TaskQuery,
+)
+
+
# Shared optparse option that selects how hosts are grouped for maintenance.
# Both the accepted choices and the default are taken from MesosMaintenance.
GROUPING_OPTION = optparse.Option(
    '--grouping',
    type='choice',
    choices=MesosMaintenance.GROUPING_FUNCTIONS.keys(),
    metavar='GROUPING',
    default=MesosMaintenance.DEFAULT_GROUPING,
    dest='grouping',
    # '%%default' survives the %-formatting below as a literal '%default'.
    help='Grouping function to use to group hosts. Options: %s. Default: %%default' % (
        ', '.join(MesosMaintenance.GROUPING_FUNCTIONS.keys())))
+
+
def parse_hosts(options):
    """Return the hostnames selected via --filename or --hosts.

    options.filename, when set, names a file with one hostname per line and
    takes precedence; otherwise options.hosts is a comma separated list.
    Surrounding whitespace is stripped and empty entries (blank lines, stray
    commas) are dropped. Calls die() when neither option is given or when no
    hostname remains.
    """
    if not (options.filename or options.hosts):
        die('Please specify either --filename or --hosts')
    if options.filename:
        # Keep the file handle and the result under different names.
        with open(options.filename, 'r') as hosts_file:
            candidates = [line.strip() for line in hosts_file]
    else:
        candidates = [name.strip() for name in options.hosts.split(',')]
    # Without this filter a trailing newline or ",," yields an empty hostname
    # that would slip past the emptiness check below.
    hosts = [name for name in candidates if name]
    if not hosts:
        die('No valid hosts found.')
    return hosts
+
+
@app.command
@app.command_option('--force', dest='force', default=False, action='store_true',
    help='Force expensive queries to run.')
@app.command_option('--shards', dest='shards', default=None,
    help='Only match given shards of a job.')
@app.command_option('--states', dest='states', default='RUNNING',
    help='Only match tasks with given state(s).')
@app.command_option('-l', '--listformat', dest='listformat',
    default="%role%/%jobName%/%instanceId% %status%",
    help='Format string of job/task items to print out.')
# TODO(ksweeney): Allow query by environment here.
def query(args, options):
    """usage: query [--shards=N[,N,...]]
                    [--states=State[,State,...]]
                    cluster [role [job]]

    Query Mesos about jobs and tasks.
    """
    def _convert_fmt_string(fmtstr):
        # Rewrite '%name%' placeholders as '%(name)s' so the format can be
        # applied to a dict with the % operator.
        import re
        return re.sub(r'%(\w+)%', lambda match: "%%(%s)s" % match.group(1), fmtstr)

    def flatten_task(t, d=None):
        # Recursively copy the leaf attributes of t and of any nested object
        # into one flat dict. A fresh dict is created per top-level call so
        # values never leak from one task into the next.
        if d is None:
            d = {}
        for key in t.__dict__.keys():
            val = getattr(t, key)
            if hasattr(val, '__dict__'):
                flatten_task(val, d)
            else:
                d[key] = val
        return d

    def map_values(d):
        # Translate raw values into display values; only 'status' is mapped.
        default_value = lambda v: v
        mapping = {
            'status': lambda v: ScheduleStatus._VALUES_TO_NAMES[v],
        }
        return dict(
            (k, mapping.get(k, default_value)(v)) for (k, v) in d.items()
        )

    if options.states:
        state_names = options.states.split(',')
        for state in state_names:
            if state not in ScheduleStatus._NAMES_TO_VALUES:
                msg = "Unknown state '%s' specified. Valid states are:\n" % state
                msg += ','.join(ScheduleStatus._NAMES_TO_VALUES.keys())
                die(msg)
        states = set(map(ScheduleStatus._NAMES_TO_VALUES.get, state_names))
    else:
        # An empty --states value means "no state filter".
        states = ACTIVE_STATES | TERMINAL_STATES

    # Role, Job, Instances, States, and the listformat
    if len(args) == 0:
        die('Must specify at least cluster.')

    cluster = args[0]
    role = args[1] if len(args) > 1 else None
    job = args[2] if len(args) > 2 else None
    instances = set(map(int, options.shards.split(','))) if options.shards else set()
    listformat = _convert_fmt_string(options.listformat)

    # Refuse "expensive" queries unless --force was given.
    # - Does not specify role
    if role is None and not options.force:
        die('--force is required for expensive queries (no role specified)')

    # - Does not specify job
    if job is None and not options.force:
        die('--force is required for expensive queries (no job specified)')

    # - Specifies status outside of ACTIVE_STATES
    if not (states <= ACTIVE_STATES) and not options.force:
        die('--force is required for expensive queries (states outside ACTIVE states)')

    api = AuroraClientAPI(CLUSTERS[cluster], options.verbosity)
    query_info = api.query(api.build_query(role, job, instances=instances, statuses=states))
    # Check the response before touching the result payload.
    if query_info.responseCode != ResponseCode.OK:
        die('Failed to query scheduler: %s' % query_info.message)
    tasks = query_info.result.scheduleStatusResult.tasks
    if tasks is None:
        return

    for task in tasks:
        fields = map_values(flatten_task(task))
        try:
            print(listformat % fields)
        except KeyError:
            # Only a bad placeholder in --listformat is reported here.
            msg = "Unknown key in format string. Valid keys are:\n"
            msg += ','.join(fields.keys())
            die(msg)
+
+
@app.command
@requires.exactly('cluster', 'role', 'cpu', 'ramMb', 'diskMb')
def set_quota(cluster, role, cpu_str, ram_mb_str, disk_mb_str):
    """usage: set_quota cluster role cpu ramMb diskMb

    Alters the amount of production quota allocated to a user.
    """
    try:
        cpu = float(cpu_str)
        ram_mb = int(ram_mb_str)
        disk_mb = int(disk_mb_str)
    except ValueError:
        # Abort instead of merely logging: carrying on would reference
        # cpu/ram_mb/disk_mb before assignment.
        die('Invalid value: cpu must be a number, ramMb and diskMb must be integers '
            '(got cpu=%r ramMb=%r diskMb=%r)' % (cpu_str, ram_mb_str, disk_mb_str))

    options = app.get_options()
    resp = AuroraClientAPI(CLUSTERS[cluster], options.verbosity).set_quota(
        role, cpu, ram_mb, disk_mb)
    check_and_log_response(resp)
+
+
@app.command
@app.command_option('--filename', dest='filename', default=None,
    help='Name of the file with hostnames')
@app.command_option('--hosts', dest='hosts', default=None,
    help='Comma separated list of hosts')
@requires.exactly('cluster')
def start_maintenance_hosts(cluster):
    """usage: start_maintenance_hosts cluster [--filename=filename]
                                              [--hosts=hosts]
    """
    opts = app.get_options()
    # Build the maintenance client first, then resolve the host list, as before.
    maintenance = MesosMaintenance(CLUSTERS[cluster], opts.verbosity)
    maintenance.start_maintenance(parse_hosts(opts))
+
+
@app.command
@app.command_option('--filename', dest='filename', default=None,
    help='Name of the file with hostnames')
@app.command_option('--hosts', dest='hosts', default=None,
    help='Comma separated list of hosts')
@requires.exactly('cluster')
def end_maintenance_hosts(cluster):
    """usage: end_maintenance_hosts cluster [--filename=filename]
                                            [--hosts=hosts]
    """
    opts = app.get_options()
    # Build the maintenance client first, then resolve the host list, as before.
    maintenance = MesosMaintenance(CLUSTERS[cluster], opts.verbosity)
    maintenance.end_maintenance(parse_hosts(opts))
+
+
@app.command
@app.command_option('--filename', dest='filename', default=None,
    help='Name of the file with hostnames')
@app.command_option('--hosts', dest='hosts', default=None,
    help='Comma separated list of hosts')
@app.command_option('--batch_size', dest='batch_size', default=1,
    help='Number of groups to operate on at a time.')
@app.command_option('--post_drain_script', dest='post_drain_script', default=None,
    help='Path to a script to run for each host.')
@app.command_option(GROUPING_OPTION)
@requires.exactly('cluster')
def perform_maintenance_hosts(cluster):
    """usage: perform_maintenance_hosts cluster [--filename=filename]
                                                [--hosts=hosts]
                                                [--batch_size=num]
                                                [--post_drain_script=path]
                                                [--grouping=function]

    Asks the scheduler to remove any running tasks from the machine and remove it
    from service temporarily, perform some action on them, then return the machines
    to service.
    """
    options = app.get_options()
    drainable_hosts = parse_hosts(options)

    # --batch_size is declared without a type, so it arrives as a string when
    # given on the command line; reject junk before any host is touched.
    try:
        batch_size = int(options.batch_size)
    except ValueError:
        die('Invalid --batch_size: %s' % options.batch_size)

    if options.post_drain_script:
        if not os.path.exists(options.post_drain_script):
            die("No such file: %s" % options.post_drain_script)
        cmd = os.path.abspath(options.post_drain_script)
        # NOTE(review): Popen starts the script without waiting for it to
        # finish -- confirm that is what MesosMaintenance expects of the callback.
        drained_callback = lambda host: subprocess.Popen([cmd, host])
    else:
        drained_callback = None

    MesosMaintenance(CLUSTERS[cluster], options.verbosity).perform_maintenance(
        drainable_hosts,
        batch_size=batch_size,
        callback=drained_callback,
        grouping_function=options.grouping)
+
+
@app.command
@app.command_option('--filename', dest='filename', default=None,
    help='Name of the file with hostnames')
@app.command_option('--hosts', dest='hosts', default=None,
    help='Comma separated list of hosts')
@requires.exactly('cluster')
def host_maintenance_status(cluster):
    """usage: host_maintenance_status cluster [--filename=filename]
                                              [--hosts=hosts]

    Check on the schedulers maintenance status for a list of hosts in the cluster.
    """
    opts = app.get_options()
    hosts_to_check = parse_hosts(opts)
    maintenance = MesosMaintenance(CLUSTERS[cluster], opts.verbosity)
    # Each status is a pair that fills the two placeholders of the log line.
    for status_pair in maintenance.check_status(hosts_to_check):
        log.info("%s is in state: %s" % status_pair)
+
+
@app.command
@requires.exactly('cluster', 'role', 'cpu', 'ram', 'disk')
def increase_quota(cluster, role, cpu_str, ram_str, disk_str):
    """usage: increase_quota cluster role cpu ram[unit] disk[unit]

    Increases the amount of production quota allocated to a user.
    """
    cpu = float(cpu_str)
    ram = parse_data(ram_str)
    disk = parse_data(disk_str)

    options = app.get_options()
    client = AuroraClientAPI(CLUSTERS[cluster], options.verbosity == 'verbose')
    resp = client.get_quota(role)
    # Do not dereference the result of a failed call.
    if resp.responseCode != ResponseCode.OK:
        die('Failed to fetch current quota for %s: %s' % (role, resp.message))
    quota = resp.result.getQuotaResult.quota
    log.info('Current quota for %s:\n\tCPU\t%s\n\tRAM\t%s MB\n\tDisk\t%s MB' %
             (role, quota.numCpus, quota.ramMb, quota.diskMb))

    # The requested amounts are added to the current quota; RAM and disk are
    # summed as Amounts and converted to MB once.
    new_cpu = cpu + quota.numCpus
    new_ram_mb = (ram + Amount(quota.ramMb, Data.MB)).as_(Data.MB)
    new_disk_mb = (disk + Amount(quota.diskMb, Data.MB)).as_(Data.MB)

    log.info('Attempting to update quota for %s to\n\tCPU\t%s\n\tRAM\t%s MB\n\tDisk\t%s MB' %
             (role, new_cpu, new_ram_mb, new_disk_mb))

    resp = client.set_quota(role, new_cpu, new_ram_mb, new_disk_mb)
    check_and_log_response(resp)
+
+
@app.command
@requires.exactly('cluster')
def scheduler_backup_now(cluster):
    """usage: scheduler_backup_now cluster

    Immediately initiates a full storage backup.
    """
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    check_and_log_response(api.perform_backup())
+
+
@app.command
@requires.exactly('cluster')
def scheduler_list_backups(cluster):
    """usage: scheduler_list_backups cluster

    Lists backups available for recovery.
    """
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    resp = api.list_backups()
    check_and_log_response(resp)
    available = resp.result.listBackupsResult.backups
    # A count header, then one backup per line.
    print('%s available backups:' % len(available))
    for entry in available:
        print(entry)
+
+
@app.command
@requires.exactly('cluster', 'backup_id')
def scheduler_stage_recovery(cluster, backup_id):
    """usage: scheduler_stage_recovery cluster backup_id

    Stages a backup for recovery.
    """
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    check_and_log_response(api.stage_recovery(backup_id))
+
+
@app.command
@requires.exactly('cluster')
def scheduler_print_recovery_tasks(cluster):
    """usage: scheduler_print_recovery_tasks cluster

    Prints all active tasks in a staged recovery.
    """
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    resp = api.query_recovery(TaskQuery(statuses=ACTIVE_STATES))
    check_and_log_response(resp)
    log.info('Role\tJob\tShard\tStatus\tTask ID')
    # NOTE(review): other commands in this file read payloads via resp.result.*;
    # confirm that resp.tasks is the right attribute for this response.
    for task in resp.tasks:
        assigned = task.assignedTask
        conf = assigned.task
        columns = (
            conf.owner.role,
            conf.jobName,
            str(assigned.instanceId),
            ScheduleStatus._VALUES_TO_NAMES[task.status],
            assigned.taskId,
        )
        log.info('\t'.join(columns))
+
+
@app.command
@requires.exactly('cluster', 'task_ids')
def scheduler_delete_recovery_tasks(cluster, task_ids):
    """usage: scheduler_delete_recovery_tasks cluster task_ids

    Deletes a comma-separated list of task IDs from a staged recovery.
    """
    # Duplicate IDs on the command line collapse into one set entry.
    ids = set(task_ids.split(','))
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    check_and_log_response(api.delete_recovery_tasks(TaskQuery(taskIds=ids)))
+
+
@app.command
@requires.exactly('cluster')
def scheduler_commit_recovery(cluster):
    """usage: scheduler_commit_recovery cluster

    Commits a staged recovery.
    """
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    check_and_log_response(api.commit_recovery())
+
+
@app.command
@requires.exactly('cluster')
def scheduler_unload_recovery(cluster):
    """usage: scheduler_unload_recovery cluster

    Unloads a staged recovery.
    """
    opts = app.get_options()
    api = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity)
    check_and_log_response(api.unload_recovery())
+
+
@app.command
@requires.exactly('cluster')
def scheduler_list_job_updates(cluster):
    """usage: scheduler_list_job_updates cluster

    Lists in-flight job updates.
    """
    opts = app.get_options()
    resp = AuroraClientAPI(CLUSTERS[cluster], opts.verbosity).get_job_updates()
    check_and_log_response(resp)
    print('Role\tEnv\tJob')
    for update in resp.jobUpdates:
        key = update.jobKey
        if key:
            row = (key.role, key.environment, key.name)
        else:
            # Updates without a job key fall back to the deprecated role/job
            # fields and have no environment to show.
            row = (update.roleDeprecated, None, update.jobDeprecated)
        print('%s\t%s\t%s' % row)
+
+
@app.command
@requires.exactly('cluster')
def scheduler_snapshot(cluster):
    """usage: scheduler_snapshot cluster

    Request that the scheduler perform a storage snapshot and block until complete.
    """
    options = app.get_options()
    # Look up the cluster named by the argument, not the literal key 'cluster'.
    check_and_log_response(AuroraClientAPI(CLUSTERS[cluster], options.verbosity).snapshot())
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/commands/core.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/core.py b/src/main/python/apache/aurora/client/commands/core.py
new file mode 100644
index 0000000..712c7ea
--- /dev/null
+++ b/src/main/python/apache/aurora/client/commands/core.py
@@ -0,0 +1,600 @@
+"""Command-line client for managing jobs with the Aurora scheduler.
+"""
+
+from __future__ import print_function
+
+import collections
+from datetime import datetime
+import json
+import os
+import pprint
+import subprocess
+import sys
+import time
+from tempfile import NamedTemporaryFile
+
+from twitter.common import app, log
+from twitter.common.python.pex import PexInfo
+from twitter.common.python.dirwrapper import PythonDirectoryWrapper
+
+from twitter.aurora.client.base import (
+ check_and_log_response,
+ deprecation_warning,
+ die,
+ handle_open,
+ requires,
+ synthesize_url)
+from twitter.aurora.client.api.disambiguator import LiveJobDisambiguator
+from twitter.aurora.client.api.job_monitor import JobMonitor
+from twitter.aurora.client.api.updater_util import UpdaterConfig
+from twitter.aurora.client.config import get_config
+from twitter.aurora.client.factory import make_client, make_client_factory
+from twitter.aurora.client.options import (
+ CLUSTER_CONFIG_OPTION,
+ CLUSTER_INVOKE_OPTION,
+ ENV_CONFIG_OPTION,
+ ENVIRONMENT_BIND_OPTION,
+ FROM_JOBKEY_OPTION,
+ HEALTH_CHECK_INTERVAL_SECONDS_OPTION,
+ JSON_OPTION,
+ OPEN_BROWSER_OPTION,
+ SHARDS_OPTION,
+ WAIT_UNTIL_OPTION)
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+
+from gen.twitter.aurora.constants import ACTIVE_STATES, CURRENT_API_VERSION, AURORA_EXECUTOR_NAME
+from gen.twitter.aurora.ttypes import ExecutorConfig, ResponseCode, ScheduleStatus
+
+
def get_job_config(job_spec, config_file, options):
    """Load the job configuration for job_spec from config_file.

    job_spec is parsed as a job key path first. When that fails, a deprecation
    warning is emitted and job_spec is used as a bare job name, with cluster
    and environment taken from options. options.json and options.bindings are
    optional and default to False and () when absent.
    """
    try:
        key = AuroraJobKey.from_path(job_spec)
    except AuroraJobKey.Error:
        deprecation_warning('Please refer to your job in CLUSTER/ROLE/ENV/NAME format.')
        select_cluster = options.cluster or None
        select_env = options.env
        select_role = None
        jobname = job_spec
    else:
        select_cluster, select_env = key.cluster, key.env
        select_role, jobname = key.role, key.name

    # Not every command registers these two options.
    json_option = getattr(options, 'json', False)
    bindings = getattr(options, 'bindings', ())

    return get_config(
        jobname,
        config_file,
        json_option,
        bindings,
        select_cluster=select_cluster,
        select_role=select_role,
        select_env=select_env)
+
@app.command
def version(args):
    """usage: version

    Prints information about the version of the aurora client being run.
    """
    try:
        # Build properties are read from the pex this process was started from.
        pex_info = PexInfo.from_pex(PythonDirectoryWrapper.get(sys.argv[0]))
        print("Aurora client build info:")
        for prop in ('sha', 'date'):
            print("\t%s: %s" % (prop, pex_info.build_properties[prop]))
    except (IOError, PythonDirectoryWrapper.Error):
        print("Aurora client build info not available")
    print("Aurora API version: %s" % CURRENT_API_VERSION)
+
+
@app.command
@app.command_option(ENVIRONMENT_BIND_OPTION)
@app.command_option(OPEN_BROWSER_OPTION)
@app.command_option(CLUSTER_CONFIG_OPTION)
@app.command_option(ENV_CONFIG_OPTION)
@app.command_option(JSON_OPTION)
@app.command_option(WAIT_UNTIL_OPTION)
@requires.exactly('cluster/role/env/job', 'config')
def create(job_spec, config_file):
    """usage: create cluster/role/env/job config

    Creates a job based on a configuration file.
    """
    opts = app.get_options()
    try:
        config = get_job_config(job_spec, config_file, opts)
    except ValueError as err:
        print("Error: %s" % err)
        sys.exit(1)

    api = make_client(config.cluster())
    # The monitor is created before the job, as before.
    monitor = JobMonitor(api, config.role(), config.environment(), config.name())
    check_and_log_response(api.create_job(config))
    handle_open(api.scheduler.scheduler().url, config.role(), config.environment(), config.name())

    wait_until = opts.wait_until
    if wait_until == 'RUNNING':
        monitor.wait_until(monitor.running_or_finished)
    elif wait_until == 'FINISHED':
        monitor.wait_until(monitor.terminal)
+
+
@app.command
@app.command_option(ENVIRONMENT_BIND_OPTION)
@app.command_option(CLUSTER_CONFIG_OPTION)
@app.command_option(ENV_CONFIG_OPTION)
@app.command_option(JSON_OPTION)
@app.command_option(FROM_JOBKEY_OPTION)
@requires.exactly('cluster/role/env/job', 'config')
def diff(job_spec, config_file):
    """usage: diff cluster/role/env/job config

    Compares a job configuration against a running job.
    By default the diff will be displayed using 'diff', though you may choose an alternate
    diff program by specifying the DIFF_VIEWER environment variable."""
    options = app.get_options()
    config = get_job_config(job_spec, config_file, options)
    if options.rename_from:
        cluster, role, env, name = options.rename_from
    else:
        cluster = config.cluster()
        role = config.role()
        env = config.environment()
        name = config.name()
    api = make_client(cluster)
    resp = api.query(api.build_query(role, name, statuses=ACTIVE_STATES, env=env))
    if resp.responseCode != ResponseCode.OK:
        die('Request failed, server responded with "%s"' % resp.message)
    # A job without active tasks can come back with tasks=None; diff against
    # an empty remote side instead of failing while iterating None.
    remote_tasks = [t.assignedTask.task for t in resp.result.scheduleStatusResult.tasks or []]
    resp = api.populate_job_config(config)
    if resp.responseCode != ResponseCode.OK:
        die('Request failed, server responded with "%s"' % resp.message)
    local_tasks = resp.result.populateJobResult.populated

    pp = pprint.PrettyPrinter(indent=2)
    def pretty_print_task(task):
        # The raw configuration is not interesting - we only care about what gets parsed.
        task.configuration = None
        task.executorConfig = ExecutorConfig(
            name=AURORA_EXECUTOR_NAME,
            data=json.loads(task.executorConfig.data))
        return pp.pformat(vars(task))

    def pretty_print_tasks(tasks):
        return ',\n'.join([pretty_print_task(t) for t in tasks])

    def dump_tasks(tasks, out_file):
        # Flush so the external diff program sees the full content on disk.
        out_file.write(pretty_print_tasks(tasks))
        out_file.write('\n')
        out_file.flush()

    diff_program = os.environ.get('DIFF_VIEWER', 'diff')
    with NamedTemporaryFile() as local:
        dump_tasks(local_tasks, local)
        with NamedTemporaryFile() as remote:
            dump_tasks(remote_tasks, remote)
            result = subprocess.call([diff_program, remote.name, local.name])

    # Unlike most commands, diff doesn't return zero on success; it returns
    # 1 when a successful diff is non-empty.
    return 0 if result in (0, 1) else result
+
+
@app.command(name='open')
def do_open(args, _):
    """usage: open cluster[/role[/env/job]]

    Opens the scheduler page for a cluster, role or job in the default web browser.
    """
    # Without this guard a bare `open` fails with an IndexError on args[0].
    if not args:
        die('cluster is required')

    parts = args[0].split("/")
    cluster_name = parts[0]
    role = parts[1] if len(parts) > 1 else None
    env = parts[2] if len(parts) > 2 else None
    job = parts[3] if len(parts) > 3 else None

    if len(parts) == 3:
        # TODO(ksweeney): Remove this after MESOS-2945 is completed.
        die('env scheduler pages are not yet implemented, please specify job')

    if not cluster_name:
        die('cluster is required')

    api = make_client(cluster_name)

    import webbrowser
    webbrowser.open_new_tab(synthesize_url(api.scheduler.scheduler().url, role, env, job))
+
+
@app.command
@app.command_option('--local', dest='local', default=False, action='store_true',
    help='Inspect the configuration as would be created by the "spawn" command.')
@app.command_option('--raw', dest='raw', default=False, action='store_true',
    help='Show the raw configuration.')
@app.command_option(ENVIRONMENT_BIND_OPTION)
@app.command_option(CLUSTER_CONFIG_OPTION)
@app.command_option(ENV_CONFIG_OPTION)
@app.command_option(JSON_OPTION)
@requires.exactly('cluster/role/env/job', 'config')
def inspect(job_spec, config_file):
    """usage: inspect cluster/role/env/job config

    Verifies that a job can be parsed from a configuration file, and displays
    the parsed configuration.
    """
    # NOTE(review): --local is registered above but never read in this body.
    options = app.get_options()
    config = get_job_config(job_spec, config_file, options)
    if options.raw:
        print('Parsed job config: %s' % config.job())
        return

    # job is the raw config object; job_thrift is only used for isService.
    job = config.raw()
    job_thrift = config.job()
    print('Job level information')
    print(' name: %s' % job.name())
    print(' role: %s' % job.role())
    print(' contact: %s' % job.contact())
    print(' cluster: %s' % job.cluster())
    print(' instances: %s' % job.instances())
    if job.has_cron_schedule():
        print(' cron:')
        print(' schedule: %s' % job.cron_schedule())
        print(' policy: %s' % job.cron_collision_policy())
    if job.has_constraints():
        print(' constraints:')
        for constraint, value in job.constraints().get().items():
            print(' %s: %s' % (constraint, value))
    print(' service: %s' % job_thrift.taskConfig.isService)
    print(' production: %s' % bool(job.production().get()))
    print()

    task = job.task()
    print('Task level information')
    print(' name: %s' % task.name())
    if len(task.constraints().get()) > 0:
        print(' constraints:')
        for constraint in task.constraints():
            print(' %s' % (' < '.join(st.get() for st in constraint.order())))
    print()

    processes = task.processes()
    for process in processes:
        print('Process %s:' % process.name())
        if process.daemon().get():
            print(' daemon')
        if process.ephemeral().get():
            print(' ephemeral')
        if process.final().get():
            print(' final')
        print(' cmdline:')
        for line in process.cmdline().get().splitlines():
            print(' ' + line)
        print()
+
+
@app.command
@app.command_option(CLUSTER_INVOKE_OPTION)
@app.command_option(OPEN_BROWSER_OPTION)
def start_cron(args, options):
    """usage: start_cron cluster/role/env/job

    Invokes a cron job immediately, out of its normal cron cycle.
    This does not affect the cron cycle in any way.
    """
    client_factory = make_client_factory()
    api, job_key, config_file = LiveJobDisambiguator.disambiguate_args_or_die(
        args, options, client_factory)
    # A config is only loaded when the disambiguator returned a config file.
    if config_file:
        config = get_job_config(job_key.to_path(), config_file, options)
    else:
        config = None
    check_and_log_response(api.start_cronjob(job_key, config=config))
    handle_open(api.scheduler.scheduler().url, job_key.role, job_key.env, job_key.name)
+
+
@app.command
@app.command_option(
    '--pretty',
    dest='pretty',
    default=False,
    action='store_true',
    help='Show job information in prettyprinted format')
@app.command_option(
    '--show-cron',
    '-c',
    dest='show_cron_schedule',
    default=False,
    action='store_true',
    help='Also show the registered cron schedule and collision policy of each job')
@requires.exactly('cluster/role')
def list_jobs(cluster_and_role):
    """usage: list_jobs [--show-cron] [--pretty] cluster/role

    Shows all jobs of the given role known by the scheduler.
    If --show-cron is specified, then also shows the registered cron schedule.
    --pretty only takes effect together with --show-cron.
    """
    options = app.get_options()

    # Take the cluster_and_role parameter, and split it into its two components.
    if cluster_and_role.count('/') != 1:
        die('list_jobs parameter must be in cluster/role format')
    cluster, role = cluster_and_role.split('/')

    def show_job_simple(job):
        # One line per job: the job path, plus cron details when requested.
        if options.show_cron_schedule:
            print(('{0}/{1.key.role}/{1.key.environment}/{1.key.name}' +
                   '\t\'{1.cronSchedule}\'\t{1.cronCollisionPolicy}').format(cluster, job))
        else:
            print('{0}/{1.key.role}/{1.key.environment}/{1.key.name}'.format(cluster, job))

    def show_job_pretty(job):
        # Multi-line form: job path header followed by the cron fields.
        print("Job %s/%s/%s/%s:" %
              (cluster, job.key.role, job.key.environment, job.key.name))
        print('\tcron schedule: %s' % job.cronSchedule)
        print('\tcron policy: %s' % job.cronCollisionPolicy)

    if options.show_cron_schedule and options.pretty:
        print_fn = show_job_pretty
    else:
        print_fn = show_job_simple

    api = make_client(cluster)
    resp = api.get_jobs(role)
    check_and_log_response(resp)
    for job in resp.result.getJobsResult.configs:
        print_fn(job)
+
+
@app.command
@app.command_option(CLUSTER_INVOKE_OPTION)
@app.command_option(OPEN_BROWSER_OPTION)
@app.command_option(SHARDS_OPTION)
def kill(args, options):
    """usage: kill cluster/role/env/job

    Kills a running job, blocking until all tasks have terminated.

    Default behaviour is to kill all shards in the job, but the kill
    can be limited to specific shards with the --shards option
    """
    client_factory = make_client_factory()
    api, job_key, config_file = LiveJobDisambiguator.disambiguate_args_or_die(
        args, options, client_factory)
    # From here on the application-level options replace the ones passed in.
    options = app.get_options()
    if config_file:
        config = get_job_config(job_key.to_path(), config_file, options)
    else:
        config = None
    check_and_log_response(api.kill_job(job_key, options.shards, config=config))
    handle_open(api.scheduler.scheduler().url, job_key.role, job_key.env, job_key.name)
+
+
@app.command
@app.command_option(CLUSTER_INVOKE_OPTION)
def status(args, options):
    """usage: status cluster/role/env/job

    Fetches and prints information about the active tasks in a job.
    """
    def is_active(task):
        return task.status in ACTIVE_STATES

    def print_task(scheduled_task):
        # Build (not print) the multi-line detail text for one task.
        assigned_task = scheduled_task.assignedTask
        taskInfo = assigned_task.task
        parts = []
        if taskInfo:
            parts.append('''cpus: %s, ram: %s MB, disk: %s MB''' % (taskInfo.numCpus,
                                                                    taskInfo.ramMb,
                                                                    taskInfo.diskMb))
        if assigned_task.assignedPorts:
            parts.append('\n\tports: %s' % assigned_task.assignedPorts)
        parts.append('\n\tfailure count: %s (max %s)' % (scheduled_task.failureCount,
                                                         taskInfo.maxTaskFailures))
        parts.append('\n\tevents:')
        for event in scheduled_task.taskEvents:
            # event.timestamp is divided by 1000 before conversion to a datetime.
            parts.append('\n\t\t %s %s: %s' % (datetime.fromtimestamp(event.timestamp / 1000),
                                               ScheduleStatus._VALUES_TO_NAMES[event.status],
                                               event.message))
        parts.append('\n\tpackages:')
        for pkg in assigned_task.task.packages:
            parts.append('\n\t\trole: %s, package: %s, version: %s' % (
                pkg.role, pkg.name, pkg.version))
        return ''.join(parts)

    def print_tasks(tasks):
        for task in tasks:
            taskString = print_task(task)

            log.info('role: %s, env: %s, name: %s, shard: %s, status: %s on %s\n%s' %
                     (task.assignedTask.task.owner.role,
                      task.assignedTask.task.environment,
                      task.assignedTask.task.jobName,
                      task.assignedTask.instanceId,
                      ScheduleStatus._VALUES_TO_NAMES[task.status],
                      task.assignedTask.slaveHost,
                      taskString))
            for pkg in task.assignedTask.task.packages:
                log.info('\tpackage %s/%s/%s' % (pkg.role, pkg.name, pkg.version))

    api, job_key, _ = LiveJobDisambiguator.disambiguate_args_or_die(
        args, options, make_client_factory())
    resp = api.check_status(job_key)
    check_and_log_response(resp)

    tasks = resp.result.scheduleStatusResult.tasks
    if tasks:
        # Real lists are required: len() is applied below, which fails on the
        # lazy iterator that filter() returns under Python 3.
        active_tasks = [task for task in tasks if is_active(task)]
        log.info('Active Tasks (%s)' % len(active_tasks))
        print_tasks(active_tasks)
        inactive_tasks = [task for task in tasks if not is_active(task)]
        log.info('Inactive Tasks (%s)' % len(inactive_tasks))
        print_tasks(inactive_tasks)
    else:
        log.info('No tasks found.')
+
+
@app.command
@app.command_option(SHARDS_OPTION)
@app.command_option(ENVIRONMENT_BIND_OPTION)
@app.command_option(CLUSTER_CONFIG_OPTION)
@app.command_option(ENV_CONFIG_OPTION)
@app.command_option(JSON_OPTION)
@app.command_option(HEALTH_CHECK_INTERVAL_SECONDS_OPTION)
@app.command_option(
    '--force',
    dest='force',
    default=True,  # TODO(maximk): Temporary bandaid for MESOS-4310 until a better fix is available.
    action='store_true',
    help='Turn off warning message that the update looks large enough to be disruptive.')
@requires.exactly('cluster/role/env/job', 'config')
def update(job_spec, config_file):
    """usage: update cluster/role/env/job config

    Performs a rolling upgrade on a running job, using the update configuration
    within the config file as a control for update velocity and failure tolerance.

    Updates are fully controlled client-side, so aborting an update halts the
    update and leaves the job in a 'locked' state on the scheduler.
    Subsequent update attempts will fail until the update is 'unlocked' using the
    'cancel_update' command.

    The updater only takes action on shards in a job that have changed, meaning
    that changing a single shard will only induce a restart on the changed shard.

    You may want to consider using the 'diff' subcommand before updating,
    to preview what changes will take effect.
    """
    def warn_if_dangerous_change(api, job_spec, config):
        # Get the current job status, so that we can check if there's anything
        # dangerous about this update.
        resp = api.query(api.build_query(config.role(), config.name(),
                                         statuses=ACTIVE_STATES, env=config.environment()))
        if resp.responseCode != ResponseCode.OK:
            die('Could not get job status from server for comparison: %s' % resp.message)
        # tasks can be None when nothing is running; count that as zero.
        remote_tasks = [t.assignedTask.task
                        for t in resp.result.scheduleStatusResult.tasks or []]
        resp = api.populate_job_config(config)
        if resp.responseCode != ResponseCode.OK:
            die('Server could not populate job config for comparison: %s' % resp.message)
        local_task_count = len(resp.result.populateJobResult.populated)
        remote_task_count = len(remote_tasks)
        # "Large" means the task count grows or shrinks by a factor of four or
        # more, or drops to zero. The shrink test multiplies instead of
        # dividing so integer division cannot hide a change.
        grows_4x = local_task_count >= 4 * remote_task_count
        shrinks_4x = 4 * local_task_count <= remote_task_count
        if grows_4x or shrinks_4x or local_task_count == 0:
            print('Warning: this update is a large change. Press ^c within 5 seconds to abort')
            time.sleep(5)

    options = app.get_options()
    config = get_job_config(job_spec, config_file, options)
    api = make_client(config.cluster())
    # With the current default of --force=True this check never runs.
    if not options.force:
        warn_if_dangerous_change(api, job_spec, config)
    resp = api.update_job(config, options.health_check_interval_seconds, options.shards)
    check_and_log_response(resp)
+
@app.command
@app.command_option(CLUSTER_INVOKE_OPTION)
@app.command_option(HEALTH_CHECK_INTERVAL_SECONDS_OPTION)
@app.command_option(OPEN_BROWSER_OPTION)
@app.command_option(SHARDS_OPTION)
@app.command_option(
    '--batch_size',
    dest='batch_size',
    type=int,
    default=1,
    help='Number of shards to be restarted in one iteration.')
@app.command_option(
    '--max_per_shard_failures',
    dest='max_per_shard_failures',
    type=int,
    default=0,
    help='Maximum number of restarts per shard during restart. Increments total failure count when '
         'this limit is exceeded.')
@app.command_option(
    '--max_total_failures',
    dest='max_total_failures',
    type=int,
    default=0,
    help='Maximum number of shard failures to be tolerated in total during restart.')
@app.command_option(
    '--restart_threshold',
    dest='restart_threshold',
    type=int,
    default=60,
    help='Maximum number of seconds before a shard must move into the RUNNING state before '
         'considered a failure.')
@app.command_option(
    '--watch_secs',
    dest='watch_secs',
    type=int,
    default=30,
    help='Minimum number of seconds a shard must remain in RUNNING state before considered a '
         'success.')
def restart(args, options):
    """usage: restart cluster/role/env/job
                      [--shards=SHARDS]
                      [--batch_size=INT]
                      [--updater_health_check_interval_seconds=SECONDS]
                      [--max_per_shard_failures=INT]
                      [--max_total_failures=INT]
                      [--restart_threshold=INT]
                      [--watch_secs=SECONDS]

    Performs a rolling restart of shards within a job.

    Restarts are fully controlled client-side, so aborting halts the restart.
    """
    client_factory = make_client_factory()
    api, job_key, config_file = LiveJobDisambiguator.disambiguate_args_or_die(
        args, options, client_factory)
    if config_file:
        config = get_job_config(job_key.to_path(), config_file, options)
    else:
        config = None

    # Positional order matters: batch size, restart threshold, watch seconds,
    # per-shard failure limit, total failure limit.
    updater_config = UpdaterConfig(
        options.batch_size,
        options.restart_threshold,
        options.watch_secs,
        options.max_per_shard_failures,
        options.max_total_failures)
    restart_resp = api.restart(
        job_key,
        options.shards,
        updater_config,
        options.health_check_interval_seconds,
        config=config)
    check_and_log_response(restart_resp)
    handle_open(api.scheduler.scheduler().url, job_key.role, job_key.env, job_key.name)
+
+
@app.command
@app.command_option(CLUSTER_INVOKE_OPTION)
def cancel_update(args, options):
    """usage: cancel_update cluster/role/env/job

    Unlocks a job for updates.
    A job may be locked if a client's update session terminated abnormally,
    or if another user is actively updating the job. This command should only
    be used when the user is confident that they are not conflicting with another user.
    """
    client_factory = make_client_factory()
    api, job_key, config_file = LiveJobDisambiguator.disambiguate_args_or_die(
        args, options, client_factory)
    # A config is only loaded when the disambiguator returned a config file.
    if config_file:
        config = get_job_config(job_key.to_path(), config_file, options)
    else:
        config = None
    check_and_log_response(api.cancel_update(job_key, config=config))
+
+
@app.command
@app.command_option(CLUSTER_INVOKE_OPTION)
@requires.exactly('role')
def get_quota(role):
    """usage: get_quota --cluster=CLUSTER role

    Prints the production quota that has been allocated to a user.
    """
    options = app.get_options()
    resp = make_client(options.cluster).get_quota(role)
    # Do not dereference the result of a failed call.
    if resp.responseCode != ResponseCode.OK:
        die('Failed to fetch quota for %s: %s' % (role, resp.message))
    quota = resp.result.getQuotaResult.quota

    # RAM and disk are stored in MB and reported in GB.
    quota_fields = [
        ('CPU', quota.numCpus),
        ('RAM', '%f GB' % (float(quota.ramMb) / 1024)),
        ('Disk', '%f GB' % (float(quota.diskMb) / 1024))
    ]
    log.info('Quota for %s:\n\t%s' %
             (role, '\n\t'.join(['%s\t%s' % (k, v) for (k, v) in quota_fields])))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/commands/help.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/help.py b/src/main/python/apache/aurora/client/commands/help.py
new file mode 100644
index 0000000..a74c607
--- /dev/null
+++ b/src/main/python/apache/aurora/client/commands/help.py
@@ -0,0 +1,53 @@
+from __future__ import print_function
+
+import collections
+import sys
+
+from twitter.aurora.client.base import die
+from twitter.common import app
+
+
+def make_commands_str(commands):
+ commands.sort()
+ if len(commands) == 1:
+ return str(commands[0])
+ elif len(commands) == 2:
+ return '%s (or %s)' % (str(commands[0]), str(commands[1]))
+ else:
+ return '%s (or any of: %s)' % (str(commands[0]), ' '.join(map(str, commands[1:])))
+
+
+def generate_full_usage():
+ """Build the 'Available commands' help text for every registered command.
+
+ Commands are grouped by docstring, so commands that share one docstring are
+ listed together under a single entry; commands whose docstring is None are
+ omitted. Returns the complete usage text as one string.
+ """
+ docs_to_commands = collections.defaultdict(list)
+ for (command, doc) in app.get_commands_and_docstrings():
+ if doc is not None:
+ docs_to_commands[doc].append(command)
+ # Formats one (docstring, [commands]) pair: the command label line followed by
+ # the docstring with each line left-stripped and re-prefixed by format_line.
+ def make_docstring(item):
+ (doc_text, commands) = item
+ def format_line(line):
+ return ' %s\n' % line.lstrip()
+ stripped = ''.join(map(format_line, doc_text.splitlines()))
+ return '%s\n%s' % (make_commands_str(commands), stripped)
+ # Entries are sorted as strings, i.e. by their command label line first.
+ usage = sorted(map(make_docstring, docs_to_commands.items()))
+ return 'Available commands:\n\n' + '\n'.join(usage)
+
+
+# Command: print the full usage listing, or the option help of one subcommand.
+# The docstring below is left untouched because command docstrings are collected
+# as help text by generate_full_usage().
+@app.command
+def help(args):
+ """usage: help [subcommand]
+
+ Prints help for using the aurora client, or one of its specific subcommands.
+ """
+ # No subcommand: print the listing of every command and exit with status 0.
+ if not args:
+ print(generate_full_usage())
+ sys.exit(0)
+
+ if len(args) > 1:
+ die('Please specify at most one subcommand.')
+
+ subcmd = args[0]
+ if subcmd in app.get_commands():
+ app.command_parser(subcmd).print_help()
+ else:
+ # Unknown subcommand: report it and exit with status 1.
+ print('Subcommand %s not found.' % subcmd)
+ sys.exit(1)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/commands/run.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/run.py b/src/main/python/apache/aurora/client/commands/run.py
new file mode 100644
index 0000000..494ce47
--- /dev/null
+++ b/src/main/python/apache/aurora/client/commands/run.py
@@ -0,0 +1,40 @@
+from twitter.common import app
+from twitter.aurora.client.base import die
+from twitter.aurora.client.options import (
+ EXECUTOR_SANDBOX_OPTION,
+ SSH_USER_OPTION,
+)
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+from twitter.aurora.common.clusters import CLUSTERS
+from twitter.aurora.client.api.command_runner import DistributedCommandRunner
+
+
+# Command: run a shell command through DistributedCommandRunner for one job.
+# The first positional argument is the job path; every remaining argument is
+# joined with spaces to form the command string.
+@app.command
+@app.command_option('-t', '--threads', type=int, default=1, dest='num_threads',
+ help='The number of threads to use.')
+@app.command_option(SSH_USER_OPTION)
+@app.command_option(EXECUTOR_SANDBOX_OPTION)
+def run(args, options):
+ """usage: run cluster/role/env/job cmd
+
+ Runs a shell command on all machines currently hosting shards of a single job.
+
+ This feature supports the same command line wildcards that are used to
+ populate a job's commands.
+
+ This means anything in the {{mesos.*}} and {{thermos.*}} namespaces.
+ """
+ # TODO(William Farner): Add support for invoking on individual shards.
+ # TODO(Kevin Sweeney): Restore the ability to run across jobs with globs (See MESOS-3010).
+ if not args:
+ die('job path is required')
+ job_path = args.pop(0)
+ try:
+ cluster_name, role, env, name = AuroraJobKey.from_path(job_path)
+ except AuroraJobKey.Error as e:
+ die('Invalid job path "%s": %s' % (job_path, e))
+
+ # NOTE(review): nothing checks that any arguments remain after the job path, so
+ # an empty command string can reach dcr.run -- confirm whether that is intended.
+ command = ' '.join(args)
+ cluster = CLUSTERS[cluster_name]
+ # The job name is passed as a single-element list.
+ dcr = DistributedCommandRunner(cluster, role, env, [name], options.ssh_user)
+ dcr.run(command, parallelism=options.num_threads, executor_sandbox=options.executor_sandbox)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/commands/ssh.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/ssh.py b/src/main/python/apache/aurora/client/commands/ssh.py
new file mode 100644
index 0000000..109229f
--- /dev/null
+++ b/src/main/python/apache/aurora/client/commands/ssh.py
@@ -0,0 +1,64 @@
+import subprocess
+
+from twitter.common import app
+from twitter.aurora.client.base import check_and_log_response, die
+from twitter.aurora.client.factory import make_client
+from twitter.aurora.client.options import (
+ EXECUTOR_SANDBOX_OPTION,
+ SSH_USER_OPTION,
+)
+from twitter.aurora.common.aurora_job_key import AuroraJobKey
+from twitter.aurora.client.api.command_runner import DistributedCommandRunner
+
+
+@app.command
+@app.command_option(EXECUTOR_SANDBOX_OPTION)
+@app.command_option(SSH_USER_OPTION)
+@app.command_option('-L', dest='tunnels', action='append', metavar='PORT:NAME',
+ default=[],
+ help="Add tunnel from local port PORT to remote named port NAME.")
+def ssh(args, options):
+ """usage: ssh cluster/role/env/job shard [args...]
+
+ Initiate an SSH session on the machine that a shard is running on.
+ """
+ if not args:
+ die('Job path is required')
+ job_path = args.pop(0)
+ try:
+ cluster_name, role, env, name = AuroraJobKey.from_path(job_path)
+ except AuroraJobKey.Error as e:
+ die('Invalid job path "%s": %s' % (job_path, e))
+ if not args:
+ die('Shard is required')
+ try:
+ shard = int(args.pop(0))
+ except ValueError:
+ die('Shard must be an integer')
+ api = make_client(cluster_name)
+ resp = api.query(api.build_query(role, name, set([int(shard)]), env=env))
+ check_and_log_response(resp)
+
+ first_task = resp.result.scheduleStatusResult.tasks[0]
+ remote_cmd = 'bash' if not args else ' '.join(args)
+ command = DistributedCommandRunner.substitute(remote_cmd, first_task,
+ api.cluster, executor_sandbox=options.executor_sandbox)
+
+ ssh_command = ['ssh', '-t']
+
+ role = first_task.assignedTask.task.owner.role
+ slave_host = first_task.assignedTask.slaveHost
+
+ for tunnel in options.tunnels:
+ try:
+ port, name = tunnel.split(':')
+ port = int(port)
+ except ValueError:
+ die('Could not parse tunnel: %s. Must be of form PORT:NAME' % tunnel)
+ if name not in first_task.assignedTask.assignedPorts:
+ die('Task %s has no port named %s' % (first_task.assignedTask.taskId, name))
+ ssh_command += [
+ '-L', '%d:%s:%d' % (port, slave_host, first_task.assignedTask.assignedPorts[name])]
+
+ ssh_command += ['%s@%s' % (options.ssh_user or role, slave_host), command]
+ return subprocess.call(ssh_command)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/config.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/config.py b/src/main/python/apache/aurora/client/config.py
new file mode 100644
index 0000000..32df4eb
--- /dev/null
+++ b/src/main/python/apache/aurora/client/config.py
@@ -0,0 +1,208 @@
+'''Library of utilities called by the mesos client binary
+'''
+
+from __future__ import print_function
+
+import functools
+import math
+import posixpath
+import re
+import sys
+
+from twitter.common import app, log
+
+from twitter.aurora.client import binding_helper
+from twitter.aurora.client.base import deprecation_warning, die
+from twitter.aurora.config import AuroraConfig
+from twitter.thermos.config.schema_helpers import Tasks
+
+from gen.twitter.aurora.constants import DEFAULT_ENVIRONMENT
+
+from pystachio import Empty, Ref
+
+
+APPAPP_DEPRECATION_WARNING = """
+The use of app-app is deprecated. Please reach out to mesos-team@twitter.com for advice on
+migrating your application away from app-app layouts to an alternative packaging solution.
+"""
+
+
+def _warn_on_appapp_layouts(config):
+ """Emit the app-app deprecation warning when the raw job config has a layout."""
+ if config.raw().has_layout():
+ deprecation_warning(APPAPP_DEPRECATION_WARNING)
+
+
+CRON_DEPRECATION_WARNING = """
+The "cron_policy" parameter to Jobs has been renamed to "cron_collision_policy".
+Please update your Jobs accordingly.
+"""
+
+
+def _warn_on_deprecated_cron_policy(config):
+ """Emit a deprecation warning when the raw job config sets "cron_policy"."""
+ if config.raw().cron_policy() is not Empty:
+ deprecation_warning(CRON_DEPRECATION_WARNING)
+
+
+DAEMON_DEPRECATION_WARNING = """
+The "daemon" parameter to Jobs is deprecated in favor of the "service" parameter.
+Please update your Job to set "service = True" instead of "daemon = True", or use
+the top-level Service() instead of Job().
+"""
+
+
+def _warn_on_deprecated_daemon_job(config):
+ """Emit a deprecation warning when the raw job config sets "daemon"."""
+ if config.raw().daemon() is not Empty:
+ deprecation_warning(DAEMON_DEPRECATION_WARNING)
+
+
+HEALTH_CHECK_INTERVAL_SECS_DEPRECATION_WARNING = """
+The "health_check_interval_secs" parameter to Jobs is deprecated in favor of the
+"health_check_config" parameter. Please update your Job to set the parameter by creating a new
+HealthCheckConfig.
+
+See the HealthCheckConfig section of the Configuration Reference page for more information:
+http://go/auroraconfig/#Aurora%2BThermosConfigurationReference-HealthCheckConfig
+"""
+
+
+def _warn_on_deprecated_health_check_interval_secs(config):
+ """Emit a deprecation warning when the raw job config sets "health_check_interval_secs"."""
+ if config.raw().health_check_interval_secs() is not Empty:
+ deprecation_warning(HEALTH_CHECK_INTERVAL_SECS_DEPRECATION_WARNING)
+
+
+ANNOUNCE_WARNING = """
+Announcer specified primary port as '%(primary_port)s' but no processes have bound that port.
+If you would like to utilize this port, you should listen on {{thermos.ports[%(primary_port)s]}}
+from some Process bound to your task.
+"""
+
+
+def _validate_announce_configuration(config):
+ if not config.raw().has_announce():
+ return
+
+ primary_port = config.raw().announce().primary_port().get()
+ if primary_port not in config.ports():
+ print(ANNOUNCE_WARNING % {'primary_port': primary_port}, file=sys.stderr)
+
+ if config.raw().has_announce() and not config.raw().has_constraints() or (
+ 'dedicated' not in config.raw().constraints()):
+ for port in config.raw().announce().portmap().get().values():
+ try:
+ port = int(port)
+ except ValueError:
+ continue
+ raise ValueError('Job must be dedicated in order to specify static ports!')
+
+
+STAGING_RE = re.compile(r'^staging\d*$')
+
+
+def _validate_environment_name(config):
+ """Raise ValueError unless the job environment is an accepted name.
+
+ Accepted names are anything matching STAGING_RE, or one of 'prod', 'devel'
+ and 'test'.
+ """
+ env_name = str(config.raw().environment())
+ if STAGING_RE.match(env_name):
+ return
+ if env_name not in ('prod', 'devel', 'test'):
+ raise ValueError('Environment name should be one of "prod", "devel", "test" or '
+ 'staging<number>! Got %s' % env_name)
+
+
+UPDATE_CONFIG_MAX_FAILURES_ERROR = '''
+max_total_failures in update_config must be lesser than the job size.
+Based on your job size (%s) you should use max_total_failures <= %s.
+
+See http://go/auroraconfig for details.
+'''
+
+
+UPDATE_CONFIG_DEDICATED_THRESHOLD_ERROR = '''
+Since this is a dedicated job, you must set your max_total_failures in
+your update configuration to no less than 2%% of your job size.
+Based on your job size (%s) you should use max_total_failures >= %s.
+
+See http://go/auroraconfig for details.
+'''
+
+
+def _validate_update_config(config):
+ """Check max_total_failures in the update config against the job size.
+
+ Calls die() when max_total_failures is greater than or equal to the number
+ of instances. For dedicated jobs it also calls die() when max_total_failures
+ is below floor(2% of the job size).
+ """
+ job_size = config.instances()
+ max_failures = config.update_config().max_total_failures().get()
+
+ if max_failures >= job_size:
+ die(UPDATE_CONFIG_MAX_FAILURES_ERROR % (job_size, job_size - 1))
+
+ if config.is_dedicated():
+ # The threshold is rounded down, so it is 0 for jobs with fewer than 50 instances.
+ min_failure_threshold = int(math.floor(job_size * 0.02))
+ if max_failures < min_failure_threshold:
+ die(UPDATE_CONFIG_DEDICATED_THRESHOLD_ERROR % (job_size, min_failure_threshold))
+
+
+HEALTH_CHECK_INTERVAL_SECS_ERROR = '''
+health_check_interval_secs paramater to Job has been deprecated. Please specify health_check_config
+only.
+
+See http://go/auroraconfig/#Aurora%2BThermosConfigurationReference-HealthCheckConfig
+'''
+
+
+def _validate_health_check_config(config):
+ """Call die() when both health_check_interval_secs and health_check_config are set."""
+ # TODO(Sathya): Remove this check after health_check_interval_secs deprecation cycle is complete.
+ if config.raw().has_health_check_interval_secs() and config.raw().has_health_check_config():
+ die(HEALTH_CHECK_INTERVAL_SECS_ERROR)
+
+
+DEFAULT_ENVIRONMENT_WARNING = '''
+Job did not specify environment, auto-populating to "%s".
+'''
+
+
+def _inject_default_environment(config):
+ """Set the job environment to DEFAULT_ENVIRONMENT when the job did not specify one.
+
+ A notice is printed to stderr whenever the default is applied.
+ """
+ if not config.raw().has_environment():
+ print(DEFAULT_ENVIRONMENT_WARNING % DEFAULT_ENVIRONMENT, file=sys.stderr)
+ config.update_job(config.raw()(environment=DEFAULT_ENVIRONMENT))
+
+
+def validate_config(config, env=None):
+ """Run the update-config, health-check, announce and environment-name validators.
+
+ The env argument is accepted but not used by this function.
+ """
+ _validate_update_config(config)
+ _validate_health_check_config(config)
+ _validate_announce_configuration(config)
+ _validate_environment_name(config)
+
+
+def populate_namespaces(config, env=None):
+ """Apply the default environment, emit deprecation warnings, and return config.
+
+ The env argument is accepted but not used by this function.
+ """
+ _inject_default_environment(config)
+ _warn_on_deprecated_cron_policy(config)
+ _warn_on_deprecated_daemon_job(config)
+ _warn_on_deprecated_health_check_interval_secs(config)
+ _warn_on_appapp_layouts(config)
+ return config
+
+
+def inject_hooks(config, env=None):
+ """Set config.hooks to env['hooks'], or to an empty list when env is falsy or has no 'hooks' key."""
+ config.hooks = (env or {}).get('hooks', [])
+
+
+class AnnotatedAuroraConfig(AuroraConfig):
+ """AuroraConfig subclass that supplies this module's plugin callables."""
+ @classmethod
+ def plugins(cls):
+ # Returned order: hook injection, binding helpers, namespace population, validation.
+ # NOTE(review): how and when AuroraConfig invokes these is not visible here -- confirm in AuroraConfig.
+ return (inject_hooks,
+ functools.partial(binding_helper.apply_all),
+ functools.partial(populate_namespaces),
+ validate_config)
+
+
+def get_config(jobname,
+ config_file,
+ json=False,
+ bindings=(),
+ select_cluster=None,
+ select_role=None,
+ select_env=None):
+ """Creates and returns a config object contained in the provided file.
+
+ Uses AnnotatedAuroraConfig.load_json when json is true and
+ AnnotatedAuroraConfig.load otherwise. The loader is called with config_file,
+ jobname and bindings positionally, and the select_* values as keywords.
+ """
+ loader = AnnotatedAuroraConfig.load_json if json else AnnotatedAuroraConfig.load
+ return loader(config_file,
+ jobname,
+ bindings,
+ select_cluster=select_cluster,
+ select_role=select_role,
+ select_env=select_env)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/factory.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/factory.py b/src/main/python/apache/aurora/client/factory.py
new file mode 100644
index 0000000..7a44e3b
--- /dev/null
+++ b/src/main/python/apache/aurora/client/factory.py
@@ -0,0 +1,27 @@
+import functools
+
+from twitter.aurora.client.hooks.hooked_api import HookedAuroraClientAPI
+from twitter.aurora.common.cluster import Cluster
+from twitter.aurora.common.clusters import CLUSTERS
+from twitter.common import app
+
+from .base import die
+
+
+# TODO(wickman) Kill make_client and make_client_factory as part of MESOS-3801.
+# These are currently necessary indirections for the LiveJobDisambiguator among
+# other things but can go away once those are scrubbed.
+
+def make_client_factory():
+ """Return a callable that builds an API client from a cluster name.
+
+ The returned callable is a functools.partial over a HookedAuroraClientAPI
+ subclass with verbose pre-bound; the subclass calls die() for cluster names
+ that are not in CLUSTERS.
+ """
+ # verbose is true only when the parsed options carry verbosity == 'verbose';
+ # a missing attribute is treated as 'normal'.
+ verbose = getattr(app.get_options(), 'verbosity', 'normal') == 'verbose'
+ class TwitterAuroraClientAPI(HookedAuroraClientAPI):
+ def __init__(self, cluster, *args, **kw):
+ if cluster not in CLUSTERS:
+ die('Unknown cluster: %s' % cluster)
+ # The name is replaced by its CLUSTERS entry before reaching the base class.
+ super(TwitterAuroraClientAPI, self).__init__(CLUSTERS[cluster], *args, **kw)
+ return functools.partial(TwitterAuroraClientAPI, verbose=verbose)
+
+
+def make_client(cluster):
+ """Build an API client for cluster, given either a Cluster object or a cluster name."""
+ factory = make_client_factory()
+ # A Cluster instance is reduced to its name; anything else is passed through unchanged.
+ return factory(cluster.name if isinstance(cluster, Cluster) else cluster)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/hooks/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/hooks/BUILD b/src/main/python/apache/aurora/client/hooks/BUILD
new file mode 100644
index 0000000..c3d0a1a
--- /dev/null
+++ b/src/main/python/apache/aurora/client/hooks/BUILD
@@ -0,0 +1,10 @@
+# Build target 'hooks': packages __init__.py and hooked_api.py from this directory
+# together with the four dependencies listed below.
+python_library(
+ name = 'hooks',
+ sources = ['__init__.py', 'hooked_api.py'],
+ dependencies = [
+ pants('aurora/twitterdeps/src/python/twitter/common/log'),
+ pants('src/main/python/twitter/aurora/client:api'),
+ pants('src/main/python/twitter/aurora/common:aurora_job_key'),
+ pants('src/main/thrift/com/twitter/aurora/gen:py-thrift'),
+ ]
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/python/apache/aurora/client/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/hooks/__init__.py b/src/main/python/apache/aurora/client/hooks/__init__.py
new file mode 100644
index 0000000..3091f67
--- /dev/null
+++ b/src/main/python/apache/aurora/client/hooks/__init__.py
@@ -0,0 +1,49 @@
+"""
+A hooks implementation for the Aurora client.
+
+The Hook protocol is the following:
+ Any object may be passed in as a hook.
+
+ If the object has pre_<api method name> defined that is callable, it will be called with:
+ method(*args, **kw)
+
+ where *args and **kw are the arguments and keyword arguments passed into
+ the original API call. This is done prior to the invocation of the API
+ call. If this method returns Falsy, the API call will be aborted.
+
+ If the object has an err_<api method name> defined that is callable, it will be called with:
+ method(exc, *args, **kw)
+
+ If the object has a post_<api method name> defined that is callable, it will be called with:
+ method(result, *args, **kw)
+
+ These methods are called after the respective API call has been made. The
+ return codes of err and post methods are ignored.
+
+If the object does not have any of these attributes, it will instead delegate to the
+'generic_hook' method, if available. The method signature for generic_hook is:
+
+ generic_hook(hook_config, event, method_name, result_or_err, args, kw)
+
+Where hook_config is a namedtuple of 'config' and 'job_key', event is one of
+'pre', 'err', 'post', method_name is the API method name, and args, kw are
+the arguments / keyword arguments. result_or_err is a tri_state:
+ - None for pre hooks
+ - result for post hooks
+ - exc for err hooks
+
+Examples:
+
+ class Logger(object):
+ '''Logs every event at every hook point for all API calls.'''
+ def generic_hook(self, hook_config, event, method_name, result_or_err, *args, **kw):
+ log.info('%s: %s_%s of %s' % (self.__class__.__name__, event, method_name, hook_config.job_key))
+
+ class KillConfirmer(object):
+ def confirm(self, msg):
+ return True if raw_input(msg).lower() == 'yes' else False
+
+ def pre_kill(self, job_key, shards=None):
+ shards = ('shards %s' % shards) if shards is not None else 'all shards'
+ return self.confirm('Are you sure you want to kill %s (%s)? (yes/no): ' % (job_key, shards))
+"""