You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by mc...@apache.org on 2014/05/06 21:35:17 UTC

git commit: Add batch options to kill and killall. (This brings the kill commands in client v1 into parity with client v2. At the moment, in order to avoid disrupting current users, v1 still defaults to non-batched operation.)

Repository: incubator-aurora
Updated Branches:
  refs/heads/master fb5027b55 -> a4602b050


Add batch options to kill and killall. (This brings the kill commands in client v1
into parity with client v2. At the moment, in order to avoid disrupting current users,
v1 still defaults to non-batched operation.)

Bugs closed: aurora-356

Reviewed at https://reviews.apache.org/r/20723/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/a4602b05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/a4602b05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/a4602b05

Branch: refs/heads/master
Commit: a4602b0505fb609318486d0d1a8ef2c066070e34
Parents: fb5027b
Author: Mark Chu-Carroll <mc...@twopensource.com>
Authored: Tue May 6 15:32:46 2014 -0400
Committer: Mark Chu-Carroll <mc...@twitter.com>
Committed: Tue May 6 15:32:46 2014 -0400

----------------------------------------------------------------------
 .../apache/aurora/client/commands/core.py       |  63 ++++++++-
 src/main/python/apache/aurora/client/options.py |  14 ++
 .../apache/aurora/client/commands/test_kill.py  | 141 +++++++++++++++++++
 3 files changed, 214 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a4602b05/src/main/python/apache/aurora/client/commands/core.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/core.py b/src/main/python/apache/aurora/client/commands/core.py
