You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by mc...@apache.org on 2014/05/13 21:38:55 UTC
git commit: Make the "task run" command accept an instances spec.
Repository: incubator-aurora
Updated Branches:
refs/heads/master d931a314d -> 653d581e2
Make the "task run" command accept an instances spec.
Also, add a subclass of DistributedCommandRunner that takes an
instance list, and runs a command only on the specific task instances
in that list.
Bugs closed: aurora-198
Reviewed at https://reviews.apache.org/r/21170/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/653d581e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/653d581e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/653d581e
Branch: refs/heads/master
Commit: 653d581e2c62e379b01ba4c5a74083c9f1be5b82
Parents: d931a31
Author: Mark Chu-Carroll <mc...@twopensource.com>
Authored: Tue May 13 15:34:19 2014 -0400
Committer: Mark Chu-Carroll <mc...@twitter.com>
Committed: Tue May 13 15:34:19 2014 -0400
----------------------------------------------------------------------
.../apache/aurora/client/api/command_runner.py | 23 ++++++++++++++++++
.../python/apache/aurora/client/cli/options.py | 2 +-
.../python/apache/aurora/client/cli/task.py | 11 +++++----
.../apache/aurora/client/cli/test_task_run.py | 25 ++++++++++++++++----
4 files changed, 50 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/653d581e/src/main/python/apache/aurora/client/api/command_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/command_runner.py b/src/main/python/apache/aurora/client/api/command_runner.py
index 14f8e45..c1459c4 100644
--- a/src/main/python/apache/aurora/client/api/command_runner.py
+++ b/src/main/python/apache/aurora/client/api/command_runner.py
@@ -130,3 +130,26 @@ class DistributedCommandRunner(object):
threadpool = ThreadPool(processes=parallelism)
for result in threadpool.imap_unordered(self.execute, self.process_arguments(command, **kw)):
print result
+
+class InstanceDistributedCommandRunner(DistributedCommandRunner):
+ """A distributed command runner that only runs on specified instances of a job."""
+
+ @classmethod
+ def query_from(cls, role, env, job, instances=None):
+ return TaskQuery(statuses=LIVE_STATES, owner=Identity(role), jobName=job, environment=env, instanceIds=instances)
+
+ def __init__(self, cluster, role, env, job, ssh_user=None, instances=None):
+ super(InstanceDistributedCommandRunner, self).__init__(cluster, role, env, [job], ssh_user)
+ self._job = job
+ self._ssh_user = ssh_user if ssh_user else self._role
+ self.instances = instances
+
+ def resolve(self):
+ resp = self._api.query(self.query_from(self._role, self._env, self._job, self.instances))
+ if resp.responseCode == ResponseCode.OK:
+ for task in resp.result.scheduleStatusResult.tasks:
+ yield task
+ else:
+ print_aurora_log(logging.ERROR,
+ "Error: could not retrieve task information for run command: %s" % resp.message)
+ raise ValueError("Could not retrieve task information: %s" % resp.message)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/653d581e/src/main/python/apache/aurora/client/cli/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/options.py b/src/main/python/apache/aurora/client/cli/options.py
index bf86c3a..84b4d7b 100644
--- a/src/main/python/apache/aurora/client/cli/options.py
+++ b/src/main/python/apache/aurora/client/cli/options.py
@@ -103,7 +103,7 @@ def parse_qualified_role(rolestr):
return role_parts
-ALL_INSTANCES = 'all'
+ALL_INSTANCES = None
def parse_instances(instances):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/653d581e/src/main/python/apache/aurora/client/cli/task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
index 1253ece..1bd746d 100644
--- a/src/main/python/apache/aurora/client/cli/task.py
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -31,7 +31,7 @@ from pystachio.config import Config
from thrift.protocol import TJSONProtocol
from thrift.TSerialization import serialize
-from apache.aurora.client.api.command_runner import DistributedCommandRunner
+from apache.aurora.client.api.command_runner import DistributedCommandRunner, InstanceDistributedCommandRunner
from apache.aurora.client.api.updater_util import UpdaterConfig
from apache.aurora.client.cli import (
EXIT_COMMAND_FAILURE,
@@ -51,6 +51,7 @@ from apache.aurora.client.cli.options import (
EXECUTOR_SANDBOX_OPTION,
FORCE_OPTION,
HEALTHCHECK_OPTION,
+ INSTANCES_SPEC_ARGUMENT,
JOBSPEC_ARGUMENT,
JSON_READ_OPTION,
JSON_WRITE_OPTION,
@@ -74,7 +75,7 @@ class RunCommand(Verb):
def help(self):
return """Usage: aurora task run cluster/role/env/job cmd
- Runs a shell command on all machines currently hosting instances of a single job.
+ Runs a shell command on machines currently hosting instances of a single job.
This feature supports the same command line wildcards that are used to
populate a job's commands.
@@ -88,15 +89,15 @@ class RunCommand(Verb):
help='Number of threads to use'),
SSH_USER_OPTION,
EXECUTOR_SANDBOX_OPTION,
- JOBSPEC_ARGUMENT,
+ INSTANCES_SPEC_ARGUMENT,
CommandOption('cmd', type=str)
]
def execute(self, context):
# TODO(mchucarroll): add options to specify which instances to run on (AURORA-198)
- cluster_name, role, env, name = context.options.jobspec
+ (cluster_name, role, env, name), instances = context.options.instance_spec
cluster = CLUSTERS[cluster_name]
- dcr = DistributedCommandRunner(cluster, role, env, [name], context.options.ssh_user)
+ dcr = InstanceDistributedCommandRunner(cluster, role, env, name, context.options.ssh_user, instances)
dcr.run(context.options.cmd, parallelism=context.options.num_threads,
executor_sandbox=context.options.executor_sandbox)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/653d581e/src/test/python/apache/aurora/client/cli/test_task_run.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_task_run.py b/src/test/python/apache/aurora/client/cli/test_task_run.py
index c60189c..9c6bcf8 100644
--- a/src/test/python/apache/aurora/client/cli/test_task_run.py
+++ b/src/test/python/apache/aurora/client/cli/test_task_run.py
@@ -88,6 +88,20 @@ class TestRunCommand(AuroraClientCommandTest):
def test_successful_run(self):
"""Test the run command."""
+ self.generic_test_successful_run(['task', 'run', 'west/bozo/test/hello', 'ls'], None)
+
+ def test_successful_run_with_instances(self):
+ """Test the run command."""
+ self.generic_test_successful_run(['task', 'run', 'west/bozo/test/hello/1-3', 'ls'], [1, 2, 3])
+
+ def generic_test_successful_run(self, cmd_args, instances):
+ """Common structure of all successful run tests.
+ Params:
+ cmd_args: the arguments to pass to the aurora command line to run this test.
+ instances: the list of instances that should be passed to a status query.
+ (The status query is the only visible difference between a sharded
+ run, and an all-instances run in the test.)
+ """
# Calls api.check_status, which calls scheduler_proxy.getJobs
(mock_api, mock_scheduler_proxy) = self.create_mock_api()
mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
@@ -96,7 +110,7 @@ class TestRunCommand(AuroraClientCommandTest):
patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
patch('apache.aurora.client.cli.task.CLUSTERS', new=self.TEST_CLUSTERS),
- patch('apache.aurora.client.api.command_runner.DistributedCommandRunner.sandbox_args',
+ patch('apache.aurora.client.api.command_runner.InstanceDistributedCommandRunner.sandbox_args',
return_value=sandbox_args),
patch('subprocess.Popen', return_value=self.create_mock_process())) as (
mock_scheduler_proxy_class,
@@ -105,13 +119,15 @@ class TestRunCommand(AuroraClientCommandTest):
mock_runner_args_patch,
mock_subprocess):
cmd = AuroraCommandLine()
- cmd.execute(['task', 'run', 'west/bozo/test/hello', 'ls'])
+ cmd.execute(cmd_args)
# The status command sends a getTasksStatus query to the scheduler,
- # and then prints the result.
+ # and then prints the result. The use of shards, above, should change
+ # this query - that's the focus of the instances test.
mock_scheduler_proxy.getTasksStatus.assert_called_with(TaskQuery(jobName='hello',
environment='test', owner=Identity(role='bozo'),
statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
- ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])))
+ ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING]),
+ instanceIds=instances))
# The mock status call returns 3 three ScheduledTasks, so three commands should have been run
assert mock_subprocess.call_count == 3
@@ -120,7 +136,6 @@ class TestRunCommand(AuroraClientCommandTest):
'slaverun/sandbox;ls'],
stderr=-2, stdout=-1)
-
class TestSshCommand(AuroraClientCommandTest):
@classmethod
def create_mock_scheduled_tasks(cls):