You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by mc...@apache.org on 2014/05/13 21:38:55 UTC

git commit: Make the "task run" command accept an instances spec.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master d931a314d -> 653d581e2


Make the "task run" command accept an instances spec.

Also, add a subclass of DistributedCommandRunner that takes an
instance list, and runs a command only on the specific task instances
in that list.

Bugs closed: aurora-198

Reviewed at https://reviews.apache.org/r/21170/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/653d581e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/653d581e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/653d581e

Branch: refs/heads/master
Commit: 653d581e2c62e379b01ba4c5a74083c9f1be5b82
Parents: d931a31
Author: Mark Chu-Carroll <mc...@twopensource.com>
Authored: Tue May 13 15:34:19 2014 -0400
Committer: Mark Chu-Carroll <mc...@twitter.com>
Committed: Tue May 13 15:34:19 2014 -0400

----------------------------------------------------------------------
 .../apache/aurora/client/api/command_runner.py  | 23 ++++++++++++++++++
 .../python/apache/aurora/client/cli/options.py  |  2 +-
 .../python/apache/aurora/client/cli/task.py     | 11 +++++----
 .../apache/aurora/client/cli/test_task_run.py   | 25 ++++++++++++++++----
 4 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/653d581e/src/main/python/apache/aurora/client/api/command_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/command_runner.py b/src/main/python/apache/aurora/client/api/command_runner.py
index 14f8e45..c1459c4 100644
--- a/src/main/python/apache/aurora/client/api/command_runner.py
+++ b/src/main/python/apache/aurora/client/api/command_runner.py
@@ -130,3 +130,26 @@ class DistributedCommandRunner(object):
     threadpool = ThreadPool(processes=parallelism)
     for result in threadpool.imap_unordered(self.execute, self.process_arguments(command, **kw)):
       print result
+
+class InstanceDistributedCommandRunner(DistributedCommandRunner):
+  """A distributed command runner that only runs on specified instances of a job."""
+
+  @classmethod
+  def query_from(cls, role, env, job, instances=None):
+    return TaskQuery(statuses=LIVE_STATES, owner=Identity(role), jobName=job, environment=env, instanceIds=instances)
+
+  def __init__(self, cluster, role, env, job, ssh_user=None, instances=None):
+    super(InstanceDistributedCommandRunner, self).__init__(cluster, role, env, [job], ssh_user)
+    self._job = job
+    self._ssh_user = ssh_user if ssh_user else self._role
+    self.instances = instances
+
+  def resolve(self):
+    resp = self._api.query(self.query_from(self._role, self._env, self._job, self.instances))
+    if resp.responseCode == ResponseCode.OK:
+      for task in resp.result.scheduleStatusResult.tasks:
+        yield task
+    else:
+      print_aurora_log(logging.ERROR,
+          "Error: could not retrieve task information for run command: %s" % resp.message)
+      raise ValueError("Could not retrieve task information: %s" % resp.message)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/653d581e/src/main/python/apache/aurora/client/cli/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/options.py b/src/main/python/apache/aurora/client/cli/options.py
index bf86c3a..84b4d7b 100644
--- a/src/main/python/apache/aurora/client/cli/options.py
+++ b/src/main/python/apache/aurora/client/cli/options.py
@@ -103,7 +103,7 @@ def parse_qualified_role(rolestr):
   return role_parts
 
 
-ALL_INSTANCES = 'all'
+ALL_INSTANCES = None
 
 
 def parse_instances(instances):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/653d581e/src/main/python/apache/aurora/client/cli/task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
index 1253ece..1bd746d 100644
--- a/src/main/python/apache/aurora/client/cli/task.py
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -31,7 +31,7 @@ from pystachio.config import Config
 from thrift.protocol import TJSONProtocol
 from thrift.TSerialization import serialize
 
