You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by mc...@apache.org on 2014/03/26 00:28:59 UTC

git commit: Add batching to the kill and killall commands.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master a4d30e8e5 -> 77db3e541


Add batching to the kill and killall commands.

Bugs closed: aurora-283

Reviewed at https://reviews.apache.org/r/19466/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/77db3e54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/77db3e54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/77db3e54

Branch: refs/heads/master
Commit: 77db3e5416eefd947090a8e9ec290c6744029a1f
Parents: a4d30e8
Author: Mark Chu-Carroll <mc...@twopensource.com>
Authored: Tue Mar 25 19:24:57 2014 -0400
Committer: Mark Chu-Carroll <mc...@twitter.com>
Committed: Tue Mar 25 19:24:57 2014 -0400

----------------------------------------------------------------------
 .../python/apache/aurora/client/cli/context.py  |   1 +
 .../python/apache/aurora/client/cli/jobs.py     | 131 ++++++++++++++----
 .../python/apache/aurora/client/cli/options.py  |  44 +++++-
 .../python/apache/aurora/client/cli/task.py     |   1 -
 .../apache/aurora/client/cli/test_kill.py       | 134 +++++++++++++++++--
 .../apache/aurora/client/cli/test_restart.py    |  18 ---
 .../python/apache/aurora/client/cli/util.py     |  30 +++++
 7 files changed, 303 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/main/python/apache/aurora/client/cli/context.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/context.py b/src/main/python/apache/aurora/client/cli/context.py
index c07f54a..83f7b6a 100644
--- a/src/main/python/apache/aurora/client/cli/context.py
+++ b/src/main/python/apache/aurora/client/cli/context.py
@@ -150,6 +150,7 @@ class AuroraCommandContext(Context):
       return jobs
 
   def get_job_status(self, key):
+    """Returns a list of task instances running under the job."""
     api = self.get_api(key.cluster)
     resp = api.check_status(key)
     if resp.responseCode is not ResponseCode.OK:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py
