You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the plain-text conversion.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/05/02 19:17:49 UTC
git commit: Moving kill wait to the client (Part 1: client changes)
Repository: incubator-aurora
Updated Branches:
refs/heads/master 639c4634f -> 464d6ea56
Moving kill wait to the client (Part 1: client changes)
Bugs closed: AURORA-370
Reviewed at https://reviews.apache.org/r/20950/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/464d6ea5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/464d6ea5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/464d6ea5
Branch: refs/heads/master
Commit: 464d6ea56b5fd678f959982be8929cfe6ac16b12
Parents: 639c463
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri May 2 10:06:28 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri May 2 10:06:28 2014 -0700
----------------------------------------------------------------------
src/main/python/apache/aurora/client/api/BUILD | 2 +
.../apache/aurora/client/api/job_monitor.py | 65 +++++----
.../python/apache/aurora/client/api/updater.py | 9 +-
.../python/apache/aurora/client/cli/jobs.py | 35 +++--
.../python/apache/aurora/client/cli/task.py | 1 -
.../apache/aurora/client/commands/core.py | 15 ++-
src/test/python/apache/aurora/client/api/BUILD | 3 +-
.../aurora/client/api/test_job_monitor.py | 93 ++++++++++---
.../apache/aurora/client/api/test_updater.py | 35 ++++-
.../aurora/client/cli/test_command_hooks.py | 2 +-
.../apache/aurora/client/cli/test_create.py | 23 ++--
.../apache/aurora/client/cli/test_kill.py | 98 +++++++++++---
.../apache/aurora/client/cli/test_plugins.py | 4 +-
.../apache/aurora/client/cli/test_update.py | 12 +-
.../python/apache/aurora/client/cli/util.py | 34 +++--
.../aurora/client/commands/test_create.py | 31 ++---
.../apache/aurora/client/commands/test_kill.py | 132 ++++++++++++++++---
.../aurora/client/commands/test_update.py | 14 +-
18 files changed, 463 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/BUILD b/src/main/python/apache/aurora/client/api/BUILD
index 32097d2..0412bb5 100644
--- a/src/main/python/apache/aurora/client/api/BUILD
+++ b/src/main/python/apache/aurora/client/api/BUILD
@@ -57,6 +57,7 @@ python_library(
name = 'job_monitor',
sources = ['job_monitor.py'],
dependencies = [
+ pants('3rdparty/python:twitter.common.log'),
pants('3rdparty/python:twitter.common.quantity'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
]
@@ -127,6 +128,7 @@ python_library(
dependencies = [
pants(':scheduler_client'),
pants(':instance_watcher'),
+ pants(':job_monitor'),
pants(':updater_util'),
pants(':quota_check'),
pants('3rdparty/python:twitter.common.log'),
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/api/job_monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/job_monitor.py b/src/main/python/apache/aurora/client/api/job_monitor.py
index b694ef6..a1a39e8 100644
--- a/src/main/python/apache/aurora/client/api/job_monitor.py
+++ b/src/main/python/apache/aurora/client/api/job_monitor.py
@@ -26,12 +26,13 @@ from gen.apache.aurora.api.ttypes import (
)
from thrift.transport import TTransport
+from twitter.common import log
from twitter.common.quantity import Amount, Time
class JobMonitor(object):
- MIN_POLL_INTERVAL = Amount(10, Time.SECONDS)
- MAX_POLL_INTERVAL = Amount(2, Time.MINUTES)
+ MIN_POLL_INTERVAL = Amount(2, Time.SECONDS)
+ MAX_POLL_INTERVAL = Amount(150, Time.SECONDS)
@classmethod
def running_or_finished(cls, status):
@@ -41,38 +42,58 @@ class JobMonitor(object):
def terminal(cls, status):
return status in TERMINAL_STATES
- # TODO(ksweeney): Make this use the AuroraJobKey
- def __init__(self, client, role, env, jobname):
- self._client = client
- self._query = TaskQuery(owner=Identity(role=role), environment=env, jobName=jobname)
- self._initial_tasks = set()
- self._initial_tasks = set(task.assignedTask.taskId for task in self.iter_query())
+ def __init__(self, scheduler, job_key, clock=time,
+ min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL):
+ self._scheduler = scheduler
+ self._job_key = job_key
+ self._clock = clock
+ self._min_poll_interval = min_poll_interval
+ self._max_poll_interval = max_poll_interval
- def iter_query(self):
+ def iter_query(self, query):
try:
- res = self._client.scheduler_proxy.getTasksStatus(self._query)
+ res = self._scheduler.getTasksStatus(query)
except TTransport.TTransportException as e:
- print('Failed to query slaves from scheduler: %s' % e)
+ log.error('Failed to query tasks from scheduler: %s' % e)
return
if res is None or res.result is None:
return
for task in res.result.scheduleStatusResult.tasks:
- if task.assignedTask.taskId not in self._initial_tasks:
- yield task
+ yield task
- def states(self):
+ def states(self, query):
states = {}
- for task in self.iter_query():
+ for task in self.iter_query(query):
status, instance_id = task.status, task.assignedTask.instanceId
first_timestamp = task.taskEvents[0].timestamp
if instance_id not in states or first_timestamp > states[instance_id][0]:
states[instance_id] = (first_timestamp, status)
return dict((instance_id, status[1]) for (instance_id, status) in states.items())
- def wait_until(self, predicate):
- """Given a predicate (from ScheduleStatus => Boolean), return once all tasks
- return true for that predicate."""
- poll_interval = self.MIN_POLL_INTERVAL
- while not all(predicate(state) for state in self.states().values()):
- time.sleep(poll_interval.as_(Time.SECONDS))
- poll_interval = min(self.MAX_POLL_INTERVAL, 2 * poll_interval)
+ def create_query(self, instances=None):
+ return TaskQuery(
+ owner=Identity(role=self._job_key.role),
+ environment=self._job_key.env,
+ jobName=self._job_key.name,
+ instanceIds=frozenset([int(s) for s in instances]) if instances else None)
+
+ def wait_until(self, predicate, instances=None, with_timeout=False):
+ """Given a predicate (from ScheduleStatus => Boolean), wait until all requested instances
+ return true for that predicate OR the timeout expires (if with_timeout=True)
+
+ Arguments:
+ predicate -- predicate to check completion with.
+ instances -- optional subset of job instances to wait for.
+ with_timeout -- if set, caps waiting time to the MAX_POLL_INTERVAL.
+
+ Returns: True if predicate is met or False if timeout has expired.
+ """
+ poll_interval = self._min_poll_interval
+ while not all(predicate(state) for state in self.states(self.create_query(instances)).values()):
+ if with_timeout and poll_interval >= self._max_poll_interval:
+ return False
+
+ self._clock.sleep(poll_interval.as_(Time.SECONDS))
+ poll_interval = min(self._max_poll_interval, 2 * poll_interval)
+
+ return True
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/api/updater.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater.py b/src/main/python/apache/aurora/client/api/updater.py
index 0acf450..2f55d5a 100644
--- a/src/main/python/apache/aurora/client/api/updater.py
+++ b/src/main/python/apache/aurora/client/api/updater.py
@@ -32,6 +32,7 @@ from gen.apache.aurora.api.ttypes import (
)
from .instance_watcher import InstanceWatcher
+from .job_monitor import JobMonitor
from .quota_check import CapacityRequest, QuotaCheck
from .scheduler_client import SchedulerProxy
from .updater_util import FailureThreshold, UpdaterConfig
@@ -57,12 +58,14 @@ class Updater(object):
health_check_interval_seconds,
scheduler=None,
instance_watcher=None,
- quota_check=None):
+ quota_check=None,
+ job_monitor=None):
self._config = config
self._job_key = JobKey(role=config.role(), environment=config.environment(), name=config.name())
self._health_check_interval_seconds = health_check_interval_seconds
self._scheduler = scheduler or SchedulerProxy(config.cluster())
self._quota_check = quota_check or QuotaCheck(self._scheduler)
+ self._job_monitor = job_monitor or JobMonitor(self._scheduler, self._config.job_key())
try:
self._update_config = UpdaterConfig(**config.update_config().get())
except ValueError as e:
@@ -263,7 +266,6 @@ class Updater(object):
if unchanged:
log.info('Skipping unchanged instances: %s' % unchanged)
- # Kill is a blocking call in scheduler -> no need to watch it yet.
self._kill_instances(to_kill)
self._add_instances(to_add, operation_configs.to_config)
return to_add
@@ -278,6 +280,9 @@ class Updater(object):
log.info('Killing instances: %s' % instance_ids)
query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids))
self._check_and_log_response(self._scheduler.killTasks(query, self._lock))
+ res = self._job_monitor.wait_until(JobMonitor.terminal, instance_ids, with_timeout=True)
+ if not res:
+ raise self.Error('Tasks were not killed in time.')
log.info('Instances killed')
def _add_instances(self, instance_ids, to_config):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py
index 782b348..cf45640 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -16,6 +16,7 @@
from __future__ import print_function
from datetime import datetime
+import logging
import json
import os
import pprint
@@ -31,6 +32,7 @@ from apache.aurora.client.cli import (
EXIT_INVALID_CONFIGURATION,
EXIT_INVALID_PARAMETER,
EXIT_OK,
+ EXIT_TIMEOUT,
Noun,
Verb,
)
@@ -117,7 +119,6 @@ class CreateJobCommand(Verb):
def execute(self, context):
config = context.get_job_config(context.options.jobspec, context.options.config_file)
api = context.get_api(config.cluster())
- monitor = JobMonitor(api, config.role(), config.environment(), config.name())
resp = api.create_job(config)
if resp.responseCode == ResponseCode.INVALID_REQUEST:
raise context.CommandError(EXIT_INVALID_PARAMETER, 'Job not found')
@@ -126,9 +127,9 @@ class CreateJobCommand(Verb):
if context.options.open_browser:
context.open_job_page(api, config)
if context.options.wait_until == 'RUNNING':
- monitor.wait_until(monitor.running_or_finished)
+ JobMonitor(api.scheduler_proxy, config.job_key()).wait_until(JobMonitor.running_or_finished)
elif context.options.wait_until == 'FINISHED':
- monitor.wait_until(monitor.terminal)
+ JobMonitor(api.scheduler_proxy, config.job_key()).wait_until(JobMonitor.terminal)
return EXIT_OK
@@ -285,6 +286,13 @@ class AbstractKillCommand(Verb):
MAX_TOTAL_FAILURES_OPTION,
NO_BATCHING_OPTION]
+ def wait_kill_tasks(self, context, scheduler, job_key, instances=None):
+ monitor = JobMonitor(scheduler, job_key)
+ if not monitor.wait_until(JobMonitor.terminal, instances=instances, with_timeout=True):
+ context.print_err('Tasks were not killed in time.')
+ return EXIT_TIMEOUT
+ return EXIT_OK
+
def kill_in_batches(self, context, job, instances_arg):
api = context.get_api(job.cluster)
# query the job, to get the list of active instances.
@@ -303,8 +311,11 @@ class AbstractKillCommand(Verb):
for i in range(min(context.options.batch_size, len(instances_to_kill))):
batch.append(instances_to_kill.pop())
resp = api.kill_job(job, batch)
- if resp.responseCode is not ResponseCode.OK:
- context.print_log('Kill of shards %s failed with error %s' % (batch, resp.message))
+ if resp.responseCode is not ResponseCode.OK or self.wait_kill_tasks(
+ context, api.scheduler_proxy, job, batch) is not EXIT_OK:
+
+ context.print_log(logging.INFO,
+ 'Kill of shards %s failed with error %s' % (batch, resp.message))
errors += 1
if errors > context.options.max_total_failures:
raise context.CommandError(EXIT_COMMAND_FAILURE,
@@ -336,9 +347,10 @@ class KillCommand(AbstractKillCommand):
api = context.get_api(job.cluster)
if context.options.no_batching:
resp = api.kill_job(job, instances_arg)
- if resp.responseCode != ResponseCode.OK:
- context.print_err('Job %s not found' % job, file=sys.stderr)
- return EXIT_INVALID_PARAMETER
+ context.check_and_log_response(resp)
+ wait_result = self.wait_kill_tasks(context, api.scheduler_proxy, job, instances_arg)
+ if wait_result is not EXIT_OK:
+ return wait_result
else:
self.kill_in_batches(context, job, instances_arg)
if context.options.open_browser:
@@ -363,9 +375,10 @@ class KillAllJobCommand(AbstractKillCommand):
api = context.get_api(job.cluster)
if context.options.no_batching:
resp = api.kill_job(job, None)
- if resp.responseCode != ResponseCode.OK:
- context.print_err('Job %s not found' % job, file=sys.stderr)
- return EXIT_INVALID_PARAMETER
+ context.check_and_log_response(resp)
+ wait_result = self.wait_kill_tasks(context, api.scheduler_proxy, job)
+ if wait_result is not EXIT_OK:
+ return wait_result
else:
self.kill_in_batches(context, job, None)
if context.options.open_browser:
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/cli/task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
index 62747ed..a162b86 100644
--- a/src/main/python/apache/aurora/client/cli/task.py
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -27,7 +27,6 @@ import sys
import time
from apache.aurora.client.api.command_runner import DistributedCommandRunner
-from apache.aurora.client.api.job_monitor import JobMonitor
from apache.aurora.client.api.updater_util import UpdaterConfig
from apache.aurora.client.cli import (
EXIT_COMMAND_FAILURE,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/commands/core.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/core.py b/src/main/python/apache/aurora/client/commands/core.py
index 39190e0..089a2c6 100644
--- a/src/main/python/apache/aurora/client/commands/core.py
+++ b/src/main/python/apache/aurora/client/commands/core.py
@@ -94,6 +94,13 @@ def get_job_config(job_spec, config_file, options):
select_role=select_role,
select_env=select_env)
+
+def wait_kill_tasks(scheduler, job_key, instances=None):
+ monitor = JobMonitor(scheduler, job_key)
+ if not monitor.wait_until(monitor.terminal, instances=instances, with_timeout=True):
+ die('Tasks were not killed in time.')
+
+
@app.command
def version(args):
"""usage: version
@@ -142,15 +149,14 @@ def create(job_spec, config_file):
print("Error: %s" % v)
sys.exit(1)
api = make_client(config.cluster())
- monitor = JobMonitor(api, config.role(), config.environment(), config.name())
resp = api.create_job(config)
check_and_log_response(resp)
handle_open(api.scheduler_proxy.scheduler_client().url, config.role(), config.environment(),
config.name())
if options.wait_until == 'RUNNING':
- monitor.wait_until(monitor.running_or_finished)
+ JobMonitor(api.scheduler_proxy, config.job_key()).wait_until(JobMonitor.running_or_finished)
elif options.wait_until == 'FINISHED':
- monitor.wait_until(monitor.terminal)
+ JobMonitor(api.scheduler_proxy, config.job_key()).wait_until(JobMonitor.terminal)
@app.command
@@ -407,6 +413,8 @@ def kill(args, options):
resp = api.kill_job(job_key, options.shards, config=config)
check_and_log_response(resp)
handle_open(api.scheduler_proxy.scheduler_client().url, job_key.role, job_key.env, job_key.name)
+ wait_kill_tasks(api.scheduler_proxy, job_key, options.shards)
+
@app.command
@app.command_option(CLUSTER_INVOKE_OPTION)
@@ -424,6 +432,7 @@ def killall(args, options):
resp = api.kill_job(job_key, None, config=config)
check_and_log_response(resp)
handle_open(api.scheduler_proxy.scheduler_client().url, job_key.role, job_key.env, job_key.name)
+ wait_kill_tasks(api.scheduler_proxy, job_key)
@app.command
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/BUILD b/src/test/python/apache/aurora/client/api/BUILD
index dd9b797..f0fa58f 100644
--- a/src/test/python/apache/aurora/client/api/BUILD
+++ b/src/test/python/apache/aurora/client/api/BUILD
@@ -40,7 +40,7 @@ python_tests(
python_tests(name = 'job_monitor',
sources = ['test_job_monitor.py'],
dependencies = [
- pants('3rdparty/python:mox'),
+ pants('3rdparty/python:mock'),
pants('src/main/python/apache/aurora/client/api:api'),
pants('src/main/python/apache/aurora/client/api:job_monitor'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
@@ -99,6 +99,7 @@ python_tests(name = 'updater',
sources = ['test_updater.py'],
dependencies = [
pants('3rdparty/python:mox'),
+ pants('src/main/python/apache/aurora/common:aurora_job_key'),
pants('src/main/python/apache/aurora/client/api:updater'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
pants('src/test/python/apache/aurora/client:fake_scheduler_proxy'),
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/api/test_job_monitor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_job_monitor.py b/src/test/python/apache/aurora/client/api/test_job_monitor.py
index 32609e4..bfb7fed 100644
--- a/src/test/python/apache/aurora/client/api/test_job_monitor.py
+++ b/src/test/python/apache/aurora/client/api/test_job_monitor.py
@@ -13,42 +13,103 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import unittest
from gen.apache.aurora.api.AuroraSchedulerManager import Client
from gen.apache.aurora.api.ttypes import (
+ AssignedTask,
Identity,
Response,
ResponseCode,
Result,
+ ScheduleStatus,
ScheduleStatusResult,
+ ScheduledTask,
+ TaskEvent,
TaskQuery,
)
from apache.aurora.client.api import AuroraClientAPI
from apache.aurora.client.api.job_monitor import JobMonitor
+from apache.aurora.common.aurora_job_key import AuroraJobKey
-from mox import MoxTestBase
+from mock import Mock
-ROLE = 'johndoe'
-ENV = 'test'
-JOB_NAME = 'test_job'
+class FakeClock(object):
+ def sleep(self, seconds):
+ pass
-class JobMonitorTest(MoxTestBase):
+
+class JobMonitorTest(unittest.TestCase):
def setUp(self):
+ self._scheduler = Mock()
+ self._job_key = AuroraJobKey('cl', 'johndoe', 'test', 'test_job')
+ self._clock = FakeClock()
+
+ def create_task(self, status, id):
+ return ScheduledTask(
+ assignedTask=AssignedTask(
+ instanceId=id,
+ taskId=id),
+ status=status,
+ taskEvents=[TaskEvent(
+ status=status,
+ timestamp=10)]
+ )
+ def mock_get_tasks(self, tasks, response_code=None):
+ response_code = ResponseCode.OK if response_code is None else response_code
+ resp = Response(responseCode=response_code, message='test')
+ resp.result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=tasks))
+ self._scheduler.getTasksStatus.return_value = resp
+
+ def expect_task_status(self, once=False, instances=None):
+ query = TaskQuery(
+ owner=Identity(role=self._job_key.role),
+ environment=self._job_key.env,
+ jobName=self._job_key.name)
+ if instances is not None:
+ query.instanceIds = frozenset([int(s) for s in instances])
+
+ if once:
+ self._scheduler.getTasksStatus.assert_called_once_with(query)
+ else:
+ self._scheduler.getTasksStatus.assert_called_with(query)
+
+ def test_wait_until_state(self):
+ self.mock_get_tasks([
+ self.create_task(ScheduleStatus.RUNNING, '1'),
+ self.create_task(ScheduleStatus.RUNNING, '2'),
+ self.create_task(ScheduleStatus.FAILED, '3'),
+ ])
+
+ monitor = JobMonitor(self._scheduler, self._job_key)
+ assert monitor.wait_until(monitor.running_or_finished)
+ self.expect_task_status(once=True)
+
+ def test_empty_job_succeeds(self):
+ self.mock_get_tasks([])
- super(JobMonitorTest, self).setUp()
- self.mock_api = self.mox.CreateMock(AuroraClientAPI)
- self.mock_scheduler = self.mox.CreateMock(Client)
- self.mock_api.scheduler_proxy = self.mock_scheduler
+ monitor = JobMonitor(self._scheduler, self._job_key)
+ assert monitor.wait_until(monitor.running_or_finished)
+ self.expect_task_status(once=True)
- def test_init(self):
- result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=[]))
- response = Response(responseCode=ResponseCode.OK, message="test", result=result)
- query = TaskQuery(owner=Identity(role=ROLE), environment=ENV, jobName=JOB_NAME)
+ def test_wait_with_instances(self):
+ self.mock_get_tasks([
+ self.create_task(ScheduleStatus.FAILED, '2'),
+ ])
- self.mock_scheduler.getTasksStatus(query).AndReturn(response)
+ monitor = JobMonitor(self._scheduler, self._job_key)
+ assert monitor.wait_until(monitor.terminal, instances=[2])
+ self.expect_task_status(once=True, instances=[2])
- self.mox.ReplayAll()
+ def test_wait_until_timeout(self):
+ self.mock_get_tasks([
+ self.create_task(ScheduleStatus.RUNNING, '1'),
+ self.create_task(ScheduleStatus.RUNNING, '2'),
+ self.create_task(ScheduleStatus.RUNNING, '3'),
+ ])
- JobMonitor(self.mock_api, ROLE, ENV, JOB_NAME)
+ monitor = JobMonitor(self._scheduler, self._job_key, clock=self._clock)
+ assert not monitor.wait_until(monitor.terminal, with_timeout=True)
+ self.expect_task_status()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/api/test_updater.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_updater.py b/src/test/python/apache/aurora/client/api/test_updater.py
index e7eb1e7..4c931d5 100644
--- a/src/test/python/apache/aurora/client/api/test_updater.py
+++ b/src/test/python/apache/aurora/client/api/test_updater.py
@@ -19,9 +19,11 @@ from os import environ
from unittest import TestCase
from apache.aurora.client.api.instance_watcher import InstanceWatcher
+from apache.aurora.client.api.job_monitor import JobMonitor
from apache.aurora.client.api.quota_check import CapacityRequest, QuotaCheck
from apache.aurora.client.api.updater import Updater
from apache.aurora.client.fake_scheduler_proxy import FakeSchedulerProxy
+from apache.aurora.common.aurora_job_key import AuroraJobKey
from gen.apache.aurora.api.AuroraSchedulerManager import Client as scheduler_client
from gen.apache.aurora.api.constants import ACTIVE_STATES
@@ -95,6 +97,9 @@ class FakeConfig(object):
def job(self):
return self.job_config
+ def job_key(self):
+ return AuroraJobKey(self.cluster(), self.role(), self.environment(), self.name())
+
def instances(self):
return self.job_config.instanceCount
@@ -117,6 +122,7 @@ class UpdaterTest(TestCase):
self._session_key = 'test_session'
self._lock = 'test_lock'
self._instance_watcher = MockObject(InstanceWatcher)
+ self._job_monitor = MockObject(JobMonitor)
self._scheduler = MockObject(scheduler_client)
self._scheduler_proxy = FakeSchedulerProxy('test-cluster', self._scheduler, self._session_key)
self._quota_check = MockObject(QuotaCheck)
@@ -129,11 +135,13 @@ class UpdaterTest(TestCase):
Replay(self._scheduler)
Replay(self._instance_watcher)
Replay(self._quota_check)
+ Replay(self._job_monitor)
def verify_mocks(self):
Verify(self._scheduler)
Verify(self._instance_watcher)
Verify(self._quota_check)
+ Verify(self._job_monitor)
def init_updater(self, update_config):
self._config = FakeConfig(self._role, self._name, self._env, update_config)
@@ -142,7 +150,8 @@ class UpdaterTest(TestCase):
3,
self._scheduler_proxy,
self._instance_watcher,
- self._quota_check)
+ self._quota_check,
+ self._job_monitor)
def expect_watch_instances(self, instance_ids, failed_instances=[]):
self._instance_watcher.watch(instance_ids).AndReturn(set(failed_instances))
@@ -183,7 +192,7 @@ class UpdaterTest(TestCase):
self._lock,
self._session_key).AndReturn(response)
- def expect_kill(self, instance_ids, response_code=None):
+ def expect_kill(self, instance_ids, response_code=None, monitor_result=True):
response_code = ResponseCode.OK if response_code is None else response_code
response = Response(responseCode=response_code, message='test')
query = TaskQuery(
@@ -193,6 +202,11 @@ class UpdaterTest(TestCase):
statuses=ACTIVE_STATES,
instanceIds=frozenset([int(s) for s in instance_ids]))
self._scheduler.killTasks(query, self._lock, self._session_key).AndReturn(response)
+ if response_code != ResponseCode.OK:
+ return
+
+ self._job_monitor.wait_until(JobMonitor.terminal, instance_ids, with_timeout=True).AndReturn(
+ monitor_result)
def expect_add(self, instance_ids, task_config, response_code=None):
response_code = ResponseCode.OK if response_code is None else response_code
@@ -653,6 +667,23 @@ class UpdaterTest(TestCase):
self.update_and_expect_response(ResponseCode.ERROR)
self.verify_mocks()
+ def test_update_kill_timeout(self):
+ """Test job monitor timeout while waiting for tasks killed."""
+ old_configs = self.make_task_configs(5)
+ new_config = deepcopy(old_configs[0])
+ new_config.priority = 5
+ job_config = self.make_job_config(new_config, 5)
+ self._config.job_config = job_config
+ self.expect_start()
+ self.expect_get_tasks(old_configs)
+ self.expect_populate(job_config)
+ self.expect_quota_check(5, 5)
+ self.expect_kill([0, 1, 2], monitor_result=False)
+ self.replay_mocks()
+
+ self.update_and_expect_response(ResponseCode.ERROR)
+ self.verify_mocks()
+
def test_job_does_not_exist(self):
"""Unable to update a job that does not exist."""
old_configs = self.make_task_configs(5)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_command_hooks.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_command_hooks.py b/src/test/python/apache/aurora/client/cli/test_command_hooks.py
index 7c6f70c..6c6f6f5 100644
--- a/src/test/python/apache/aurora/client/cli/test_command_hooks.py
+++ b/src/test/python/apache/aurora/client/cli/test_command_hooks.py
@@ -137,7 +137,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
fp.name])
self.assert_create_job_called(api)
- self.assert_scheduler_called(api, mock_query, 2)
+ self.assert_scheduler_called(api, mock_query, 1)
assert command_hook.ran_pre
assert command_hook.ran_post
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_create.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_create.py b/src/test/python/apache/aurora/client/cli/test_create.py
index 875573e..96dc592 100644
--- a/src/test/python/apache/aurora/client/cli/test_create.py
+++ b/src/test/python/apache/aurora/client/cli/test_create.py
@@ -57,13 +57,9 @@ class TestClientCreateCommand(AuroraClientCommandTest):
def create_mock_status_query_result(cls, scheduleStatus):
mock_query_result = cls.create_simple_success_response()
mock_query_result.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
- if scheduleStatus == ScheduleStatus.INIT:
- # status query result for before job is launched.
- mock_query_result.result.scheduleStatusResult.tasks = []
- else:
- mock_task_one = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
- mock_task_two = cls.create_mock_task('hello', 1, 1004, scheduleStatus)
- mock_query_result.result.scheduleStatusResult.tasks = [mock_task_one, mock_task_two]
+ mock_task_one = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
+ mock_task_two = cls.create_mock_task('hello', 1, 1004, scheduleStatus)
+ mock_query_result.result.scheduleStatusResult.tasks = [mock_task_one, mock_task_two]
return mock_query_result
@classmethod
@@ -99,13 +95,15 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# We'll patch out create_context, which will give us a fake context
# object, and everything can be stubbed through that.
mock_context = FakeAuroraCommandContext()
- with patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context):
+ with contextlib.nested(
+ patch('time.sleep'),
+ patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
# After making the client, create sets up a job monitor.
# The monitor uses TaskQuery to get the tasks. It's called at least twice:once before
# the job is created, and once after. So we need to set up mocks for the query results.
mock_query = self.create_mock_query()
mock_context.add_expected_status_query_result(
- self.create_mock_status_query_result(ScheduleStatus.INIT))
+ self.create_mock_status_query_result(ScheduleStatus.PENDING))
mock_context.add_expected_status_query_result(
self.create_mock_status_query_result(ScheduleStatus.RUNNING))
api = mock_context.get_api('west')
@@ -133,8 +131,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
patch('time.sleep'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
mock_query = self.create_mock_query()
- for result in [ScheduleStatus.INIT, ScheduleStatus.PENDING, ScheduleStatus.PENDING,
- ScheduleStatus.RUNNING, ScheduleStatus.FINISHED]:
+ for result in [ScheduleStatus.PENDING, ScheduleStatus.PENDING, ScheduleStatus.RUNNING]:
mock_context.add_expected_status_query_result(self.create_mock_status_query_result(result))
api = mock_context.get_api('west')
api.create_job.return_value = self.get_createjob_response()
@@ -147,7 +144,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
# Check that create_job was called exactly once, with an AuroraConfig parameter.
self.assert_create_job_called(api)
- self.assert_scheduler_called(api, mock_query, 4)
+ self.assert_scheduler_called(api, mock_query, 3)
def test_create_job_failed(self):
"""Run a test of the "create" command against a mocked-out API:
@@ -172,8 +169,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# Check that create_job was called exactly once, with an AuroraConfig parameter.
self.assert_create_job_called(api)
- # getTasksStatus was called once, before the create_job
- assert api.scheduler_proxy.getTasksStatus.call_count == 1
def test_create_job_failed_invalid_config(self):
"""Run a test of the "create" command against a mocked-out API, with a configuration
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_kill.py b/src/test/python/apache/aurora/client/cli/test_kill.py
index cf5df64..e11fc81 100644
--- a/src/test/python/apache/aurora/client/cli/test_kill.py
+++ b/src/test/python/apache/aurora/client/cli/test_kill.py
@@ -19,6 +19,7 @@ import unittest
from twitter.common.contextutil import temporary_file
+from apache.aurora.client.cli import EXIT_TIMEOUT
from apache.aurora.client.cli.client import AuroraCommandLine
from apache.aurora.client.cli.options import parse_instances
from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
@@ -27,8 +28,12 @@ from apache.aurora.common.aurora_job_key import AuroraJobKey
from twitter.common.contextutil import temporary_file
from gen.apache.aurora.api.ttypes import (
+ AssignedTask,
Identity,
+ ScheduleStatus,
ScheduleStatusResult,
+ ScheduledTask,
+ TaskEvent,
TaskQuery,
)
@@ -55,11 +60,24 @@ class TestClientKillCommand(AuroraClientCommandTest):
def assert_kill_job_called(cls, mock_api):
assert mock_api.kill_job.call_count == 1
+ @classmethod
+ def assert_scheduler_called(cls, mock_api, mock_query, num_queries):
+ assert mock_api.scheduler_proxy.getTasksStatus.call_count == num_queries
+ mock_api.scheduler_proxy.getTasksStatus.assert_called_with(mock_query)
+
+ @classmethod
+ def get_expected_task_query(cls, instances=None):
+ instance_ids = frozenset(instances) if instances is not None else None
+ return TaskQuery(taskIds=None, jobName=cls.TEST_JOB, environment=cls.TEST_ENV,
+ instanceIds=instance_ids, owner=Identity(role=cls.TEST_ROLE, user=None))
+
+
def test_killall_job(self):
"""Test kill client-side API logic."""
mock_context = FakeAuroraCommandContext()
mock_scheduler_proxy = Mock()
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
@@ -67,6 +85,8 @@ class TestClientKillCommand(AuroraClientCommandTest):
mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_call_result()
api.kill_job.return_value = self.get_kill_job_response()
mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ mock_context.add_expected_status_query_result(self.create_status_call_result(
+ self.create_mock_task(ScheduleStatus.KILLED)))
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
@@ -76,12 +96,43 @@ class TestClientKillCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
assert api.kill_job.call_count == 1
api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), None)
+ self.assert_scheduler_called(api, self.get_expected_task_query(), 2)
+
+ def test_killall_job_wait_until_timeout(self):
+ """Test kill client-side API logic."""
+ mock_context = FakeAuroraCommandContext()
+ mock_scheduler_proxy = Mock()
+ with contextlib.nested(
+ patch('time.sleep'),
+ patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+ patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+
+ api = mock_context.get_api('west')
+ mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_call_result()
+ api.kill_job.return_value = self.get_kill_job_response()
+ mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ for _ in range(8):
+ mock_context.add_expected_status_query_result(self.create_status_call_result(
+ self.create_mock_task(ScheduleStatus.RUNNING)))
+
+ with temporary_file() as fp:
+ fp.write(self.get_valid_config())
+ fp.flush()
+ cmd = AuroraCommandLine()
+ assert EXIT_TIMEOUT == cmd.execute(
+ ['job', 'killall', '--no-batching', '--config=%s' % fp.name, 'west/bozo/test/hello'])
+
+ # Now check that the right API calls got made.
+ assert api.kill_job.call_count == 1
+ api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), None)
+ self.assert_scheduler_called(api, self.get_expected_task_query(), 8)
def test_killall_job(self):
"""Test kill client-side API logic."""
mock_context = FakeAuroraCommandContext()
mock_scheduler_proxy = Mock()
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
@@ -89,6 +140,8 @@ class TestClientKillCommand(AuroraClientCommandTest):
api.kill_job.return_value = self.get_kill_job_response()
mock_context.add_expected_status_query_result(self.create_status_call_result())
mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ mock_context.add_expected_status_query_result(self.create_status_call_result(
+ self.create_mock_task(ScheduleStatus.KILLED)))
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
@@ -97,17 +150,22 @@ class TestClientKillCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
assert api.kill_job.call_count == 4
- api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), [15, 16, 17, 18, 19])
+ instances = [15, 16, 17, 18, 19]
+ api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), instances)
+ self.assert_scheduler_called(api, self.get_expected_task_query(instances), 6)
def test_kill_job_with_instances_nobatching(self):
"""Test kill client-side API logic."""
mock_context = FakeAuroraCommandContext()
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
api = mock_context.get_api('west')
self.setup_get_tasks_status_calls(api.scheduler_proxy)
api.kill_job.return_value = self.get_kill_job_response()
+ mock_context.add_expected_status_query_result(self.create_status_call_result(
+ self.create_mock_task(ScheduleStatus.KILLED)))
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
@@ -116,19 +174,24 @@ class TestClientKillCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
assert api.kill_job.call_count == 1
- api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
- [0, 2, 4, 5, 6])
+ instances = [0, 2, 4, 5, 6]
+ api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), instances)
+ self.assert_scheduler_called(api, self.get_expected_task_query(instances), 2)
def test_kill_job_with_instances_batched(self):
"""Test kill client-side API logic."""
mock_context = FakeAuroraCommandContext()
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
api = mock_context.get_api('west')
status_result = self.create_status_call_result()
mock_context.add_expected_status_query_result(status_result)
api.kill_job.return_value = self.get_kill_job_response()
+ mock_context.add_expected_status_query_result(self.create_status_call_result(
+ self.create_mock_task(ScheduleStatus.KILLED)))
+
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
@@ -137,19 +200,25 @@ class TestClientKillCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
assert api.kill_job.call_count == 1
- api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
- [0, 2, 4, 5, 6])
+ instances = [0, 2, 4, 5, 6]
+ api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), instances)
+ # Expect total 3 calls (one from JobMonitor).
+ self.assert_scheduler_called(api, self.get_expected_task_query(instances), 3)
def test_kill_job_with_instances_batched_large(self):
"""Test kill client-side API logic."""
mock_context = FakeAuroraCommandContext()
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
api = mock_context.get_api('west')
status_result = self.create_status_call_result()
mock_context.add_expected_status_query_result(status_result)
api.kill_job.return_value = self.get_kill_job_response()
+ mock_context.add_expected_status_query_result(self.create_status_call_result(
+ self.create_mock_task(ScheduleStatus.KILLED)))
+
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
@@ -160,6 +229,8 @@ class TestClientKillCommand(AuroraClientCommandTest):
assert api.kill_job.call_count == 3
api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
[12, 13])
+ # Expect total 5 calls (3 from JobMonitor).
+ self.assert_scheduler_called(api, self.get_expected_task_query([12, 13]), 5)
def test_kill_job_with_instances_batched_maxerrors(self):
"""Test kill client-side API logic."""
@@ -169,27 +240,24 @@ class TestClientKillCommand(AuroraClientCommandTest):
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
api = mock_context.get_api('west')
status_result = self.create_status_call_result()
- failed_status_result = self.create_error_response()
- mock_context.add_expected_status_query_result(status_result)
- mock_context.add_expected_status_query_result(failed_status_result)
- mock_context.add_expected_status_query_result(failed_status_result)
- mock_context.add_expected_status_query_result(status_result)
mock_context.add_expected_status_query_result(status_result)
- api.kill_job.return_value = self.get_kill_job_response()
+ api.kill_job.return_value = self.create_error_response()
+
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
cmd = AuroraCommandLine()
cmd.execute(['job', 'kill', '--max-total-failures=1', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-13'])
- # Now check that the right API calls got made. We should have aborted after the third batch.
- assert api.kill_job.call_count == 3
-
+ # Now check that the right API calls got made. We should have aborted after the second batch.
+ assert api.kill_job.call_count == 2
+ assert api.scheduler_proxy.getTasksStatus.call_count == 0
def test_kill_job_with_empty_instances_batched(self):
"""Test kill client-side API logic."""
mock_context = FakeAuroraCommandContext()
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
api = mock_context.get_api('west')
@@ -197,7 +265,6 @@ class TestClientKillCommand(AuroraClientCommandTest):
status_response = self.create_simple_success_response()
schedule_status = Mock(spec=ScheduleStatusResult)
status_response.result.scheduleStatusResult = schedule_status
- mock_task_config = Mock()
schedule_status.tasks = []
mock_context.add_expected_status_query_result(status_response)
api.kill_job.return_value = self.get_kill_job_response()
@@ -215,6 +282,7 @@ class TestClientKillCommand(AuroraClientCommandTest):
"""Test kill client-side API logic."""
(mock_api, mock_scheduler_proxy) = self.create_mock_api()
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_plugins.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_plugins.py b/src/test/python/apache/aurora/client/cli/test_plugins.py
index 2dab749..21c4400 100644
--- a/src/test/python/apache/aurora/client/cli/test_plugins.py
+++ b/src/test/python/apache/aurora/client/cli/test_plugins.py
@@ -149,7 +149,7 @@ class TestPlugins(AuroraClientCommandTest):
# Now check that the right API calls got made.
# Check that create_job was called exactly once, with an AuroraConfig parameter.
self.assert_create_job_called(api)
- self.assert_scheduler_called(api, mock_query, 2)
+ self.assert_scheduler_called(api, mock_query, 1)
# Check that the plugin did its job.
assert mock_context.bogosity == "maximum"
assert mock_context.after == True
@@ -177,7 +177,7 @@ class TestPlugins(AuroraClientCommandTest):
cmd.execute(['job', 'create', '--wait-until=RUNNING',
'west/bozo/test/hello', fp.name])
self.assert_create_job_called(api)
- self.assert_scheduler_called(api, mock_query, 2)
+ self.assert_scheduler_called(api, mock_query, 1)
def mock_print(self, str):
for str in str.split('\n'):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_update.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_update.py b/src/test/python/apache/aurora/client/cli/test_update.py
index cf077e8..854d583 100644
--- a/src/test/python/apache/aurora/client/cli/test_update.py
+++ b/src/test/python/apache/aurora/client/cli/test_update.py
@@ -20,6 +20,7 @@ from twitter.common.contextutil import temporary_file
from apache.aurora.client.api.updater import Updater
from apache.aurora.client.api.health_check import StatusHealthCheck, Retriable
+from apache.aurora.client.api.job_monitor import JobMonitor
from apache.aurora.client.api.quota_check import QuotaCheck
from apache.aurora.client.cli import EXIT_INVALID_CONFIGURATION
from apache.aurora.client.cli.client import AuroraCommandLine
@@ -184,6 +185,13 @@ class TestUpdateCommand(AuroraClientCommandTest):
mock_quota_check = Mock(spec=QuotaCheck)
mock_quota_check.validate_quota_from_requested.return_value = \
cls.create_simple_success_response()
+ return mock_quota_check
+
+ @classmethod
+ def setup_job_monitor(cls):
+ mock_job_monitor = Mock(spec=JobMonitor)
+ mock_job_monitor.wait_until.return_value = True
+ return mock_job_monitor
def test_updater_simple(self):
# Test the client-side updater logic in its simplest case: everything succeeds,
@@ -191,6 +199,7 @@ class TestUpdateCommand(AuroraClientCommandTest):
(mock_api, mock_scheduler_proxy) = self.create_mock_api()
mock_health_check = self.setup_health_checks(mock_api)
mock_quota_check = self.setup_quota_check()
+ mock_job_monitor = self.setup_job_monitor()
self.setup_mock_scheduler_for_simple_update(mock_api)
# This doesn't work, because:
# - The mock_context stubs out the API.
@@ -200,7 +209,8 @@ class TestUpdateCommand(AuroraClientCommandTest):
patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
patch('apache.aurora.client.api.instance_watcher.StatusHealthCheck',
return_value=mock_health_check),
- patch('apache.aurora.client.api.quota_check.QuotaCheck', return_value=mock_quota_check),
+ patch('apache.aurora.client.api.updater.JobMonitor', return_value=mock_job_monitor),
+ patch('apache.aurora.client.api.updater.QuotaCheck', return_value=mock_quota_check),
patch('time.time', side_effect=functools.partial(self.fake_time, self)),
patch('time.sleep', return_value=None)):
with temporary_file() as fp:
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/util.py b/src/test/python/apache/aurora/client/cli/util.py
index 2225ab0..e17f256 100644
--- a/src/test/python/apache/aurora/client/cli/util.py
+++ b/src/test/python/apache/aurora/client/cli/util.py
@@ -21,8 +21,11 @@ from gen.apache.aurora.api.ttypes import (
Response,
ResponseCode,
Result,
+ ScheduleStatus,
ScheduleStatusResult,
ScheduledTask,
+ TaskConfig,
+ TaskEvent,
)
from apache.aurora.client.cli.context import AuroraCommandContext
@@ -129,24 +132,33 @@ class AuroraClientCommandTest(unittest.TestCase):
return mock_api_factory, mock_scheduler_client
@classmethod
- def create_status_call_result(cls):
+ def create_status_call_result(cls, mock_task=None):
status_response = cls.create_simple_success_response()
schedule_status = Mock(spec=ScheduleStatusResult)
status_response.result.scheduleStatusResult = schedule_status
- mock_task_config = Mock()
# This should be a list of ScheduledTask's.
schedule_status.tasks = []
- for i in range(20):
- task_status = Mock(spec=ScheduledTask)
- task_status.assignedTask = Mock(spec=AssignedTask)
- task_status.assignedTask.instanceId = i
- task_status.assignedTask.taskId = "Task%s" % i
- task_status.assignedTask.slaveId = "Slave%s" % i
- task_status.slaveHost = "Slave%s" % i
- task_status.assignedTask.task = mock_task_config
- schedule_status.tasks.append(task_status)
+ if mock_task is None:
+ for i in range(20):
+ schedule_status.tasks.append(cls.create_mock_task(i))
+ else:
+ schedule_status.tasks.append(mock_task)
return status_response
+ @classmethod
+ def create_mock_task(cls, instance_id, status=ScheduleStatus.RUNNING):
+ mock_task = Mock(spec=ScheduledTask)
+ mock_task.assignedTask = Mock(spec=AssignedTask)
+ mock_task.assignedTask.instanceId = instance_id
+ mock_task.assignedTask.taskId = "Task%s" % instance_id
+ mock_task.assignedTask.slaveId = "Slave%s" % instance_id
+ mock_task.assignedTask.task = Mock(spec=TaskConfig)
+ mock_task.slaveHost = "Slave%s" % instance_id
+ mock_task.status = status
+ mock_task_event = Mock(spec=TaskEvent)
+ mock_task_event.timestamp = 1000
+ mock_task.taskEvents = [mock_task_event]
+ return mock_task
@classmethod
def setup_get_tasks_status_calls(cls, scheduler):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/commands/test_create.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_create.py b/src/test/python/apache/aurora/client/commands/test_create.py
index e0ecb52..328b980 100644
--- a/src/test/python/apache/aurora/client/commands/test_create.py
+++ b/src/test/python/apache/aurora/client/commands/test_create.py
@@ -71,13 +71,9 @@ class TestClientCreateCommand(AuroraClientCommandTest):
def create_mock_status_query_result(cls, scheduleStatus):
mock_query_result = cls.create_simple_success_response()
mock_query_result.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
- if scheduleStatus == ScheduleStatus.INIT:
- # status query result for before job is launched.
- mock_query_result.result.scheduleStatusResult.tasks = []
- else:
- mock_task_one = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
- mock_task_two = cls.create_mock_task('hello', 1, 1004, scheduleStatus)
- mock_query_result.result.scheduleStatusResult.tasks = [mock_task_one, mock_task_two]
+ mock_task_one = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
+ mock_task_two = cls.create_mock_task('hello', 1, 1004, scheduleStatus)
+ mock_query_result.result.scheduleStatusResult.tasks = [mock_task_one, mock_task_two]
return mock_query_result
@classmethod
@@ -127,7 +123,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# the job is created, and once after. So we need to set up mocks for the query results.
mock_query = self.create_mock_query()
mock_scheduler_proxy.getTasksStatus.side_effect = [
- self.create_mock_status_query_result(ScheduleStatus.INIT),
self.create_mock_status_query_result(ScheduleStatus.RUNNING)
]
@@ -146,7 +141,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
# Check that create_job was called exactly once, with an AuroraConfig parameter.
self.assert_create_job_called(mock_api)
- self.assert_scheduler_called(mock_api, mock_query, 2)
+ self.assert_scheduler_called(mock_api, mock_query, 1)
# make_client should have been called once.
make_client.assert_called_with('west')
@@ -162,12 +157,12 @@ class TestClientCreateCommand(AuroraClientCommandTest):
patch('twitter.common.app.get_options', return_value=mock_options)) as (sleep, make_client,
options):
mock_query = self.create_mock_query()
+ mock_options.wait_until = 'FINISHED'
mock_query_results = [
- self.create_mock_status_query_result(ScheduleStatus.INIT),
self.create_mock_status_query_result(ScheduleStatus.PENDING),
self.create_mock_status_query_result(ScheduleStatus.PENDING),
self.create_mock_status_query_result(ScheduleStatus.RUNNING),
- self.create_mock_status_query_result(ScheduleStatus.FINISHED)
+ self.create_mock_status_query_result(ScheduleStatus.FINISHED),
]
mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
mock_api.create_job.return_value = self.get_createjob_response()
@@ -193,11 +188,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
patch('apache.aurora.client.commands.core.make_client', return_value=mock_api),
patch('twitter.common.app.get_options', return_value=mock_options)) as (make_client,
options):
- mock_query = self.create_mock_query()
- mock_query_results = [
- self.create_mock_status_query_result(ScheduleStatus.INIT)
- ]
- mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
mock_api.create_job.return_value = self.get_failed_createjob_response()
# This is the real test: invoke create as if it had been called by the command line.
with temporary_file() as fp:
@@ -209,9 +199,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# Check that create_job was called exactly once, with an AuroraConfig parameter.
self.assert_create_job_called(mock_api)
- # getTasksStatus was called once, before the create_job
- assert mock_scheduler_proxy.getTasksStatus.call_count == 1
- mock_scheduler_proxy.getTasksStatus.assert_called_with(mock_query)
# make_client should have been called once.
make_client.assert_called_with('west')
@@ -228,11 +215,9 @@ class TestClientCreateCommand(AuroraClientCommandTest):
options):
mock_query = self.create_mock_query()
mock_query_results = [
- self.create_mock_status_query_result(ScheduleStatus.INIT),
self.create_mock_status_query_result(ScheduleStatus.PENDING),
self.create_mock_status_query_result(ScheduleStatus.PENDING),
- self.create_mock_status_query_result(ScheduleStatus.RUNNING),
- self.create_mock_status_query_result(ScheduleStatus.FINISHED)
+ self.create_mock_status_query_result(ScheduleStatus.RUNNING)
]
mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
mock_api.create_job.return_value = self.get_createjob_response()
@@ -243,7 +228,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
self.assert_create_job_called(mock_api)
- self.assert_scheduler_called(mock_api, mock_query, 4)
+ self.assert_scheduler_called(mock_api, mock_query, 3)
# make_client should have been called once.
make_client.assert_called_with('west')
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/commands/test_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_kill.py b/src/test/python/apache/aurora/client/commands/test_kill.py
index db820a5..7f0b73c 100644
--- a/src/test/python/apache/aurora/client/commands/test_kill.py
+++ b/src/test/python/apache/aurora/client/commands/test_kill.py
@@ -26,9 +26,12 @@ from twitter.common.contextutil import temporary_file
from apache.aurora.client.commands.util import AuroraClientCommandTest
from gen.apache.aurora.api.ttypes import (
+ AssignedTask,
Identity,
ScheduleStatus,
+ ScheduledTask,
ScheduleStatusResult,
+ TaskEvent,
TaskQuery,
)
@@ -69,11 +72,6 @@ class TestClientKillCommand(AuroraClientCommandTest):
return mock_query_result
@classmethod
- def create_mock_query(cls):
- return TaskQuery(owner=Identity(role=cls.TEST_ROLE), environment=cls.TEST_ENV,
- jobName=cls.TEST_JOB)
-
- @classmethod
def get_kill_job_response(cls):
return cls.create_simple_success_response()
@@ -81,6 +79,82 @@ class TestClientKillCommand(AuroraClientCommandTest):
def assert_kill_job_called(cls, mock_api):
assert mock_api.kill_job.call_count == 1
+ @classmethod
+ def get_expected_task_query(cls, instances=None):
+ """Helper to create the query that will be a parameter to job kill."""
+ instance_ids = frozenset(instances) if instances is not None else None
+ return TaskQuery(taskIds=None, jobName=cls.TEST_JOB, environment=cls.TEST_ENV,
+ instanceIds=instance_ids, owner=Identity(role=cls.TEST_ROLE, user=None))
+
+ @classmethod
+ def create_mock_task(cls, task_id, instance_id, initial_time, status):
+ mock_task = Mock(spec=ScheduledTask)
+ mock_task.assignedTask = Mock(spec=AssignedTask)
+ mock_task.assignedTask.taskId = task_id
+ mock_task.assignedTask.instanceId = instance_id
+ mock_task.status = status
+ mock_task_event = Mock(spec=TaskEvent)
+ mock_task_event.timestamp = initial_time
+ mock_task.taskEvents = [mock_task_event]
+ return mock_task
+
+ @classmethod
+ def create_mock_status_query_result(cls, scheduleStatus):
+ mock_query_result = cls.create_simple_success_response()
+ mock_query_result.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
+ task = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
+ mock_query_result.result.scheduleStatusResult.tasks = [task]
+ return mock_query_result
+
+ @classmethod
+ def assert_scheduler_called(cls, mock_api, mock_query, num_queries):
+ assert mock_api.scheduler_proxy.getTasksStatus.call_count == num_queries
+ mock_api.scheduler_proxy.getTasksStatus.assert_called_with(mock_query)
+
+ def test_kill_job_tasks_not_killed_in_time(self):
+ """Test kill timed out waiting in job monitor."""
+ mock_options = self.setup_mock_options()
+ mock_config = Mock()
+ mock_config.hooks = []
+ mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
+ (mock_api, mock_scheduler_proxy) = self.create_mock_api()
+ mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ mock_query_results = [
+ self.create_mock_status_query_result(ScheduleStatus.RUNNING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ ]
+ mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
+ with contextlib.nested(
+ patch('time.sleep'),
+ patch('apache.aurora.client.factory.make_client', return_value=mock_api),
+ patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+ patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
+ patch('twitter.common.app.get_options', return_value=mock_options),
+ patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+ mock_sleep,
+ mock_api_patch,
+ mock_scheduler_proxy_class,
+ mock_clusters,
+ options,
+ mock_get_job_config):
+
+ with temporary_file() as fp:
+ fp.write(self.get_valid_config())
+ fp.flush()
+ self.assertRaises(
+ SystemExit, killall, ['west/mchucarroll/test/hello', fp.name], mock_options)
+
+ # Now check that the right API calls got made.
+ assert mock_scheduler_proxy.killTasks.call_count == 1
+ query = self.get_expected_task_query()
+ mock_scheduler_proxy.killTasks.assert_called_with(query, None)
+ self.assert_scheduler_called(mock_api, query, 8)
def test_kill_job_noshards_fail(self):
mock_options = self.setup_mock_options()
@@ -103,7 +177,6 @@ class TestClientKillCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
mock_api.kill_job.call_count == 0
-
def test_simple_successful_killall_job(self):
"""Run a test of the "kill" command against a mocked-out API:
Verifies that the kill command sends the right API RPCs, and performs the correct
@@ -113,13 +186,23 @@ class TestClientKillCommand(AuroraClientCommandTest):
mock_config = Mock()
(mock_api, mock_scheduler_proxy) = self.create_mock_api()
mock_api.kill_job.return_value = self.get_kill_job_response()
+ mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ mock_query_results = [
+ self.create_mock_status_query_result(ScheduleStatus.RUNNING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLED),
+ ]
+ mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.commands.core.make_client',
return_value=mock_api),
patch('twitter.common.app.get_options', return_value=mock_options),
patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+ sleep,
mock_make_client,
- options, mock_get_job_config):
+ options,
+ mock_get_job_config):
with temporary_file() as fp:
fp.write(self.get_valid_config())
@@ -131,15 +214,7 @@ class TestClientKillCommand(AuroraClientCommandTest):
mock_api.kill_job.assert_called_with(
AuroraJobKey(cluster=self.TEST_CLUSTER, role=self.TEST_ROLE, env=self.TEST_ENV,
name=self.TEST_JOB), None, config=mock_config)
-
-
-
- @classmethod
- def get_expected_task_query(cls, shards=None):
- """Helper to create the query that will be a parameter to job kill."""
- instance_ids = frozenset(shards) if shards is not None else None
- return TaskQuery(taskIds=None, jobName=cls.TEST_JOB, environment=cls.TEST_ENV,
- instanceIds=instance_ids, owner=Identity(role=cls.TEST_ROLE, user=None))
+ self.assert_scheduler_called(mock_api, self.get_expected_task_query(), 3)
def test_kill_job_api_level(self):
"""Test kill client-side API logic."""
@@ -149,16 +224,26 @@ class TestClientKillCommand(AuroraClientCommandTest):
mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
(mock_api, mock_scheduler_proxy) = self.create_mock_api()
mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ mock_query_results = [
+ self.create_mock_status_query_result(ScheduleStatus.RUNNING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLED),
+ ]
+ mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.factory.make_client', return_value=mock_api),
patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
patch('twitter.common.app.get_options', return_value=mock_options),
patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+ mock_sleep,
mock_api_patch,
mock_scheduler_proxy_class,
mock_clusters,
- options, mock_get_job_config):
+ options,
+ mock_get_job_config):
+
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
@@ -166,7 +251,9 @@ class TestClientKillCommand(AuroraClientCommandTest):
# Now check that the right API calls got made.
assert mock_scheduler_proxy.killTasks.call_count == 1
- mock_scheduler_proxy.killTasks.assert_called_with(self.get_expected_task_query(), None)
+ query = self.get_expected_task_query()
+ mock_scheduler_proxy.killTasks.assert_called_with(query, None)
+ self.assert_scheduler_called(mock_api, query, 3)
def test_kill_job_api_level_with_shards(self):
"""Test kill client-side API logic."""
@@ -177,12 +264,20 @@ class TestClientKillCommand(AuroraClientCommandTest):
mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
(mock_api, mock_scheduler_proxy) = self.create_mock_api()
mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+ mock_query_results = [
+ self.create_mock_status_query_result(ScheduleStatus.RUNNING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLING),
+ self.create_mock_status_query_result(ScheduleStatus.KILLED),
+ ]
+ mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
with contextlib.nested(
+ patch('time.sleep'),
patch('apache.aurora.client.factory.make_client', return_value=mock_api),
patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
patch('twitter.common.app.get_options', return_value=mock_options),
patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+ mock_sleep,
mock_api_factory_patch,
mock_scheduler_proxy_class,
mock_clusters,
@@ -196,3 +291,4 @@ class TestClientKillCommand(AuroraClientCommandTest):
assert mock_scheduler_proxy.killTasks.call_count == 1
query = self.get_expected_task_query([0, 1, 2, 3])
mock_scheduler_proxy.killTasks.assert_called_with(query, None)
+ self.assert_scheduler_called(mock_api, query, 3)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/commands/test_update.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_update.py b/src/test/python/apache/aurora/client/commands/test_update.py
index 6e145db..d51579f 100644
--- a/src/test/python/apache/aurora/client/commands/test_update.py
+++ b/src/test/python/apache/aurora/client/commands/test_update.py
@@ -25,6 +25,7 @@ from apache.aurora.client.commands.util import AuroraClientCommandTest
from apache.aurora.client.api.updater import Updater
from apache.aurora.client.api.health_check import StatusHealthCheck, Retriable
from apache.aurora.client.api.quota_check import QuotaCheck
+from apache.aurora.client.api.job_monitor import JobMonitor
from apache.aurora.client.hooks.hooked_api import HookedAuroraClientAPI
from apache.aurora.config import AuroraConfig
from twitter.common.contextutil import temporary_file
@@ -206,6 +207,13 @@ class TestUpdateCommand(AuroraClientCommandTest):
def setup_quota_check(cls):
mock_quota_check = Mock(spec=QuotaCheck)
mock_quota_check.validate_quota_from_requested.return_value = cls.create_simple_success_response()
+ return mock_quota_check
+
+ @classmethod
+ def setup_job_monitor(cls):
+ mock_job_monitor = Mock(spec=JobMonitor)
+ mock_job_monitor.wait_until.return_value = True
+ return mock_job_monitor
def test_updater_simple(self):
# Test the client-side updater logic in its simplest case: everything succeeds, and no rolling
@@ -214,6 +222,7 @@ class TestUpdateCommand(AuroraClientCommandTest):
(mock_api, mock_scheduler_proxy) = self.create_mock_api()
mock_health_check = self.setup_health_checks(mock_api)
mock_quota_check = self.setup_quota_check()
+ mock_job_monitor = self.setup_job_monitor()
with contextlib.nested(
patch('twitter.common.app.get_options', return_value=mock_options),
@@ -221,12 +230,13 @@ class TestUpdateCommand(AuroraClientCommandTest):
patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
patch('apache.aurora.client.api.instance_watcher.StatusHealthCheck',
return_value=mock_health_check),
- patch('apache.aurora.client.api.quota_check.QuotaCheck', return_value=mock_quota_check),
+ patch('apache.aurora.client.api.updater.QuotaCheck', return_value=mock_quota_check),
+ patch('apache.aurora.client.api.updater.JobMonitor', return_value=mock_job_monitor),
patch('time.time', side_effect=functools.partial(self.fake_time, self)),
patch('time.sleep', return_value=None)
) as (options, scheduler_proxy_class, test_clusters, mock_health_check_factory,
- mock_quota_check_patch, time_patch, sleep_patch):
+ mock_quota_check_patch, mock_job_monitor_patch, time_patch, sleep_patch):
self.setup_mock_scheduler_for_simple_update(mock_api)
with temporary_file() as fp:
fp.write(self.get_valid_config())