-from apache.aurora.client.api.command_runner import DistributedCommandRunner
+from apache.aurora.client.api.command_runner import DistributedCommandRunner, InstanceDistributedCommandRunner
 from apache.aurora.client.api.updater_util import UpdaterConfig
 from apache.aurora.client.cli import (
     EXIT_COMMAND_FAILURE,
@@ -51,6 +51,7 @@ from apache.aurora.client.cli.options import (
     EXECUTOR_SANDBOX_OPTION,
     FORCE_OPTION,
     HEALTHCHECK_OPTION,
+    INSTANCES_SPEC_ARGUMENT,
     JOBSPEC_ARGUMENT,
     JSON_READ_OPTION,
     JSON_WRITE_OPTION,
@@ -74,7 +75,7 @@ class RunCommand(Verb):
   def help(self):
     return """Usage: aurora task run cluster/role/env/job cmd
 
-  Runs a shell command on all machines currently hosting instances of a single job.
+  Runs a shell command on machines currently hosting instances of a single job.
 
   This feature supports the same command line wildcards that are used to
   populate a job's commands.
@@ -88,15 +89,15 @@ class RunCommand(Verb):
             help='Number of threads to use'),
         SSH_USER_OPTION,
         EXECUTOR_SANDBOX_OPTION,
-        JOBSPEC_ARGUMENT,
+        INSTANCES_SPEC_ARGUMENT,
         CommandOption('cmd', type=str)
     ]
 
   def execute(self, context):
     # TODO(mchucarroll): add options to specify which instances to run on (AURORA-198)
-    cluster_name, role, env, name = context.options.jobspec
+    (cluster_name, role, env, name), instances = context.options.instance_spec
     cluster = CLUSTERS[cluster_name]
-    dcr = DistributedCommandRunner(cluster, role, env, [name], context.options.ssh_user)
+    dcr = InstanceDistributedCommandRunner(cluster, role, env, name, context.options.ssh_user, instances)
     dcr.run(context.options.cmd, parallelism=context.options.num_threads,
         executor_sandbox=context.options.executor_sandbox)
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/653d581e/src/test/python/apache/aurora/client/cli/test_task_run.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_task_run.py b/src/test/python/apache/aurora/client/cli/test_task_run.py
index c60189c..9c6bcf8 100644
--- a/src/test/python/apache/aurora/client/cli/test_task_run.py
+++ b/src/test/python/apache/aurora/client/cli/test_task_run.py
@@ -88,6 +88,20 @@ class TestRunCommand(AuroraClientCommandTest):
 
   def test_successful_run(self):
     """Test the run command."""
+    self.generic_test_successful_run(['task', 'run', 'west/bozo/test/hello', 'ls'], None)
+
+  def test_successful_run_with_instances(self):
+    """Test the run command."""
+    self.generic_test_successful_run(['task', 'run', 'west/bozo/test/hello/1-3', 'ls'], [1, 2, 3])
+
+  def generic_test_successful_run(self, cmd_args, instances):
+    """Common structure of all successful run tests.
+    Params:
+      cmd_args: the arguments to pass to the aurora command line to run this test.
+      instances: the list of instances that should be passed to a status query.
+         (The status query is the only visible difference between a sharded
+         run, and an all-instances run in the test.)
+    """
     # Calls api.check_status, which calls scheduler_proxy.getJobs
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
     mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
@@ -96,7 +110,7 @@ class TestRunCommand(AuroraClientCommandTest):
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
         patch('apache.aurora.client.cli.task.CLUSTERS', new=self.TEST_CLUSTERS),
-        patch('apache.aurora.client.api.command_runner.DistributedCommandRunner.sandbox_args',
+        patch('apache.aurora.client.api.command_runner.InstanceDistributedCommandRunner.sandbox_args',
             return_value=sandbox_args),
         patch('subprocess.Popen', return_value=self.create_mock_process())) as (
             mock_scheduler_proxy_class,
@@ -105,13 +119,15 @@ class TestRunCommand(AuroraClientCommandTest):
             mock_runner_args_patch,
             mock_subprocess):
       cmd = AuroraCommandLine()
-      cmd.execute(['task', 'run', 'west/bozo/test/hello', 'ls'])
+      cmd.execute(cmd_args)
       # The status command sends a getTasksStatus query to the scheduler,
-      # and then prints the result.
+      # and then prints the result. The use of shards, above, should change
+      # this query - that's the focus of the instances test.
       mock_scheduler_proxy.getTasksStatus.assert_called_with(TaskQuery(jobName='hello',
           environment='test', owner=Identity(role='bozo'),
           statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
-              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])))
+              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING]),
+          instanceIds=instances))
 
       # The mock status call returns three ScheduledTasks, so three commands should have been run
       assert mock_subprocess.call_count == 3
@@ -120,7 +136,6 @@ class TestRunCommand(AuroraClientCommandTest):
           'slaverun/sandbox;ls'],
           stderr=-2, stdout=-1)
 
-
 class TestSshCommand(AuroraClientCommandTest):
   @classmethod
   def create_mock_scheduled_tasks(cls):