index 290a8ba..e354ee1 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -36,6 +36,7 @@ from apache.aurora.client.cli import (
 )
 from apache.aurora.client.cli.context import AuroraCommandContext
 from apache.aurora.client.cli.options import (
+    ALL_INSTANCES,
     BATCH_OPTION,
     BIND_OPTION,
     BROWSER_OPTION,
@@ -43,10 +44,12 @@ from apache.aurora.client.cli.options import (
     CONFIG_ARGUMENT,
     FORCE_OPTION,
     HEALTHCHECK_OPTION,
-    INSTANCES_OPTION,
+    INSTANCES_SPEC_ARGUMENT,
     JOBSPEC_ARGUMENT,
     JSON_READ_OPTION,
     JSON_WRITE_OPTION,
+    MAX_TOTAL_FAILURES_OPTION,
+    NO_BATCHING_OPTION,
     WATCH_OPTION,
 )
 from apache.aurora.common.aurora_job_key import AuroraJobKey
@@ -273,33 +276,103 @@ the parsed configuration."""
     return EXIT_OK
 
 
-class KillJobCommand(Verb):
+class AbstractKillCommand(Verb):
+  """Options and batched-kill logic shared by the kill and killall verbs."""
+  def get_options(self):
+    return [BROWSER_OPTION,
+        CommandOption('--config', type=str, default=None, dest='config',
+            help='Config file for the job, possibly containing hooks'),
+        BATCH_OPTION, MAX_TOTAL_FAILURES_OPTION, NO_BATCHING_OPTION]
+
+  def kill_in_batches(self, context, job, instances_arg):
+    """Kill the job's active instances (only those in instances_arg, unless it is None).
+
+    Returns EXIT_INVALID_PARAMETER if the job has no tasks. Raises context.CommandError if
+    any batch fails: at once when failures exceed max_total_failures, otherwise at the end.
+    """
+    api = context.get_api(job.cluster)
+    tasks = context.get_job_status(job)
+    if not tasks:
+      context.print_err('No tasks to kill found for job %s' % job)
+      return EXIT_INVALID_PARAMETER
+    active = set(task.assignedTask.instanceId for task in tasks)
+    # Sorted so batches are deterministic; set.pop() removes an arbitrary element.
+    to_kill = sorted(active if instances_arg is None else active & set(instances_arg))
+    size = max(1, context.options.batch_size)  # a size below 1 would never make progress
+    errors = 0
+    for start in range(0, len(to_kill), size):
+      batch = to_kill[start:start + size]
+      resp = api.kill_job(job, batch)
+      if resp.responseCode != ResponseCode.OK:
+        context.print_err('Kill of shards %s failed with error %s' % (batch, resp.message))
+        errors += 1
+        if errors > context.options.max_total_failures:
+          raise context.CommandError(EXIT_COMMAND_FAILURE,
+               'Exceeded maximum number of errors while killing instances')
+    if errors > 0:
+      context.print_err('Warning: Errors occurred during batch kill')
+      raise context.CommandError(EXIT_COMMAND_FAILURE, "Errors occurred while killing instances")
+
+
+class KillCommand(AbstractKillCommand):
   @property
   def name(self):
     return 'kill'
 
   @property
   def help(self):
-    return "Kill a scheduled job"
+    return "Kill instances in a scheduled job"
 
   def get_options(self):
-    return [BROWSER_OPTION, INSTANCES_OPTION,
-        CommandOption('--config', type=str, default=None, dest='config',
-            help='Config file for the job, possibly containing hooks'),
-        JOBSPEC_ARGUMENT]
+    return super(KillCommand, self).get_options() + [INSTANCES_SPEC_ARGUMENT]
 
   def execute(self, context):
-    # TODO(mchucarroll): Check for wildcards; we don't allow wildcards for job kill.
-    api = context.get_api(context.options.jobspec.cluster)
-    resp = api.kill_job(context.options.jobspec, context.options.instances)
-    if resp.responseCode != ResponseCode.OK:
-      context.print_err('Job %s not found' % context.options.jobspec, file=sys.stderr)
-      return EXIT_INVALID_PARAMETER
+    job = context.options.instance_spec.jobkey
+    instances_arg = context.options.instance_spec.instance
+    if instances_arg == ALL_INSTANCES:
+      raise context.CommandError(EXIT_INVALID_PARAMETER,
+          'The instances list cannot be omitted in a kill command!; '
+          'use killall to kill all instances')
+    api = context.get_api(job.cluster)
+    if context.options.no_batching:
+      resp = api.kill_job(job, instances_arg)
+      if resp.responseCode != ResponseCode.OK:
+        context.print_err('Job %s not found' % job, file=sys.stderr)
+        return EXIT_INVALID_PARAMETER
+    else:
+      self.kill_in_batches(context, job, instances_arg)
     if context.options.open_browser:
       context.open_job_page(api, context.options.jobspec)
     return EXIT_OK
 
 
+class KillAllJobCommand(AbstractKillCommand):
+  """The 'killall' verb: kills every instance of a job, batched unless --no-batching."""
+  @property
+  def name(self):
+    return 'killall'
+
+  @property
+  def help(self):
+    return "Kill all instances of a scheduled job"
+
+  def get_options(self):
+    return super(KillAllJobCommand, self).get_options() + [JOBSPEC_ARGUMENT]
+
+  def execute(self, context):
+    job = context.options.jobspec
+    api = context.get_api(job.cluster)
+    if context.options.no_batching:
+      if api.kill_job(job, None).responseCode != ResponseCode.OK:
+        context.print_err('Job %s not found' % job, file=sys.stderr)
+        return EXIT_INVALID_PARAMETER
+    elif self.kill_in_batches(context, job, None) is not None:
+      return EXIT_INVALID_PARAMETER  # kill_in_batches found no tasks, so nothing was killed
+    if context.options.open_browser:
+      context.open_job_page(api, job)
+    return EXIT_OK
+
+
 class ListJobsCommand(Verb):
 
   @property
@@ -329,18 +402,17 @@ class RestartCommand(Verb):
 
   def get_options(self):
     return [BATCH_OPTION, BIND_OPTION, BROWSER_OPTION, FORCE_OPTION, HEALTHCHECK_OPTION,
-        INSTANCES_OPTION, JSON_READ_OPTION, WATCH_OPTION,
+        JSON_READ_OPTION, WATCH_OPTION,
         CommandOption('--max-per-instance-failures', type=int, default=0,
              help='Maximum number of restarts per instance during restart. Increments total failure '
                  'count when this limit is exceeded.'),
         CommandOption('--restart-threshold', type=int, default=60,
              help='Maximum number of seconds before a shard must move into the RUNNING state '
                  'before considered a failure.'),
-        CommandOption('--max-total-failures', type=int, default=0,
-             help='Maximum number of instance failures to be tolerated in total during restart.'),
+        MAX_TOTAL_FAILURES_OPTION,
         CommandOption('--rollback-on-failure', default=True, action='store_false',
             help='If false, prevent update from performing a rollback.'),
-        JOBSPEC_ARGUMENT, CONFIG_ARGUMENT]
+        INSTANCES_SPEC_ARGUMENT, CONFIG_ARGUMENT]
 
   @property
   def help(self):
@@ -348,8 +420,11 @@ class RestartCommand(Verb):
 Restarts are fully controlled client-side, so aborting halts the restart."""
 
   def execute(self, context):
-    api = context.get_api(context.options.jobspec.cluster)
-    config = (context.get_job_config(context.options.jobspec, context.options.config_file)
+    job = context.options.instance_spec.jobkey
+    instances = (None if context.options.instance_spec.instance == ALL_INSTANCES else
+        context.options.instance_spec.instance)
+    api = context.get_api(job.cluster)
+    config = (context.get_job_config(job, context.options.config_file)
         if context.options.config_file else None)
     updater_config = UpdaterConfig(
         context.options.batch_size,
@@ -358,7 +433,7 @@ Restarts are fully controlled client-side, so aborting halts the restart."""
         context.options.max_per_instance_failures,
         context.options.max_total_failures,
         context.options.rollback_on_failure)
-    resp = api.restart(context.options.jobspec, context.options.instances, updater_config,
+    resp = api.restart(job, instances, updater_config,
         context.options.healthcheck_interval_seconds, config=config)
 
     context.check_and_log_response(resp)
@@ -460,8 +535,8 @@ class UpdateCommand(Verb):
     return 'update'
 
   def get_options(self):
-    return [FORCE_OPTION, BIND_OPTION, JSON_READ_OPTION, INSTANCES_OPTION, HEALTHCHECK_OPTION,
-        JOBSPEC_ARGUMENT, CONFIG_ARGUMENT]
+    return [FORCE_OPTION, BIND_OPTION, JSON_READ_OPTION, HEALTHCHECK_OPTION,
+        INSTANCES_SPEC_ARGUMENT, CONFIG_ARGUMENT]
 
   @property
   def help(self):
@@ -505,12 +580,15 @@ to preview what changes will take effect.
       time.sleep(5)
 
   def execute(self, context):
-    config = context.get_job_config(context.options.jobspec, context.options.config_file)
+    job = context.options.instance_spec.jobkey
+    instances = (None if context.options.instance_spec.instance == ALL_INSTANCES else
+        context.options.instance_spec.instance)
+    config = context.get_job_config(job, context.options.config_file)
     api = context.get_api(config.cluster())
     if not context.options.force:
-      self.warn_if_dangerous_change(context, api, context.options.jobspec, config)
+      self.warn_if_dangerous_change(context, api, job, config)
     resp = api.update_job(config, context.options.healthcheck_interval_seconds,
-        context.options.instances)
+        instances)
     if resp.responseCode != ResponseCode.OK:
       raise context.CommandError(EXIT_COMMAND_FAILURE, 'Update failed: %s' % resp.message)
     return EXIT_OK
@@ -535,7 +613,8 @@ class Job(Noun):
     self.register_verb(CreateJobCommand())
     self.register_verb(DiffCommand())
     self.register_verb(InspectCommand())
-    self.register_verb(KillJobCommand())
+    self.register_verb(KillCommand())
+    self.register_verb(KillAllJobCommand())
     self.register_verb(ListJobsCommand())
     self.register_verb(RestartCommand())
     self.register_verb(StatusCommand())

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/main/python/apache/aurora/client/cli/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/options.py b/src/main/python/apache/aurora/client/cli/options.py
index 1f33ea2..040c5c2 100644
--- a/src/main/python/apache/aurora/client/cli/options.py
+++ b/src/main/python/apache/aurora/client/cli/options.py
@@ -41,9 +41,11 @@ class CommandOption(object):
     self.type = kwargs.get('type')
     self.help = kwargs.get('help', '')
 
+
   def is_mandatory(self):
     return self.kwargs.get('required', not self.name.startswith('--'))
 
+
   def get_displayname(self):
     """Get a display name for a the expected format of a parameter value"""
     if 'metavar' in self.kwargs:
@@ -58,6 +60,7 @@ class CommandOption(object):
       displayname = "value"
     return displayname
 
+
   def render_usage(self):
     """Create a usage string for this option"""
     if not self.name.startswith('--'):
@@ -72,6 +75,7 @@ class CommandOption(object):
     else:
       return "[%s=%s]" % (self.name, self.get_displayname())
 
+
   def render_help(self):
     """Render a full help message for this option"""
     result = ""
@@ -98,14 +102,20 @@ def parse_qualified_role(rolestr):
     raise ArgumentTypeError('Role argument must be a CLUSTER/NAME pair')
   return role_parts
 
+
+ALL_INSTANCES = 'all'
+
+
 def parse_instances(instances):
-  """Parse lists of instances or instance ranges into a set().
+  """Parse lists of instances or instance ranges into a set(). This accepts a comma-separated
+  list of instances.
+
      Examples:
        0-2
        0,1-3,5
        1,3,5
   """
-  if instances is None or instances == '':
+  if instances is None or instances == "":
     return None
   result = set()
   for part in instances.split(','):
@@ -113,6 +123,7 @@ def parse_instances(instances):
     result.update(range(int(x[0]), int(x[-1]) + 1))
   return sorted(result)
 
+
 def parse_time_values(time_values):
   """Parse lists of discrete time values. Every value must be in the following format: XdYhZmWs.
      Examples:
@@ -124,6 +135,7 @@ def parse_time_values(time_values):
   except ValueError as e:
     raise ArgumentTypeError(e)
 
+
 def parse_percentiles(percentiles):
   """Parse lists of percentile values in (0,100) range.
      Examples:
@@ -145,6 +157,7 @@ def parse_percentiles(percentiles):
 
 TaskInstanceKey = namedtuple('TaskInstanceKey', [ 'jobkey', 'instance' ])
 
+
 def parse_task_instance_key(key):
   pieces = key.split('/')
   if len(pieces) != 5:
@@ -158,6 +171,21 @@ def parse_task_instance_key(key):
   return TaskInstanceKey(AuroraJobKey(cluster, role, env, name), instance)
 
 
+def instance_specifier(spec_str):
+  """Parse a CLUSTER/ROLE/ENV/NAME[/INSTANCES] string into a TaskInstanceKey."""
+  # ArgumentTypeError throughout: argparse reports its message, but not a ValueError's.
+  if not spec_str:
+    raise ArgumentTypeError('Instance specifier must be non-empty')
+  parts = spec_str.split('/')
+  if len(parts) not in (4, 5):
+    raise ArgumentTypeError('Instance specifier must be a CLUSTER/ROLE/ENV/JOB/INSTANCES tuple')
+  jobkey = AuroraJobKey(*parts[:4])
+  instances = parse_instances(parts[4]) if len(parts) == 5 else None
+  # parse_instances returns None for an empty list (a trailing "/"); report that as
+  # ALL_INSTANCES, like an omitted list, instead of leaking None to callers.
+  return TaskInstanceKey(jobkey, ALL_INSTANCES if instances is None else instances)
+
+
 BATCH_OPTION = CommandOption('--batch-size', type=int, default=5,
         help='Number of instances to be operate on in one iteration')
 
@@ -197,6 +225,11 @@ INSTANCES_OPTION = CommandOption('--instances', type=parse_instances, dest='inst
          'or a range (e.g. 0-2) or any combination of the two (e.g. 0-2,5,7-9). If not set, '
          'all instances will be acted on.')
 
+INSTANCES_SPEC_ARGUMENT = CommandOption('instance_spec', type=instance_specifier,
+    default=None, metavar="CLUSTER/ROLE/ENV/NAME[/INSTANCES]",
+    help=('Fully specified job instance key, in CLUSTER/ROLE/ENV/NAME[/INSTANCES] format. '
+        'If INSTANCES is omitted, then all instances will be operated on.'))
+
 
 JOBSPEC_ARGUMENT = CommandOption('jobspec', type=AuroraJobKey.from_path,
     metavar="CLUSTER/ROLE/ENV/NAME",
@@ -212,6 +245,13 @@ JSON_WRITE_OPTION = CommandOption('--write-json', default=False, dest='write_jso
     action='store_true',
     help='Generate command output in JSON format')
 
+MAX_TOTAL_FAILURES_OPTION = CommandOption('--max-total-failures', type=int, default=0,
+     help='Maximum number of instance failures to be tolerated in total before aborting.')
+
+
+NO_BATCHING_OPTION = CommandOption('--no-batching', default=False, action='store_true',
+  help='Run the command on all instances at once, instead of running in batches')
+
 
 ROLE_ARGUMENT = CommandOption('role', type=parse_qualified_role, metavar='CLUSTER/NAME',
     help='Rolename to retrieve information about')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/main/python/apache/aurora/client/cli/task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
index 8d4d38e..e4ee77e 100644
--- a/src/main/python/apache/aurora/client/cli/task.py
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -47,7 +47,6 @@ from apache.aurora.client.cli.options import (
     EXECUTOR_SANDBOX_OPTION,
     FORCE_OPTION,
     HEALTHCHECK_OPTION,
-    INSTANCES_OPTION,
     JOBSPEC_ARGUMENT,
     JSON_READ_OPTION,
     JSON_WRITE_OPTION,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/test/python/apache/aurora/client/cli/test_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_kill.py b/src/test/python/apache/aurora/client/cli/test_kill.py
index 6040ed4..2088160 100644
--- a/src/test/python/apache/aurora/client/cli/test_kill.py
+++ b/src/test/python/apache/aurora/client/cli/test_kill.py
@@ -20,14 +20,15 @@ import unittest
 from twitter.common.contextutil import temporary_file
 
 from apache.aurora.client.cli.client import AuroraCommandLine
+from apache.aurora.client.cli.options import parse_instances
+from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
 from apache.aurora.client.hooks.hooked_api import HookedAuroraClientAPI
 from apache.aurora.common.aurora_job_key import AuroraJobKey
 from twitter.common.contextutil import temporary_file
-from apache.aurora.client.cli.options import parse_instances
-from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
 
 from gen.apache.aurora.ttypes import (
     Identity,
+    ScheduleStatusResult,
     TaskQuery,
 )
 
@@ -54,7 +55,7 @@ class TestClientKillCommand(AuroraClientCommandTest):
   def assert_kill_job_called(cls, mock_api):
     assert mock_api.kill_job.call_count == 1
 
-  def test_kill_job(self):
+  def test_killall_job(self):
     """Test kill client-side API logic."""
     mock_context = FakeAuroraCommandContext()
     mock_scheduler_proxy = Mock()
@@ -63,38 +64,153 @@ class TestClientKillCommand(AuroraClientCommandTest):
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
 
       api = mock_context.get_api('west')
+      mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_call_result()
       api.kill_job.return_value = self.get_kill_job_response()
       mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
         cmd = AuroraCommandLine()
-        cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello'])
+        cmd.execute(['job', 'killall', '--no-batching', '--config=%s' % fp.name, 'west/bozo/test/hello'])
 
       # Now check that the right API calls got made.
       assert api.kill_job.call_count == 1
       api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), None)
 
-  def test_kill_job_with_instances(self):
+  def test_killall_job_batched(self):
+    """Test that killall without --no-batching kills the 20 mock instances in batches of 5."""
+    # Named distinctly from test_killall_job above: a second method with that same name
+    # would replace the first in the class body, so only one of the two would ever run.
+    mock_context = FakeAuroraCommandContext()
+    with contextlib.nested(
+        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+
+      api = mock_context.get_api('west')
+      api.kill_job.return_value = self.get_kill_job_response()
+      mock_context.add_expected_status_query_result(self.create_status_call_result())
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        cmd = AuroraCommandLine()
+        cmd.execute(['job', 'killall', '--config=%s' % fp.name, 'west/bozo/test/hello'])
+
+      # Now check that the right API calls got made.
+      assert api.kill_job.call_count == 4
+      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), [15, 16, 17, 18, 19])
+
+  def test_kill_job_with_instances_nobatching(self):
     """Test kill client-side API logic."""
     mock_context = FakeAuroraCommandContext()
     with contextlib.nested(
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
       api = mock_context.get_api('west')
+      self.setup_get_tasks_status_calls(api.scheduler_proxy)
       api.kill_job.return_value = self.get_kill_job_response()
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
         cmd = AuroraCommandLine()
-        cmd.execute(['job', 'kill', '--config=%s' % fp.name, '--instances=0,2,4-6',
-           'west/bozo/test/hello'])
+        cmd.execute(['job', 'kill', '--config=%s' % fp.name, '--no-batching', 'west/bozo/test/hello/0,2,4-6'])
 
       # Now check that the right API calls got made.
       assert api.kill_job.call_count == 1
       api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
           [0, 2, 4, 5, 6])
 
+  def test_kill_job_with_instances_batched(self):
+    """Test that a batched kill of five instances (0,2,4-6) makes a single kill_job call."""
+    mock_context = FakeAuroraCommandContext()
+    with contextlib.nested(
+        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+      api = mock_context.get_api('west')
+      status_result = self.create_status_call_result()
+      mock_context.add_expected_status_query_result(status_result)
+      api.kill_job.return_value = self.get_kill_job_response()
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        cmd = AuroraCommandLine()
+        cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-6'])
+
+      # Now check that the right API calls got made.
+      assert api.kill_job.call_count == 1
+      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
+          [0, 2, 4, 5, 6])
+
+  def test_kill_job_with_instances_batched_large(self):
+    """Test that a batched kill of twelve instances (0,2,4-13) is split into three calls."""
+    mock_context = FakeAuroraCommandContext()
+    with contextlib.nested(
+        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+      api = mock_context.get_api('west')
+      status_result = self.create_status_call_result()
+      mock_context.add_expected_status_query_result(status_result)
+      api.kill_job.return_value = self.get_kill_job_response()
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        cmd = AuroraCommandLine()
+        cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-13'])
+
+      # Now check that the right API calls got made.
+      assert api.kill_job.call_count == 3
+      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
+          [12, 13])
+
+  def test_kill_job_with_instances_batched_maxerrors(self):
+    """Test a batched kill of twelve instances run with --max-total-failures=1."""
+    mock_context = FakeAuroraCommandContext()
+    with contextlib.nested(
+        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+      api = mock_context.get_api('west')
+      status_result = self.create_status_call_result()
+      failed_status_result  = self.create_error_response()
+      mock_context.add_expected_status_query_result(status_result)
+      mock_context.add_expected_status_query_result(failed_status_result)
+      mock_context.add_expected_status_query_result(failed_status_result)
+      mock_context.add_expected_status_query_result(status_result)
+      mock_context.add_expected_status_query_result(status_result)
+      api.kill_job.return_value = self.get_kill_job_response()
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        cmd = AuroraCommandLine()
+        cmd.execute(['job', 'kill', '--max-total-failures=1', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-13'])
+
+      # NOTE(review): the error responses are queued as status-query results while kill_job keeps one fixed return value, and twelve instances need three batches anyway - this may never reach the abort path; confirm.
+      assert api.kill_job.call_count == 3
+
+
+  def test_kill_job_with_empty_instances_batched(self):
+    """Test that a batched kill makes no kill_job calls when the status query lists no tasks."""
+    mock_context = FakeAuroraCommandContext()
+    with contextlib.nested(
+        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+      api = mock_context.get_api('west')
+      # set up an empty instance list in the getTasksStatus response
+      status_response = self.create_simple_success_response()
+      schedule_status = Mock(spec=ScheduleStatusResult)
+      status_response.result.scheduleStatusResult = schedule_status
+      # with no tasks listed there is nothing to batch, so no kill request is expected
+      schedule_status.tasks = []
+      mock_context.add_expected_status_query_result(status_response)
+      api.kill_job.return_value = self.get_kill_job_response()
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        cmd = AuroraCommandLine()
+        cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-13'])
+
+      # Now check that the right API calls got made.
+      assert api.kill_job.call_count == 0
+
+
   def test_kill_job_with_instances_deep_api(self):
     """Test kill client-side API logic."""
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
@@ -102,12 +218,12 @@ class TestClientKillCommand(AuroraClientCommandTest):
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
       mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+      self.setup_get_tasks_status_calls(mock_scheduler_proxy)
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
         cmd = AuroraCommandLine()
-        cmd.execute(['job', 'kill', '--config=%s' % fp.name, '--instances=0,2,4-6',
-           'west/bozo/test/hello'])
+        cmd.execute(['job', 'kill', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-6'])
       # Now check that the right API calls got made.
       assert mock_scheduler_proxy.killTasks.call_count == 1
       mock_scheduler_proxy.killTasks.assert_called_with(

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/test/python/apache/aurora/client/cli/test_restart.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_restart.py b/src/test/python/apache/aurora/client/cli/test_restart.py
index 7547bd7..6bcdd6a 100644
--- a/src/test/python/apache/aurora/client/cli/test_restart.py
+++ b/src/test/python/apache/aurora/client/cli/test_restart.py
@@ -39,24 +39,6 @@ class TestRestartCommand(AuroraClientCommandTest):
     populate.result.populateJobResult.populated = set(configs)
     return populate
 
-  @classmethod
-  def setup_get_tasks_status_calls(cls, scheduler):
-    status_response = cls.create_simple_success_response()
-    scheduler.getTasksStatus.return_value = status_response
-    schedule_status = Mock(spec=ScheduleStatusResult)
-    status_response.result.scheduleStatusResult = schedule_status
-    mock_task_config = Mock()
-    # This should be a list of ScheduledTask's.
-    schedule_status.tasks = []
-    for i in range(20):
-      task_status = Mock(spec=ScheduledTask)
-      task_status.assignedTask = Mock(spec=AssignedTask)
-      task_status.assignedTask.instanceId = i
-      task_status.assignedTask.taskId = "Task%s" % i
-      task_status.assignedTask.slaveId = "Slave%s" % i
-      task_status.slaveHost = "Slave%s" % i
-      task_status.assignedTask.task = mock_task_config
-      schedule_status.tasks.append(task_status)
 
   @classmethod
   def setup_health_checks(cls, mock_api):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/77db3e54/src/test/python/apache/aurora/client/cli/util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/util.py b/src/test/python/apache/aurora/client/cli/util.py
index db65eee..bc2a3e9 100644
--- a/src/test/python/apache/aurora/client/cli/util.py
+++ b/src/test/python/apache/aurora/client/cli/util.py
@@ -17,9 +17,12 @@
 import unittest
 
 from gen.apache.aurora.ttypes import (
+    AssignedTask,
     Response,
     ResponseCode,
     Result,
+    ScheduleStatusResult,
+    ScheduledTask,
 )
 
 from apache.aurora.client.cli.context import AuroraCommandContext
@@ -83,6 +86,7 @@ class FakeAuroraCommandContext(AuroraCommandContext):
     self.task_status.append(expected_result)
     # each call adds an expected query result, in order.
     self.fake_api.scheduler_proxy.getTasksStatus.side_effect = self.task_status
+    self.fake_api.check_status.side_effect = self.task_status
 
 
 class AuroraClientCommandTest(unittest.TestCase):
@@ -124,6 +128,32 @@ class AuroraClientCommandTest(unittest.TestCase):
     mock_api_factory.return_value = mock_api
     return mock_api_factory, mock_scheduler_client
 
+  @classmethod
+  def create_status_call_result(cls):
+    """Build a status response whose task list holds 20 mock tasks, instance ids 0 to 19."""
+    response = cls.create_simple_success_response()
+    status = response.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
+    shared_config = Mock()  # every mock task below refers to this one config object
+    status.tasks = []
+    for instance_id in range(20):
+      assigned = Mock(spec=AssignedTask)
+      assigned.instanceId = instance_id
+      assigned.taskId = "Task%s" % instance_id
+      assigned.slaveId = "Slave%s" % instance_id
+      assigned.task = shared_config
+      scheduled = Mock(spec=ScheduledTask)
+      scheduled.assignedTask = assigned
+      scheduled.slaveHost = "Slave%s" % instance_id
+      status.tasks.append(scheduled)
+    return response
+
+
+  @classmethod
+  def setup_get_tasks_status_calls(cls, scheduler):
+    # Every getTasksStatus call on this scheduler mock returns the same canned response.
+    scheduler.getTasksStatus.return_value = cls.create_status_call_result()
+
+
   FAKE_TIME = 42131
 
   @classmethod