You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@aurora.apache.org by mc...@apache.org on 2014/03/26 00:28:59 UTC
git commit: Add batching to the kill and killall commands.
Repository: incubator-aurora
Updated Branches:
refs/heads/master a4d30e8e5 -> 77db3e541
Add batching to the kill and killall commands.
Bugs closed: aurora-283
Reviewed at https://reviews.apache.org/r/19466/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/77db3e54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/77db3e54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/77db3e54
Branch: refs/heads/master
Commit: 77db3e5416eefd947090a8e9ec290c6744029a1f
Parents: a4d30e8
Author: Mark Chu-Carroll <mc...@twopensource.com>
Authored: Tue Mar 25 19:24:57 2014 -0400
Committer: Mark Chu-Carroll <mc...@twitter.com>
Committed: Tue Mar 25 19:24:57 2014 -0400
----------------------------------------------------------------------
.../python/apache/aurora/client/cli/context.py | 1 +
.../python/apache/aurora/client/cli/jobs.py | 131 ++++++++++++++----
.../python/apache/aurora/client/cli/options.py | 44 +++++-
.../python/apache/aurora/client/cli/task.py | 1 -
.../apache/aurora/client/cli/test_kill.py | 134 +++++++++++++++++--
.../apache/aurora/client/cli/test_restart.py | 18 ---
.../python/apache/aurora/client/cli/util.py | 30 +++++
7 files changed, 303 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/main/python/apache/aurora/client/cli/context.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/context.py b/src/main/python/apache/aurora/client/cli/context.py
index c07f54a..83f7b6a 100644
--- a/src/main/python/apache/aurora/client/cli/context.py
+++ b/src/main/python/apache/aurora/client/cli/context.py
@@ -150,6 +150,7 @@ class AuroraCommandContext(Context):
return jobs
def get_job_status(self, key):
+ """Returns a list of task instances running under the job."""
api = self.get_api(key.cluster)
resp = api.check_status(key)
if resp.responseCode is not ResponseCode.OK:
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py
index 290a8ba..e354ee1 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -36,6 +36,7 @@ from apache.aurora.client.cli import (
)
from apache.aurora.client.cli.context import AuroraCommandContext
from apache.aurora.client.cli.options import (
+ ALL_INSTANCES,
BATCH_OPTION,
BIND_OPTION,
BROWSER_OPTION,
@@ -43,10 +44,12 @@ from apache.aurora.client.cli.options import (
CONFIG_ARGUMENT,
FORCE_OPTION,
HEALTHCHECK_OPTION,
- INSTANCES_OPTION,
+ INSTANCES_SPEC_ARGUMENT,
JOBSPEC_ARGUMENT,
JSON_READ_OPTION,
JSON_WRITE_OPTION,
+ MAX_TOTAL_FAILURES_OPTION,
+ NO_BATCHING_OPTION,
WATCH_OPTION,
)
from apache.aurora.common.aurora_job_key import AuroraJobKey
@@ -273,33 +276,103 @@ the parsed configuration."""
return EXIT_OK
-class KillJobCommand(Verb):
+class AbstractKillCommand(Verb):
+ def get_options(self):
+ return [BROWSER_OPTION,
+ CommandOption('--config', type=str, default=None, dest='config',
+ help='Config file for the job, possibly containing hooks'),
+ BATCH_OPTION,
+ MAX_TOTAL_FAILURES_OPTION,
+ NO_BATCHING_OPTION]
+
+ def kill_in_batches(self, context, job, instances_arg):
+ api = context.get_api(job.cluster)
+ # query the job, to get the list of active instances.
+ tasks = context.get_job_status(job)
+ if tasks is None or len(tasks) == 0:
+ context.print_err('No tasks to kill found for job %s' % job)
+ return EXIT_INVALID_PARAMETER
+ instance_ids = set(instance.assignedTask.instanceId for instance in tasks)
+ # intersect that with the set of shards specified by the user.
+ instances_to_kill = (instance_ids & set(instances_arg) if instances_arg is not None
+ else instance_ids)
+ # kill the shard batches.
+ errors = 0
+ while len(instances_to_kill) > 0:
+ batch = []
+ for i in range(min(context.options.batch_size, len(instances_to_kill))):
+ batch.append(instances_to_kill.pop())
+ resp = api.kill_job(job, batch)
+ if resp.responseCode is not ResponseCode.OK:
+ resp.print_log('Kill of shards %s failed with error %s' % (batch, resp.message))
+ errors += 1
+ if errors > context.options.max_total_failures:
+ raise context.CommandError(EXIT_COMMAND_FAILURE,
+ 'Exceeded maximum number of errors while killing instances')
+ if errors > 0:
+ context.print_err('Warning: Errors occurred during batch kill')
+ raise context.CommandError(EXIT_COMMAND_FAILURE, "Errors occurred while killing instances")
+
+
+class KillCommand(AbstractKillCommand):
@property
def name(self):
return 'kill'
@property
def help(self):
- return "Kill a scheduled job"
+ return "Kill instances in a scheduled job"
def get_options(self):
- return [BROWSER_OPTION, INSTANCES_OPTION,
- CommandOption('--config', type=str, default=None, dest='config',
- help='Config file for the job, possibly containing hooks'),
- JOBSPEC_ARGUMENT]
+ return super(KillCommand, self).get_options() + [INSTANCES_SPEC_ARGUMENT]
def execute(self, context):
- # TODO(mchucarroll): Check for wildcards; we don't allow wildcards for job kill.
- api = context.get_api(context.options.jobspec.cluster)
- resp = api.kill_job(context.options.jobspec, context.options.instances)
- if resp.responseCode != ResponseCode.OK:
- context.print_err('Job %s not found' % context.options.jobspec, file=sys.stderr)
- return EXIT_INVALID_PARAMETER
+ job = context.options.instance_spec.jobkey
+ instances_arg = context.options.instance_spec.instance
+ if instances_arg == ALL_INSTANCES:
+ raise context.CommandError(EXIT_INVALID_PARAMETER,
+ 'The instances list cannot be omitted in a kill command!; '
+ 'use killall to kill all instances')
+ api = context.get_api(job.cluster)
+ if context.options.no_batching:
+ resp = api.kill_job(job, instances_arg)
+ if resp.responseCode != ResponseCode.OK:
+ context.print_err('Job %s not found' % job, file=sys.stderr)
+ return EXIT_INVALID_PARAMETER
+ else:
+ self.kill_in_batches(context, job, instances_arg)
if context.options.open_browser:
context.open_job_page(api, context.options.jobspec)
return EXIT_OK
+class KillAllJobCommand(AbstractKillCommand):
+ @property
+ def name(self):
+ return 'killall'
+
+ @property
+ def help(self):
+ return "Kill all instances of a scheduled job"
+
+ def get_options(self):
+ return super(KillAllJobCommand, self).get_options() + [JOBSPEC_ARGUMENT]
+
+ def execute(self, context):
+ job = context.options.jobspec
+ api = context.get_api(job.cluster)
+ if context.options.no_batching:
+ resp = api.kill_job(job, None)
+ if resp.responseCode != ResponseCode.OK:
+ context.print_err('Job %s not found' % job, file=sys.stderr)
+ return EXIT_INVALID_PARAMETER
+ else:
+ self.kill_in_batches(context, job, None)
+ if context.options.open_browser:
+ context.open_job_page(api, job)
+ return EXIT_OK
+
+
class ListJobsCommand(Verb):
@property
@@ -329,18 +402,17 @@ class RestartCommand(Verb):
def get_options(self):
return [BATCH_OPTION, BIND_OPTION, BROWSER_OPTION, FORCE_OPTION, HEALTHCHECK_OPTION,
- INSTANCES_OPTION, JSON_READ_OPTION, WATCH_OPTION,
+ JSON_READ_OPTION, WATCH_OPTION,
CommandOption('--max-per-instance-failures', type=int, default=0,
help='Maximum number of restarts per instance during restart. Increments total failure '
'count when this limit is exceeded.'),
CommandOption('--restart-threshold', type=int, default=60,
help='Maximum number of seconds before a shard must move into the RUNNING state '
'before considered a failure.'),
- CommandOption('--max-total-failures', type=int, default=0,
- help='Maximum number of instance failures to be tolerated in total during restart.'),
+ MAX_TOTAL_FAILURES_OPTION,
CommandOption('--rollback-on-failure', default=True, action='store_false',
help='If false, prevent update from performing a rollback.'),
- JOBSPEC_ARGUMENT, CONFIG_ARGUMENT]
+ INSTANCES_SPEC_ARGUMENT, CONFIG_ARGUMENT]
@property
def help(self):
@@ -348,8 +420,11 @@ class RestartCommand(Verb):
Restarts are fully controlled client-side, so aborting halts the restart."""
def execute(self, context):
- api = context.get_api(context.options.jobspec.cluster)
- config = (context.get_job_config(context.options.jobspec, context.options.config_file)
+ job = context.options.instance_spec.jobkey
+ instances = (None if context.options.instance_spec.instance == ALL_INSTANCES else
+ context.options.instance_spec.instance)
+ api = context.get_api(job.cluster)
+ config = (context.get_job_config(job, context.options.config_file)
if context.options.config_file else None)
updater_config = UpdaterConfig(
context.options.batch_size,
@@ -358,7 +433,7 @@ Restarts are fully controlled client-side, so aborting halts the restart."""
context.options.max_per_instance_failures,
context.options.max_total_failures,
context.options.rollback_on_failure)
- resp = api.restart(context.options.jobspec, context.options.instances, updater_config,
+ resp = api.restart(job, instances, updater_config,
context.options.healthcheck_interval_seconds, config=config)
context.check_and_log_response(resp)
@@ -460,8 +535,8 @@ class UpdateCommand(Verb):
return 'update'
def get_options(self):
- return [FORCE_OPTION, BIND_OPTION, JSON_READ_OPTION, INSTANCES_OPTION, HEALTHCHECK_OPTION,
- JOBSPEC_ARGUMENT, CONFIG_ARGUMENT]
+ return [FORCE_OPTION, BIND_OPTION, JSON_READ_OPTION, HEALTHCHECK_OPTION,
+ INSTANCES_SPEC_ARGUMENT, CONFIG_ARGUMENT]
@property
def help(self):
@@ -505,12 +580,15 @@ to preview what changes will take effect.
time.sleep(5)
def execute(self, context):
- config = context.get_job_config(context.options.jobspec, context.options.config_file)
+ job = context.options.instance_spec.jobkey
+ instances = (None if context.options.instance_spec.instance == ALL_INSTANCES else
+ context.options.instance_spec.instance)
+ config = context.get_job_config(job, context.options.config_file)
api = context.get_api(config.cluster())
if not context.options.force:
- self.warn_if_dangerous_change(context, api, context.options.jobspec, config)
+ self.warn_if_dangerous_change(context, api, job, config)
resp = api.update_job(config, context.options.healthcheck_interval_seconds,
- context.options.instances)
+ instances)
if resp.responseCode != ResponseCode.OK:
raise context.CommandError(EXIT_COMMAND_FAILURE, 'Update failed: %s' % resp.message)
return EXIT_OK
@@ -535,7 +613,8 @@ class Job(Noun):
self.register_verb(CreateJobCommand())
self.register_verb(DiffCommand())
self.register_verb(InspectCommand())
- self.register_verb(KillJobCommand())
+ self.register_verb(KillCommand())
+ self.register_verb(KillAllJobCommand())
self.register_verb(ListJobsCommand())
self.register_verb(RestartCommand())
self.register_verb(StatusCommand())
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/main/python/apache/aurora/client/cli/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/options.py b/src/main/python/apache/aurora/client/cli/options.py
index 1f33ea2..040c5c2 100644
--- a/src/main/python/apache/aurora/client/cli/options.py
+++ b/src/main/python/apache/aurora/client/cli/options.py
@@ -41,9 +41,11 @@ class CommandOption(object):
self.type = kwargs.get('type')
self.help = kwargs.get('help', '')
+
def is_mandatory(self):
return self.kwargs.get('required', not self.name.startswith('--'))
+
def get_displayname(self):
"""Get a display name for a the expected format of a parameter value"""
if 'metavar' in self.kwargs:
@@ -58,6 +60,7 @@ class CommandOption(object):
displayname = "value"
return displayname
+
def render_usage(self):
"""Create a usage string for this option"""
if not self.name.startswith('--'):
@@ -72,6 +75,7 @@ class CommandOption(object):
else:
return "[%s=%s]" % (self.name, self.get_displayname())
+
def render_help(self):
"""Render a full help message for this option"""
result = ""
@@ -98,14 +102,20 @@ def parse_qualified_role(rolestr):
raise ArgumentTypeError('Role argument must be a CLUSTER/NAME pair')
return role_parts
+
+ALL_INSTANCES = 'all'
+
+
def parse_instances(instances):
- """Parse lists of instances or instance ranges into a set().
+ """Parse lists of instances or instance ranges into a set(). This accepts a comma-separated
+ list of instances.
+
Examples:
0-2
0,1-3,5
1,3,5
"""
- if instances is None or instances == '':
+ if instances is None or instances == "":
return None
result = set()
for part in instances.split(','):
@@ -113,6 +123,7 @@ def parse_instances(instances):
result.update(range(int(x[0]), int(x[-1]) + 1))
return sorted(result)
+
def parse_time_values(time_values):
"""Parse lists of discrete time values. Every value must be in the following format: XdYhZmWs.
Examples:
@@ -124,6 +135,7 @@ def parse_time_values(time_values):
except ValueError as e:
raise ArgumentTypeError(e)
+
def parse_percentiles(percentiles):
"""Parse lists of percentile values in (0,100) range.
Examples:
@@ -145,6 +157,7 @@ def parse_percentiles(percentiles):
TaskInstanceKey = namedtuple('TaskInstanceKey', [ 'jobkey', 'instance' ])
+
def parse_task_instance_key(key):
pieces = key.split('/')
if len(pieces) != 5:
@@ -158,6 +171,21 @@ def parse_task_instance_key(key):
return TaskInstanceKey(AuroraJobKey(cluster, role, env, name), instance)
+def instance_specifier(spec_str):
+ if spec_str is None or spec_str == '':
+ raise ValueError('Instance specifier must be non-empty')
+ parts = spec_str.split('/')
+ if len(parts) == 4:
+ jobkey = AuroraJobKey(*parts)
+ return TaskInstanceKey(jobkey, ALL_INSTANCES)
+ elif len(parts) != 5:
+ raise ArgumentTypeError('Instance specifier must be a CLUSTER/ROLE/ENV/JOB/INSTANCES tuple')
+ (cluster, role, env, name, instance_str) = parts
+ jobkey = AuroraJobKey(cluster, role, env, name)
+ instances = parse_instances(instance_str)
+ return TaskInstanceKey(jobkey, instances)
+
+
BATCH_OPTION = CommandOption('--batch-size', type=int, default=5,
help='Number of instances to be operate on in one iteration')
@@ -197,6 +225,11 @@ INSTANCES_OPTION = CommandOption('--instances', type=parse_instances, dest='inst
'or a range (e.g. 0-2) or any combination of the two (e.g. 0-2,5,7-9). If not set, '
'all instances will be acted on.')
+INSTANCES_SPEC_ARGUMENT = CommandOption('instance_spec', type=instance_specifier,
+ default=None, metavar="CLUSTER/ROLE/ENV/NAME[/INSTANCES]",
+ help=('Fully specified job instance key, in CLUSTER/ROLE/ENV/NAME[/INSTANCES] format. '
+ 'If INSTANCES is omitted, then all instances will be operated on.'))
+
JOBSPEC_ARGUMENT = CommandOption('jobspec', type=AuroraJobKey.from_path,
metavar="CLUSTER/ROLE/ENV/NAME",
@@ -212,6 +245,13 @@ JSON_WRITE_OPTION = CommandOption('--write-json', default=False, dest='write_jso
action='store_true',
help='Generate command output in JSON format')
+MAX_TOTAL_FAILURES_OPTION = CommandOption('--max-total-failures', type=int, default=0,
+ help='Maximum number of instance failures to be tolerated in total before aborting.')
+
+
+NO_BATCHING_OPTION = CommandOption('--no-batching', default=False, action='store_true',
+ help='Run the command on all instances at once, instead of running in batches')
+
ROLE_ARGUMENT = CommandOption('role', type=parse_qualified_role, metavar='CLUSTER/NAME',
help='Rolename to retrieve information about')
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/main/python/apache/aurora/client/cli/task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
index 8d4d38e..e4ee77e 100644
--- a/src/main/python/apache/aurora/client/cli/task.py
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -47,7 +47,6 @@ from apache.aurora.client.cli.options import (
EXECUTOR_SANDBOX_OPTION,
FORCE_OPTION,
HEALTHCHECK_OPTION,
- INSTANCES_OPTION,
JOBSPEC_ARGUMENT,
JSON_READ_OPTION,
JSON_WRITE_OPTION,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/test/python/apache/aurora/client/cli/test_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_kill.py b/src/test/python/apache/aurora/client/cli/test_kill.py
index 6040ed4..2088160 100644
--- a/src/test/python/apache/aurora/client/cli/test_kill.py
+++ b/src/test/python/apache/aurora/client/cli/test_kill.py
@@ -20,14 +20,15 @@ import unittest
from twitter.common.contextutil import temporary_file
from apache.aurora.client.cli.client import AuroraCommandLine
+from apache.aurora.client.cli.options import parse_instances
+from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
from apache.aurora.client.hooks.hooked_api import HookedAuroraClientAPI
from apache.aurora.common.aurora_job_key import AuroraJobKey
from twitter.common.contextutil import temporary_file
-from apache.aurora.client.cli.options import parse_instances
-from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
from gen.apache.aurora.ttypes import (
Identity,
+ ScheduleStatusResult,
TaskQuery,
)
@@ -54,7 +55,7 @@ class TestClientKillCommand(AuroraClientCommandTest):
def assert_kill_job_called(cls, mock_api):
assert mock_api.kill_job.call_count == 1
- def test_kill_job(self):
+ def test_killall_job(self):
"""Test kill client-side API logic."""
mock_context = FakeAuroraCommandContext()
mock_scheduler_proxy = Mock()
@@ -63,38 +64,153 @@ class TestClientKillCommand(AuroraClientCommandTest):
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
api = mock_context.get_api('west')
+ mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_call_result()
api.kill_job.return_value = self.get_kill_job_response()
mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
cmd = AuroraCommandLine()
- cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello'])
+ cmd.execute(['job', 'killall', '--no-batching', '--config=%s' % fp.name, 'west/bozo/test/hello'])
# Now check that the right API calls got made.
assert api.kill_job.call_count == 1
api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), None)
- def test_kill_job_with_instances(self):
+ def test_killall_job(self):
+ """Test kill client-side API logic."""
+ mock_context = FakeAuroraCommandContext()
+ mock_scheduler_proxy = Mock()
+ with contextlib.nested(
+ patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+ patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+
+ api = mock_context.get_api('west')
+ api.kill_job.return_value = self.get_kill_job_response()
+ mock_context.add_expected_status_query_result(self.create_status_call_result())
+ mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ with temporary_file() as fp:
+ fp.write(self.get_valid_config())
+ fp.flush()
+ cmd = AuroraCommandLine()
+ cmd.execute(['job', 'killall', '--config=%s' % fp.name, 'west/bozo/test/hello'])
+
+ # Now check that the right API calls got made.
+ assert api.kill_job.call_count == 4
+ api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), [15, 16, 17, 18, 19])
+
+ def test_kill_job_with_instances_nobatching(self):
"""Test kill client-side API logic."""
mock_context = FakeAuroraCommandContext()
with contextlib.nested(
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
api = mock_context.get_api('west')
+ self.setup_get_tasks_status_calls(api.scheduler_proxy)
api.kill_job.return_value = self.get_kill_job_response()
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
cmd = AuroraCommandLine()
- cmd.execute(['job', 'kill', '--config=%s' % fp.name, '--instances=0,2,4-6',
- 'west/bozo/test/hello'])
+ cmd.execute(['job', 'kill', '--config=%s' % fp.name, '--no-batching', 'west/bozo/test/hello/0,2,4-6'])
# Now check that the right API calls got made.
assert api.kill_job.call_count == 1
api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
[0, 2, 4, 5, 6])
+ def test_kill_job_with_instances_batched(self):
+ """Test kill client-side API logic."""
+ mock_context = FakeAuroraCommandContext()
+ with contextlib.nested(
+ patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+ patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+ api = mock_context.get_api('west')
+ status_result = self.create_status_call_result()
+ mock_context.add_expected_status_query_result(status_result)
+ api.kill_job.return_value = self.get_kill_job_response()
+ with temporary_file() as fp:
+ fp.write(self.get_valid_config())
+ fp.flush()
+ cmd = AuroraCommandLine()
+ cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-6'])
+
+ # Now check that the right API calls got made.
+ assert api.kill_job.call_count == 1
+ api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
+ [0, 2, 4, 5, 6])
+
+ def test_kill_job_with_instances_batched_large(self):
+ """Test kill client-side API logic."""
+ mock_context = FakeAuroraCommandContext()
+ with contextlib.nested(
+ patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+ patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+ api = mock_context.get_api('west')
+ status_result = self.create_status_call_result()
+ mock_context.add_expected_status_query_result(status_result)
+ api.kill_job.return_value = self.get_kill_job_response()
+ with temporary_file() as fp:
+ fp.write(self.get_valid_config())
+ fp.flush()
+ cmd = AuroraCommandLine()
+ cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-13'])
+
+ # Now check that the right API calls got made.
+ assert api.kill_job.call_count == 3
+ api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
+ [12, 13])
+
+ def test_kill_job_with_instances_batched_maxerrors(self):
+ """Test kill client-side API logic."""
+ mock_context = FakeAuroraCommandContext()
+ with contextlib.nested(
+ patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+ patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+ api = mock_context.get_api('west')
+ status_result = self.create_status_call_result()
+ failed_status_result = self.create_error_response()
+ mock_context.add_expected_status_query_result(status_result)
+ mock_context.add_expected_status_query_result(failed_status_result)
+ mock_context.add_expected_status_query_result(failed_status_result)
+ mock_context.add_expected_status_query_result(status_result)
+ mock_context.add_expected_status_query_result(status_result)
+ api.kill_job.return_value = self.get_kill_job_response()
+ with temporary_file() as fp:
+ fp.write(self.get_valid_config())
+ fp.flush()
+ cmd = AuroraCommandLine()
+ cmd.execute(['job', 'kill', '--max-total-failures=1', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-13'])
+
+ # Now check that the right API calls got made. We should have aborted after the third batch.
+ assert api.kill_job.call_count == 3
+
+
+ def test_kill_job_with_empty_instances_batched(self):
+ """Test kill client-side API logic."""
+ mock_context = FakeAuroraCommandContext()
+ with contextlib.nested(
+ patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+ patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+ api = mock_context.get_api('west')
+ # set up an empty instance list in the getTasksStatus response
+ status_response = self.create_simple_success_response()
+ schedule_status = Mock(spec=ScheduleStatusResult)
+ status_response.result.scheduleStatusResult = schedule_status
+ mock_task_config = Mock()
+ schedule_status.tasks = []
+ mock_context.add_expected_status_query_result(status_response)
+ api.kill_job.return_value = self.get_kill_job_response()
+ with temporary_file() as fp:
+ fp.write(self.get_valid_config())
+ fp.flush()
+ cmd = AuroraCommandLine()
+ cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-13'])
+
+ # Now check that the right API calls got made.
+ assert api.kill_job.call_count == 0
+
+
def test_kill_job_with_instances_deep_api(self):
"""Test kill client-side API logic."""
(mock_api, mock_scheduler_proxy) = self.create_mock_api()
@@ -102,12 +218,12 @@ class TestClientKillCommand(AuroraClientCommandTest):
patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ self.setup_get_tasks_status_calls(mock_scheduler_proxy)
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
cmd = AuroraCommandLine()
- cmd.execute(['job', 'kill', '--config=%s' % fp.name, '--instances=0,2,4-6',
- 'west/bozo/test/hello'])
+ cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-6'])
# Now check that the right API calls got made.
assert mock_scheduler_proxy.killTasks.call_count == 1
mock_scheduler_proxy.killTasks.assert_called_with(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/test/python/apache/aurora/client/cli/test_restart.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_restart.py b/src/test/python/apache/aurora/client/cli/test_restart.py
index 7547bd7..6bcdd6a 100644
--- a/src/test/python/apache/aurora/client/cli/test_restart.py
+++ b/src/test/python/apache/aurora/client/cli/test_restart.py
@@ -39,24 +39,6 @@ class TestRestartCommand(AuroraClientCommandTest):
populate.result.populateJobResult.populated = set(configs)
return populate
- @classmethod
- def setup_get_tasks_status_calls(cls, scheduler):
- status_response = cls.create_simple_success_response()
- scheduler.getTasksStatus.return_value = status_response
- schedule_status = Mock(spec=ScheduleStatusResult)
- status_response.result.scheduleStatusResult = schedule_status
- mock_task_config = Mock()
- # This should be a list of ScheduledTask's.
- schedule_status.tasks = []
- for i in range(20):
- task_status = Mock(spec=ScheduledTask)
- task_status.assignedTask = Mock(spec=AssignedTask)
- task_status.assignedTask.instanceId = i
- task_status.assignedTask.taskId = "Task%s" % i
- task_status.assignedTask.slaveId = "Slave%s" % i
- task_status.slaveHost = "Slave%s" % i
- task_status.assignedTask.task = mock_task_config
- schedule_status.tasks.append(task_status)
@classmethod
def setup_health_checks(cls, mock_api):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/test/python/apache/aurora/client/cli/util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/util.py b/src/test/python/apache/aurora/client/cli/util.py
index db65eee..bc2a3e9 100644
--- a/src/test/python/apache/aurora/client/cli/util.py
+++ b/src/test/python/apache/aurora/client/cli/util.py
@@ -17,9 +17,12 @@
import unittest
from gen.apache.aurora.ttypes import (
+ AssignedTask,
Response,
ResponseCode,
Result,
+ ScheduleStatusResult,
+ ScheduledTask,
)
from apache.aurora.client.cli.context import AuroraCommandContext
@@ -83,6 +86,7 @@ class FakeAuroraCommandContext(AuroraCommandContext):
self.task_status.append(expected_result)
# each call adds an expected query result, in order.
self.fake_api.scheduler_proxy.getTasksStatus.side_effect = self.task_status
+ self.fake_api.check_status.side_effect = self.task_status
class AuroraClientCommandTest(unittest.TestCase):
@@ -124,6 +128,32 @@ class AuroraClientCommandTest(unittest.TestCase):
mock_api_factory.return_value = mock_api
return mock_api_factory, mock_scheduler_client
+ @classmethod
+ def create_status_call_result(cls):
+ """Build a successful status response listing 20 mock ScheduledTasks.
+
+ Instance ids are 0-19; task ids, slave ids and slave hosts are derived from the
+ instance id, and every task shares one mock task config.
+ """
+ status_response = cls.create_simple_success_response()
+ schedule_status = Mock(spec=ScheduleStatusResult)
+ status_response.result.scheduleStatusResult = schedule_status
+ mock_task_config = Mock()
+ # This should be a list of ScheduledTask's.
+ schedule_status.tasks = []
+ for i in range(20):
+ task_status = Mock(spec=ScheduledTask)
+ task_status.assignedTask = Mock(spec=AssignedTask)
+ task_status.assignedTask.instanceId = i
+ task_status.assignedTask.taskId = "Task%s" % i
+ task_status.assignedTask.slaveId = "Slave%s" % i
+ task_status.slaveHost = "Slave%s" % i
+ task_status.assignedTask.task = mock_task_config
+ schedule_status.tasks.append(task_status)
+ return status_response
+
+
+ @classmethod
+ def setup_get_tasks_status_calls(cls, scheduler):
+ """Make every scheduler.getTasksStatus call return the 20-task status response."""
+ status_response = cls.create_status_call_result()
+ scheduler.getTasksStatus.return_value = status_response
+
+
FAKE_TIME = 42131
@classmethod