index 089a2c6..0046c76 100644
--- a/src/main/python/apache/aurora/client/commands/core.py
+++ b/src/main/python/apache/aurora/client/commands/core.py
@@ -43,6 +43,7 @@ from apache.aurora.client.api.updater_util import UpdaterConfig
 from apache.aurora.client.config import get_config, GlobalHookRegistry
 from apache.aurora.client.factory import make_client, make_client_factory
 from apache.aurora.client.options import (
+    BATCH_OPTION,
     CLUSTER_CONFIG_OPTION,
     CLUSTER_INVOKE_OPTION,
     CLUSTER_NAME_OPTION,
@@ -52,6 +53,7 @@ from apache.aurora.client.options import (
     FROM_JOBKEY_OPTION,
     HEALTH_CHECK_INTERVAL_SECONDS_OPTION,
     JSON_OPTION,
+    MAX_FAILURES_OPTION,
     OPEN_BROWSER_OPTION,
     SHARDS_OPTION,
     WAIT_UNTIL_OPTION)
@@ -396,6 +398,8 @@ def list_jobs(cluster_and_role):
 @app.command_option(OPEN_BROWSER_OPTION)
 @app.command_option(SHARDS_OPTION)
 @app.command_option(DISABLE_HOOKS_OPTION)
+@app.command_option(BATCH_OPTION)
+@app.command_option(MAX_FAILURES_OPTION)
 def kill(args, options):
   """usage: kill --shards=shardspec cluster/role/env/job
 
@@ -410,16 +414,64 @@ def kill(args, options):
       args, options, make_client_factory())
   options = app.get_options()
   config = get_job_config(job_key.to_path(), config_file, options) if config_file else None
-  resp = api.kill_job(job_key, options.shards, config=config)
-  check_and_log_response(resp)
+  if options.batch_size is not None:
+    kill_in_batches(api, job_key, options.shards, options.batch_size, options.max_failures_option)
+  else:
+    resp = api.kill_job(job_key, options.shards, config=config)
+    check_and_log_response(resp)
   handle_open(api.scheduler_proxy.scheduler_client().url, job_key.role, job_key.env, job_key.name)
   wait_kill_tasks(api.scheduler_proxy, job_key, options.shards)
 
 
+
+def kill_in_batches(api, job_key, instances_arg, batch_size, max_failures):
+  """Common behavior shared by kill and killall for killing instances in
+  a sequence of batches.
+  """
+  def make_batches(instances, batch_size):
+    result = []
+    while (len(instances) > 0):
+      batch = []
+      for i in range(min(batch_size, len(instances))):
+        batch.append(instances.pop())
+      result.append(batch)
+    return result
+
+
+  resp = api.check_status(job_key)
+  if resp.responseCode is not ResponseCode.OK:
+    log.error("Job %s could not be found" % job_key)
+    exit(1)
+  tasks = resp.result.scheduleStatusResult.tasks or None
+  if batch_size is not None and batch_size > 0 and tasks is not None:
+    instance_ids = set(instance.assignedTask.instanceId for instance in tasks)
+    instances_to_kill = instance_ids & set(instances_arg or instance_ids)
+    errors = 0
+    for batch in make_batches(instances_to_kill, batch_size):
+      resp = api.kill_job(job_key, batch)
+      if resp.responseCode is not ResponseCode.OK:
+        log.info("Kill of shards %s failed with error %s" % (batch, resp.message))
+        print('ERROR IN KILL_JOB')
+        errors += 1
+        if errors > max_failures:
+          log.error("Exceeded maximum number of errors while killing instances")
+          exit(1)
+    if errors > 0:
+      print("Warning: errors occurred during batch kill")
+      exit(1)
+  else:
+    if tasks is None or len(tasks) == 0:
+      log.error('No tasks to kill found for job %s' % job_key)
+      return 1
+
+
+
 @app.command
 @app.command_option(CLUSTER_INVOKE_OPTION)
 @app.command_option(OPEN_BROWSER_OPTION)
 @app.command_option(DISABLE_HOOKS_OPTION)
+@app.command_option(BATCH_OPTION)
+@app.command_option(MAX_FAILURES_OPTION)
 def killall(args, options):
   """usage: killall cluster/role/env/job
   Kills all tasks in a running job, blocking until all specified tasks have been terminated.
@@ -429,8 +481,11 @@ def killall(args, options):
   config_file = args[1] if len(args) > 1 else None  # the config for hooks
   config = get_job_config(job_key.to_path(), config_file, options) if config_file else None
   api = make_client(job_key.cluster)
-  resp = api.kill_job(job_key, None, config=config)
-  check_and_log_response(resp)
+  if options.batch_size is not None:
+    kill_in_batches(api, job_key, None, options.batch_size, options.max_failures_option)
+  else:
+    resp = api.kill_job(job_key, None, config=config)
+    check_and_log_response(resp)
   handle_open(api.scheduler_proxy.scheduler_client().url, job_key.role, job_key.env, job_key.name)
   wait_kill_tasks(api.scheduler_proxy, job_key)
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a4602b05/src/main/python/apache/aurora/client/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/options.py b/src/main/python/apache/aurora/client/options.py
index 0d85c36..2bd76aa 100644
--- a/src/main/python/apache/aurora/client/options.py
+++ b/src/main/python/apache/aurora/client/options.py
@@ -21,6 +21,7 @@ from apache.thermos.common.options import add_binding_to
 
 
 __all__ = (
+  'BATCH_OPTION',
   'CLUSTER_CONFIG_OPTION',
   'CLUSTER_INVOKE_OPTION',
   'CLUSTER_NAME_OPTION',
@@ -31,6 +32,7 @@ __all__ = (
   'FROM_JOBKEY_OPTION',
   'HEALTH_CHECK_INTERVAL_SECONDS_OPTION',
   'JSON_OPTION',
+  'MAX_FAILURES_OPTION',
   'OPEN_BROWSER_OPTION',
   'SHARDS_OPTION',
   'SSH_USER_OPTION',
@@ -131,6 +133,18 @@ SHARDS_OPTION = optparse.Option(
     'all shards will be acted on.')
 
 
+BATCH_OPTION = optparse.Option(
+    '--batch_size', type='int', dest='batch_size', default=None,
+    help='The maximum number of instances to act on at a time')
+
+
+MAX_FAILURES_OPTION = optparse.Option(
+  '--max_total_failures',
+  type='int',
+  default=1,
+  help='The maximum number of errors in a batch operation before the operation should abort')
+
+
 FROM_JOBKEY_OPTION = optparse.Option('--from', dest='rename_from', type='string', default=None,
     metavar='CLUSTER/ROLE/ENV/JOB', action='callback', callback=parse_aurora_job_key_into,
     help='Job key to diff against.')

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/a4602b05/src/test/python/apache/aurora/client/commands/test_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_kill.py b/src/test/python/apache/aurora/client/commands/test_kill.py
index 7f0b73c..65c8dde 100644
--- a/src/test/python/apache/aurora/client/commands/test_kill.py
+++ b/src/test/python/apache/aurora/client/commands/test_kill.py
@@ -28,6 +28,10 @@ from apache.aurora.client.commands.util import AuroraClientCommandTest
 from gen.apache.aurora.api.ttypes import (
     AssignedTask,
     Identity,
+    Response,
+    ResponseCode,
+    Result,
+    ScheduledTask,
     ScheduleStatus,
     ScheduledTask,
     ScheduleStatusResult,
@@ -48,6 +52,8 @@ class TestClientKillCommand(AuroraClientCommandTest):
     mock_options.shards = None
     mock_options.cluster = None
     mock_options.json = False
+    mock_options.batch_size = None
+    mock_options.max_total_failures = 1
     mock_options.disable_all_hooks = False
     return mock_options
 
@@ -76,6 +82,11 @@ class TestClientKillCommand(AuroraClientCommandTest):
     return cls.create_simple_success_response()
 
   @classmethod
+  def get_kill_job_error_response(cls):
+    return cls.create_error_response()
+
+
+  @classmethod
   def assert_kill_job_called(cls, mock_api):
     assert mock_api.kill_job.call_count == 1
 
@@ -216,6 +227,68 @@ class TestClientKillCommand(AuroraClientCommandTest):
             name=self.TEST_JOB), None, config=mock_config)
       self.assert_scheduler_called(mock_api, self.get_expected_task_query(), 3)
 
+  def create_status_call_result(cls):
+    """Set up the mock status call that will be used to get a task list for
+    a batched kill command.
+    """
+    status_response = Mock(spec=Response)
+    status_response.responseCode = ResponseCode.OK
+    status_response.message = "Ok"
+    status_response.result = Mock(spec=Result)
+    schedule_status = Mock(spec=ScheduleStatusResult)
+    status_response.result.scheduleStatusResult = schedule_status
+    mock_task_config = Mock()
+    # This should be a list of ScheduledTask's.
+    schedule_status.tasks = []
+    for i in range(20):
+      task_status = Mock(spec=ScheduledTask)
+      task_status.assignedTask = Mock(spec=AssignedTask)
+      task_status.assignedTask.instanceId = i
+      task_status.assignedTask.taskId = "Task%s" % i
+      task_status.assignedTask.slaveId = "Slave%s" % i
+      task_status.slaveHost = "Slave%s" % i
+      task_status.assignedTask.task = mock_task_config
+      schedule_status.tasks.append(task_status)
+    return status_response
+
+  def test_successful_batched_killall_job(self):
+    """Run a test of the batched "killall" command against a mocked-out API:
+    Verifies that the command issues one kill_job call per batch of instances, and that
+    the last batch contains the expected instance ids."""
+
+    mock_options = self.setup_mock_options()
+    mock_options.batch_size = 5
+    mock_config = Mock()
+    (mock_api, mock_scheduler_proxy) = self.create_mock_api()
+    mock_api.kill_job.return_value = self.get_kill_job_response()
+    mock_api.check_status.return_value = self.create_status_call_result()
+    with contextlib.nested(
+        patch('apache.aurora.client.commands.core.make_client',
+            return_value=mock_api),
+        patch('twitter.common.app.get_options', return_value=mock_options),
+        patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config),
+        patch('apache.aurora.client.commands.core.JobMonitor')) as (
+            mock_make_client,
+            options, mock_get_job_config, mock_monitor):
+
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        killall(['west/mchucarroll/test/hello', fp.name], mock_options)
+
+      # Now check that the right API calls got made.
+      assert mock_api.kill_job.call_count == 4
+      mock_api.kill_job.assert_called_with(
+        AuroraJobKey(cluster=self.TEST_CLUSTER, role=self.TEST_ROLE, env=self.TEST_ENV,
+            name=self.TEST_JOB), [15, 16, 17, 18, 19])
+
+  @classmethod
+  def get_expected_task_query(cls, shards=None):
+    """Helper to create the query that will be a parameter to job kill."""
+    instance_ids = frozenset(shards) if shards is not None else None
+    return TaskQuery(taskIds=None, jobName=cls.TEST_JOB, environment=cls.TEST_ENV,
+        instanceIds=instance_ids, owner=Identity(role=cls.TEST_ROLE, user=None))
+
   def test_kill_job_api_level(self):
     """Test kill client-side API logic."""
     mock_options = self.setup_mock_options()
@@ -292,3 +365,71 @@ class TestClientKillCommand(AuroraClientCommandTest):
       query = self.get_expected_task_query([0, 1, 2, 3])
       mock_scheduler_proxy.killTasks.assert_called_with(query, None)
       self.assert_scheduler_called(mock_api, query, 3)
+
+  def test_kill_job_api_level_with_shards_batched(self):
+    """Test kill client-side API logic when explicit shards are killed in batches."""
+    mock_options = self.setup_mock_options()
+    mock_options.batch_size = 2
+    mock_options.shards = [0, 1, 2, 3]
+    mock_config = Mock()
+    mock_config.hooks = []
+    mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
+    (mock_api, mock_scheduler_proxy) = self.create_mock_api()
+    mock_api.check_status.return_value = self.create_status_call_result()
+    mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_call_result()
+    mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+    with contextlib.nested(
+        patch('apache.aurora.client.factory.make_client', return_value=mock_api),
+        patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
+        patch('twitter.common.app.get_options', return_value=mock_options),
+        patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config),
+        patch('apache.aurora.client.commands.core.JobMonitor')) as (
+            mock_api_factory_patch,
+            mock_scheduler_proxy_class,
+            mock_clusters,
+            options, mock_get_job_config,
+            mock_monitor):
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        kill(['west/mchucarroll/test/hello', fp.name], mock_options)
+
+      # Now check that the right API calls got made.
+      assert mock_scheduler_proxy.killTasks.call_count == 2
+      query = self.get_expected_task_query([2, 3])
+      mock_scheduler_proxy.killTasks.assert_called_with(query, None)
+
+  def test_kill_job_api_level_with_shards_batched_and_some_errors(self):
+    """Test that a failed batch causes an error exit without aborting later batches."""
+    mock_options = self.setup_mock_options()
+    mock_options.batch_size = 2
+    mock_options.shards = [0, 1, 2, 3]
+    mock_config = Mock()
+    mock_config.hooks = []
+    mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
+    (mock_api, mock_scheduler_proxy) = self.create_mock_api()
+    mock_api.check_status.return_value = self.create_status_call_result()
+    mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_call_result()
+    mock_api.kill_job.side_effect = [self.get_kill_job_error_response(), self.get_kill_job_response()]
+    with contextlib.nested(
+        patch('apache.aurora.client.factory.make_client', return_value=mock_api),
+        patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
+        patch('twitter.common.app.get_options', return_value=mock_options),
+        patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+            mock_api_factory_patch,
+            mock_scheduler_proxy_class,
+            mock_clusters,
+            options, mock_get_job_config):
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        # We should get an exception in this case, because one of the two calls fails.
+        self.assertRaises(SystemExit, kill, ['west/mchucarroll/test/hello', fp.name], mock_options)
+
+      # killTasks should still have gotten called twice - the first error shouldn't abort
+      # the second batch.
+      assert mock_scheduler_proxy.killTasks.call_count == 2
+      query = self.get_expected_task_query([2, 3])
+      mock_scheduler_proxy.killTasks.assert_called_with(query, None)