You are viewing a plain text version of this content. The canonical link for it (the "here" hyperlink on the original archive page) was not preserved in this plain-text copy.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/05/02 19:17:49 UTC

git commit: Moving kill wait to the client (Part 1: client changes)

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 639c4634f -> 464d6ea56


Moving kill wait to the client (Part 1: client changes)

Bugs closed: AURORA-370

Reviewed at https://reviews.apache.org/r/20950/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/464d6ea5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/464d6ea5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/464d6ea5

Branch: refs/heads/master
Commit: 464d6ea56b5fd678f959982be8929cfe6ac16b12
Parents: 639c463
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri May 2 10:06:28 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri May 2 10:06:28 2014 -0700

----------------------------------------------------------------------
 src/main/python/apache/aurora/client/api/BUILD  |   2 +
 .../apache/aurora/client/api/job_monitor.py     |  65 +++++----
 .../python/apache/aurora/client/api/updater.py  |   9 +-
 .../python/apache/aurora/client/cli/jobs.py     |  35 +++--
 .../python/apache/aurora/client/cli/task.py     |   1 -
 .../apache/aurora/client/commands/core.py       |  15 ++-
 src/test/python/apache/aurora/client/api/BUILD  |   3 +-
 .../aurora/client/api/test_job_monitor.py       |  93 ++++++++++---
 .../apache/aurora/client/api/test_updater.py    |  35 ++++-
 .../aurora/client/cli/test_command_hooks.py     |   2 +-
 .../apache/aurora/client/cli/test_create.py     |  23 ++--
 .../apache/aurora/client/cli/test_kill.py       |  98 +++++++++++---
 .../apache/aurora/client/cli/test_plugins.py    |   4 +-
 .../apache/aurora/client/cli/test_update.py     |  12 +-
 .../python/apache/aurora/client/cli/util.py     |  34 +++--
 .../aurora/client/commands/test_create.py       |  31 ++---
 .../apache/aurora/client/commands/test_kill.py  | 132 ++++++++++++++++---
 .../aurora/client/commands/test_update.py       |  14 +-
 18 files changed, 463 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/BUILD b/src/main/python/apache/aurora/client/api/BUILD
index 32097d2..0412bb5 100644
--- a/src/main/python/apache/aurora/client/api/BUILD
+++ b/src/main/python/apache/aurora/client/api/BUILD
@@ -57,6 +57,7 @@ python_library(
   name = 'job_monitor',
   sources = ['job_monitor.py'],
   dependencies = [
+    pants('3rdparty/python:twitter.common.log'),
     pants('3rdparty/python:twitter.common.quantity'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ]
@@ -127,6 +128,7 @@ python_library(
   dependencies = [
     pants(':scheduler_client'),
     pants(':instance_watcher'),
+    pants(':job_monitor'),
     pants(':updater_util'),
     pants(':quota_check'),
     pants('3rdparty/python:twitter.common.log'),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/api/job_monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/job_monitor.py b/src/main/python/apache/aurora/client/api/job_monitor.py
index b694ef6..a1a39e8 100644
--- a/src/main/python/apache/aurora/client/api/job_monitor.py
+++ b/src/main/python/apache/aurora/client/api/job_monitor.py
@@ -26,12 +26,13 @@ from gen.apache.aurora.api.ttypes import (
 )
 
 from thrift.transport import TTransport
+from twitter.common import log
 from twitter.common.quantity import Amount, Time
 
 
 class JobMonitor(object):
-  MIN_POLL_INTERVAL = Amount(10, Time.SECONDS)
-  MAX_POLL_INTERVAL = Amount(2, Time.MINUTES)
+  MIN_POLL_INTERVAL = Amount(2, Time.SECONDS)
+  MAX_POLL_INTERVAL = Amount(150, Time.SECONDS)
 
   @classmethod
   def running_or_finished(cls, status):
@@ -41,38 +42,58 @@ class JobMonitor(object):
   def terminal(cls, status):
     return status in TERMINAL_STATES
 
-  # TODO(ksweeney): Make this use the AuroraJobKey
-  def __init__(self, client, role, env, jobname):
-    self._client = client
-    self._query = TaskQuery(owner=Identity(role=role), environment=env, jobName=jobname)
-    self._initial_tasks = set()
-    self._initial_tasks = set(task.assignedTask.taskId for task in self.iter_query())
+  def __init__(self, scheduler, job_key, clock=time,
+               min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL):
+    self._scheduler = scheduler
+    self._job_key = job_key
+    self._clock = clock
+    self._min_poll_interval = min_poll_interval
+    self._max_poll_interval = max_poll_interval
 
-  def iter_query(self):
+  def iter_query(self, query):
     try:
-      res = self._client.scheduler_proxy.getTasksStatus(self._query)
+      res = self._scheduler.getTasksStatus(query)
     except TTransport.TTransportException as e:
-      print('Failed to query slaves from scheduler: %s' % e)
+      log.error('Failed to query tasks from scheduler: %s' % e)
       return
     if res is None or res.result is None:
       return
     for task in res.result.scheduleStatusResult.tasks:
-      if task.assignedTask.taskId not in self._initial_tasks:
-        yield task
+      yield task
 
-  def states(self):
+  def states(self, query):
     states = {}
-    for task in self.iter_query():
+    for task in self.iter_query(query):
       status, instance_id = task.status, task.assignedTask.instanceId
       first_timestamp = task.taskEvents[0].timestamp
       if instance_id not in states or first_timestamp > states[instance_id][0]:
         states[instance_id] = (first_timestamp, status)
     return dict((instance_id, status[1]) for (instance_id, status) in states.items())
 
-  def wait_until(self, predicate):
-    """Given a predicate (from ScheduleStatus => Boolean), return once all tasks
-       return true for that predicate."""
-    poll_interval = self.MIN_POLL_INTERVAL
-    while not all(predicate(state) for state in self.states().values()):
-      time.sleep(poll_interval.as_(Time.SECONDS))
-      poll_interval = min(self.MAX_POLL_INTERVAL, 2 * poll_interval)
+  def create_query(self, instances=None):
+    return TaskQuery(
+        owner=Identity(role=self._job_key.role),
+        environment=self._job_key.env,
+        jobName=self._job_key.name,
+        instanceIds=frozenset([int(s) for s in instances]) if instances else None)
+
+  def wait_until(self, predicate, instances=None, with_timeout=False):
+    """Given a predicate (from ScheduleStatus => Boolean), wait until all requested instances
+       return true for that predicate OR the timeout expires (if with_timeout=True)
+
+    Arguments:
+    predicate -- predicate to check completion with.
+    instances -- optional subset of job instances to wait for.
+    with_timeout -- if set, caps waiting time to the MAX_POLL_INTERVAL.
+
+    Returns: True if predicate is met or False if timeout has expired.
+    """
+    poll_interval = self._min_poll_interval
+    while not all(predicate(state) for state in self.states(self.create_query(instances)).values()):
+      if with_timeout and poll_interval >= self._max_poll_interval:
+        return False
+
+      self._clock.sleep(poll_interval.as_(Time.SECONDS))
+      poll_interval = min(self._max_poll_interval, 2 * poll_interval)
+
+    return True
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/api/updater.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater.py b/src/main/python/apache/aurora/client/api/updater.py
index 0acf450..2f55d5a 100644
--- a/src/main/python/apache/aurora/client/api/updater.py
+++ b/src/main/python/apache/aurora/client/api/updater.py
@@ -32,6 +32,7 @@ from gen.apache.aurora.api.ttypes import (
 )
 
 from .instance_watcher import InstanceWatcher
+from .job_monitor import JobMonitor
 from .quota_check import CapacityRequest, QuotaCheck
 from .scheduler_client import SchedulerProxy
 from .updater_util import FailureThreshold, UpdaterConfig
@@ -57,12 +58,14 @@ class Updater(object):
                health_check_interval_seconds,
                scheduler=None,
                instance_watcher=None,
-               quota_check=None):
+               quota_check=None,
+               job_monitor=None):
     self._config = config
     self._job_key = JobKey(role=config.role(), environment=config.environment(), name=config.name())
     self._health_check_interval_seconds = health_check_interval_seconds
     self._scheduler = scheduler or SchedulerProxy(config.cluster())
     self._quota_check = quota_check or QuotaCheck(self._scheduler)
+    self._job_monitor = job_monitor or JobMonitor(self._scheduler, self._config.job_key())
     try:
       self._update_config = UpdaterConfig(**config.update_config().get())
     except ValueError as e:
@@ -263,7 +266,6 @@ class Updater(object):
     if unchanged:
       log.info('Skipping unchanged instances: %s' % unchanged)
 
-    # Kill is a blocking call in scheduler -> no need to watch it yet.
     self._kill_instances(to_kill)
     self._add_instances(to_add, operation_configs.to_config)
     return to_add
@@ -278,6 +280,9 @@ class Updater(object):
       log.info('Killing instances: %s' % instance_ids)
       query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids))
       self._check_and_log_response(self._scheduler.killTasks(query, self._lock))
+      res = self._job_monitor.wait_until(JobMonitor.terminal, instance_ids, with_timeout=True)
+      if not res:
+        raise self.Error('Tasks were not killed in time.')
       log.info('Instances killed')
 
   def _add_instances(self, instance_ids, to_config):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py
index 782b348..cf45640 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -16,6 +16,7 @@
 
 from __future__ import print_function
 from datetime import datetime
+import logging
 import json
 import os
 import pprint
@@ -31,6 +32,7 @@ from apache.aurora.client.cli import (
     EXIT_INVALID_CONFIGURATION,
     EXIT_INVALID_PARAMETER,
     EXIT_OK,
+    EXIT_TIMEOUT,
     Noun,
     Verb,
 )
@@ -117,7 +119,6 @@ class CreateJobCommand(Verb):
   def execute(self, context):
     config = context.get_job_config(context.options.jobspec, context.options.config_file)
     api = context.get_api(config.cluster())
-    monitor = JobMonitor(api, config.role(), config.environment(), config.name())
     resp = api.create_job(config)
     if resp.responseCode == ResponseCode.INVALID_REQUEST:
       raise context.CommandError(EXIT_INVALID_PARAMETER, 'Job not found')
@@ -126,9 +127,9 @@ class CreateJobCommand(Verb):
     if context.options.open_browser:
       context.open_job_page(api, config)
     if context.options.wait_until == 'RUNNING':
-      monitor.wait_until(monitor.running_or_finished)
+      JobMonitor(api.scheduler_proxy, config.job_key()).wait_until(JobMonitor.running_or_finished)
     elif context.options.wait_until == 'FINISHED':
-      monitor.wait_until(monitor.terminal)
+      JobMonitor(api.scheduler_proxy, config.job_key()).wait_until(JobMonitor.terminal)
     return EXIT_OK
 
 
@@ -285,6 +286,13 @@ class AbstractKillCommand(Verb):
         MAX_TOTAL_FAILURES_OPTION,
         NO_BATCHING_OPTION]
 
+  def wait_kill_tasks(self, context, scheduler, job_key, instances=None):
+    monitor = JobMonitor(scheduler, job_key)
+    if not monitor.wait_until(JobMonitor.terminal, instances=instances, with_timeout=True):
+      context.print_err('Tasks were not killed in time.')
+      return EXIT_TIMEOUT
+    return EXIT_OK
+
   def kill_in_batches(self, context, job, instances_arg):
     api = context.get_api(job.cluster)
     # query the job, to get the list of active instances.
@@ -303,8 +311,11 @@ class AbstractKillCommand(Verb):
       for i in range(min(context.options.batch_size, len(instances_to_kill))):
         batch.append(instances_to_kill.pop())
       resp = api.kill_job(job, batch)
-      if resp.responseCode is not ResponseCode.OK:
-        context.print_log('Kill of shards %s failed with error %s' % (batch, resp.message))
+      if resp.responseCode is not ResponseCode.OK or self.wait_kill_tasks(
+          context, api.scheduler_proxy, job, batch) is not EXIT_OK:
+
+        context.print_log(logging.INFO,
+                          'Kill of shards %s failed with error %s' % (batch, resp.message))
         errors += 1
         if errors > context.options.max_total_failures:
           raise context.CommandError(EXIT_COMMAND_FAILURE,
@@ -336,9 +347,10 @@ class KillCommand(AbstractKillCommand):
     api = context.get_api(job.cluster)
     if context.options.no_batching:
       resp = api.kill_job(job, instances_arg)
-      if resp.responseCode != ResponseCode.OK:
-        context.print_err('Job %s not found' % job, file=sys.stderr)
-        return EXIT_INVALID_PARAMETER
+      context.check_and_log_response(resp)
+      wait_result = self.wait_kill_tasks(context, api.scheduler_proxy, job, instances_arg)
+      if wait_result is not EXIT_OK:
+        return wait_result
     else:
       self.kill_in_batches(context, job, instances_arg)
     if context.options.open_browser:
@@ -363,9 +375,10 @@ class KillAllJobCommand(AbstractKillCommand):
     api = context.get_api(job.cluster)
     if context.options.no_batching:
       resp = api.kill_job(job, None)
-      if resp.responseCode != ResponseCode.OK:
-        context.print_err('Job %s not found' % job, file=sys.stderr)
-        return EXIT_INVALID_PARAMETER
+      context.check_and_log_response(resp)
+      wait_result = self.wait_kill_tasks(context, api.scheduler_proxy, job)
+      if wait_result is not EXIT_OK:
+        return wait_result
     else:
       self.kill_in_batches(context, job, None)
     if context.options.open_browser:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/cli/task.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/task.py b/src/main/python/apache/aurora/client/cli/task.py
index 62747ed..a162b86 100644
--- a/src/main/python/apache/aurora/client/cli/task.py
+++ b/src/main/python/apache/aurora/client/cli/task.py
@@ -27,7 +27,6 @@ import sys
 import time
 
 from apache.aurora.client.api.command_runner import DistributedCommandRunner
-from apache.aurora.client.api.job_monitor import JobMonitor
 from apache.aurora.client.api.updater_util import UpdaterConfig
 from apache.aurora.client.cli import (
     EXIT_COMMAND_FAILURE,

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/main/python/apache/aurora/client/commands/core.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/commands/core.py b/src/main/python/apache/aurora/client/commands/core.py
index 39190e0..089a2c6 100644
--- a/src/main/python/apache/aurora/client/commands/core.py
+++ b/src/main/python/apache/aurora/client/commands/core.py
@@ -94,6 +94,13 @@ def get_job_config(job_spec, config_file, options):
       select_role=select_role,
       select_env=select_env)
 
+
+def wait_kill_tasks(scheduler, job_key, instances=None):
+  monitor = JobMonitor(scheduler, job_key)
+  if not monitor.wait_until(monitor.terminal, instances=instances, with_timeout=True):
+    die('Tasks were not killed in time.')
+
+
 @app.command
 def version(args):
   """usage: version
@@ -142,15 +149,14 @@ def create(job_spec, config_file):
     print("Error: %s" % v)
     sys.exit(1)
   api = make_client(config.cluster())
-  monitor = JobMonitor(api, config.role(), config.environment(), config.name())
   resp = api.create_job(config)
   check_and_log_response(resp)
   handle_open(api.scheduler_proxy.scheduler_client().url, config.role(), config.environment(),
       config.name())
   if options.wait_until == 'RUNNING':
-    monitor.wait_until(monitor.running_or_finished)
+    JobMonitor(api.scheduler_proxy, config.job_key()).wait_until(JobMonitor.running_or_finished)
   elif options.wait_until == 'FINISHED':
-    monitor.wait_until(monitor.terminal)
+    JobMonitor(api.scheduler_proxy, config.job_key()).wait_until(JobMonitor.terminal)
 
 
 @app.command
@@ -407,6 +413,8 @@ def kill(args, options):
   resp = api.kill_job(job_key, options.shards, config=config)
   check_and_log_response(resp)
   handle_open(api.scheduler_proxy.scheduler_client().url, job_key.role, job_key.env, job_key.name)
+  wait_kill_tasks(api.scheduler_proxy, job_key, options.shards)
+
 
 @app.command
 @app.command_option(CLUSTER_INVOKE_OPTION)
@@ -424,6 +432,7 @@ def killall(args, options):
   resp = api.kill_job(job_key, None, config=config)
   check_and_log_response(resp)
   handle_open(api.scheduler_proxy.scheduler_client().url, job_key.role, job_key.env, job_key.name)
+  wait_kill_tasks(api.scheduler_proxy, job_key)
 
 
 @app.command

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/BUILD b/src/test/python/apache/aurora/client/api/BUILD
index dd9b797..f0fa58f 100644
--- a/src/test/python/apache/aurora/client/api/BUILD
+++ b/src/test/python/apache/aurora/client/api/BUILD
@@ -40,7 +40,7 @@ python_tests(
 python_tests(name = 'job_monitor',
   sources = ['test_job_monitor.py'],
   dependencies = [
-    pants('3rdparty/python:mox'),
+    pants('3rdparty/python:mock'),
     pants('src/main/python/apache/aurora/client/api:api'),
     pants('src/main/python/apache/aurora/client/api:job_monitor'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
@@ -99,6 +99,7 @@ python_tests(name = 'updater',
   sources = ['test_updater.py'],
   dependencies = [
     pants('3rdparty/python:mox'),
+    pants('src/main/python/apache/aurora/common:aurora_job_key'),
     pants('src/main/python/apache/aurora/client/api:updater'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
     pants('src/test/python/apache/aurora/client:fake_scheduler_proxy'),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/api/test_job_monitor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_job_monitor.py b/src/test/python/apache/aurora/client/api/test_job_monitor.py
index 32609e4..bfb7fed 100644
--- a/src/test/python/apache/aurora/client/api/test_job_monitor.py
+++ b/src/test/python/apache/aurora/client/api/test_job_monitor.py
@@ -13,42 +13,103 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import unittest
 
 from gen.apache.aurora.api.AuroraSchedulerManager import Client
 from gen.apache.aurora.api.ttypes import (
+    AssignedTask,
     Identity,
     Response,
     ResponseCode,
     Result,
+    ScheduleStatus,
     ScheduleStatusResult,
+    ScheduledTask,
+    TaskEvent,
     TaskQuery,
 )
 from apache.aurora.client.api import AuroraClientAPI
 from apache.aurora.client.api.job_monitor import JobMonitor
+from apache.aurora.common.aurora_job_key import AuroraJobKey
 
-from mox import MoxTestBase
+from mock import Mock
 
-ROLE = 'johndoe'
-ENV = 'test'
-JOB_NAME = 'test_job'
 
+class FakeClock(object):
+  def sleep(self, seconds):
+    pass
 
-class JobMonitorTest(MoxTestBase):
+
+class JobMonitorTest(unittest.TestCase):
 
   def setUp(self):
+    self._scheduler = Mock()
+    self._job_key = AuroraJobKey('cl', 'johndoe', 'test', 'test_job')
+    self._clock = FakeClock()
+
+  def create_task(self, status, id):
+    return ScheduledTask(
+        assignedTask=AssignedTask(
+            instanceId=id,
+            taskId=id),
+        status=status,
+        taskEvents=[TaskEvent(
+            status=status,
+            timestamp=10)]
+    )
+  def mock_get_tasks(self, tasks, response_code=None):
+    response_code = ResponseCode.OK if response_code is None else response_code
+    resp = Response(responseCode=response_code, message='test')
+    resp.result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=tasks))
+    self._scheduler.getTasksStatus.return_value = resp
+
+  def expect_task_status(self, once=False, instances=None):
+    query = TaskQuery(
+        owner=Identity(role=self._job_key.role),
+        environment=self._job_key.env,
+        jobName=self._job_key.name)
+    if instances is not None:
+      query.instanceIds = frozenset([int(s) for s in instances])
+
+    if once:
+      self._scheduler.getTasksStatus.assert_called_once_with(query)
+    else:
+      self._scheduler.getTasksStatus.assert_called_with(query)
+
+  def test_wait_until_state(self):
+    self.mock_get_tasks([
+        self.create_task(ScheduleStatus.RUNNING, '1'),
+        self.create_task(ScheduleStatus.RUNNING, '2'),
+        self.create_task(ScheduleStatus.FAILED, '3'),
+    ])
+
+    monitor = JobMonitor(self._scheduler, self._job_key)
+    assert monitor.wait_until(monitor.running_or_finished)
+    self.expect_task_status(once=True)
+
+  def test_empty_job_succeeds(self):
+    self.mock_get_tasks([])
 
-    super(JobMonitorTest, self).setUp()
-    self.mock_api = self.mox.CreateMock(AuroraClientAPI)
-    self.mock_scheduler = self.mox.CreateMock(Client)
-    self.mock_api.scheduler_proxy = self.mock_scheduler
+    monitor = JobMonitor(self._scheduler, self._job_key)
+    assert monitor.wait_until(monitor.running_or_finished)
+    self.expect_task_status(once=True)
 
-  def test_init(self):
-    result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=[]))
-    response = Response(responseCode=ResponseCode.OK, message="test", result=result)
-    query = TaskQuery(owner=Identity(role=ROLE), environment=ENV, jobName=JOB_NAME)
+  def test_wait_with_instances(self):
+    self.mock_get_tasks([
+        self.create_task(ScheduleStatus.FAILED, '2'),
+    ])
 
-    self.mock_scheduler.getTasksStatus(query).AndReturn(response)
+    monitor = JobMonitor(self._scheduler, self._job_key)
+    assert monitor.wait_until(monitor.terminal, instances=[2])
+    self.expect_task_status(once=True, instances=[2])
 
-    self.mox.ReplayAll()
+  def test_wait_until_timeout(self):
+    self.mock_get_tasks([
+        self.create_task(ScheduleStatus.RUNNING, '1'),
+        self.create_task(ScheduleStatus.RUNNING, '2'),
+        self.create_task(ScheduleStatus.RUNNING, '3'),
+    ])
 
-    JobMonitor(self.mock_api, ROLE, ENV, JOB_NAME)
+    monitor = JobMonitor(self._scheduler, self._job_key, clock=self._clock)
+    assert not monitor.wait_until(monitor.terminal, with_timeout=True)
+    self.expect_task_status()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/api/test_updater.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_updater.py b/src/test/python/apache/aurora/client/api/test_updater.py
index e7eb1e7..4c931d5 100644
--- a/src/test/python/apache/aurora/client/api/test_updater.py
+++ b/src/test/python/apache/aurora/client/api/test_updater.py
@@ -19,9 +19,11 @@ from os import environ
 from unittest import TestCase
 
 from apache.aurora.client.api.instance_watcher import InstanceWatcher
+from apache.aurora.client.api.job_monitor import JobMonitor
 from apache.aurora.client.api.quota_check import CapacityRequest, QuotaCheck
 from apache.aurora.client.api.updater import Updater
 from apache.aurora.client.fake_scheduler_proxy import FakeSchedulerProxy
+from apache.aurora.common.aurora_job_key import AuroraJobKey
 
 from gen.apache.aurora.api.AuroraSchedulerManager import Client as scheduler_client
 from gen.apache.aurora.api.constants import ACTIVE_STATES
@@ -95,6 +97,9 @@ class FakeConfig(object):
   def job(self):
     return self.job_config
 
+  def job_key(self):
+    return AuroraJobKey(self.cluster(), self.role(), self.environment(), self.name())
+
   def instances(self):
     return self.job_config.instanceCount
 
@@ -117,6 +122,7 @@ class UpdaterTest(TestCase):
     self._session_key = 'test_session'
     self._lock = 'test_lock'
     self._instance_watcher = MockObject(InstanceWatcher)
+    self._job_monitor = MockObject(JobMonitor)
     self._scheduler = MockObject(scheduler_client)
     self._scheduler_proxy = FakeSchedulerProxy('test-cluster', self._scheduler, self._session_key)
     self._quota_check = MockObject(QuotaCheck)
@@ -129,11 +135,13 @@ class UpdaterTest(TestCase):
     Replay(self._scheduler)
     Replay(self._instance_watcher)
     Replay(self._quota_check)
+    Replay(self._job_monitor)
 
   def verify_mocks(self):
     Verify(self._scheduler)
     Verify(self._instance_watcher)
     Verify(self._quota_check)
+    Verify(self._job_monitor)
 
   def init_updater(self, update_config):
     self._config = FakeConfig(self._role, self._name, self._env, update_config)
@@ -142,7 +150,8 @@ class UpdaterTest(TestCase):
         3,
         self._scheduler_proxy,
         self._instance_watcher,
-        self._quota_check)
+        self._quota_check,
+        self._job_monitor)
 
   def expect_watch_instances(self, instance_ids, failed_instances=[]):
     self._instance_watcher.watch(instance_ids).AndReturn(set(failed_instances))
@@ -183,7 +192,7 @@ class UpdaterTest(TestCase):
         self._lock,
         self._session_key).AndReturn(response)
 
-  def expect_kill(self, instance_ids, response_code=None):
+  def expect_kill(self, instance_ids, response_code=None, monitor_result=True):
     response_code = ResponseCode.OK if response_code is None else response_code
     response = Response(responseCode=response_code, message='test')
     query = TaskQuery(
@@ -193,6 +202,11 @@ class UpdaterTest(TestCase):
         statuses=ACTIVE_STATES,
         instanceIds=frozenset([int(s) for s in instance_ids]))
     self._scheduler.killTasks(query, self._lock, self._session_key).AndReturn(response)
+    if response_code != ResponseCode.OK:
+      return
+
+    self._job_monitor.wait_until(JobMonitor.terminal, instance_ids, with_timeout=True).AndReturn(
+        monitor_result)
 
   def expect_add(self, instance_ids, task_config, response_code=None):
     response_code = ResponseCode.OK if response_code is None else response_code
@@ -653,6 +667,23 @@ class UpdaterTest(TestCase):
     self.update_and_expect_response(ResponseCode.ERROR)
     self.verify_mocks()
 
+  def test_update_kill_timeout(self):
+    """Test job monitor timeout while waiting for tasks killed."""
+    old_configs = self.make_task_configs(5)
+    new_config = deepcopy(old_configs[0])
+    new_config.priority = 5
+    job_config = self.make_job_config(new_config, 5)
+    self._config.job_config = job_config
+    self.expect_start()
+    self.expect_get_tasks(old_configs)
+    self.expect_populate(job_config)
+    self.expect_quota_check(5, 5)
+    self.expect_kill([0, 1, 2], monitor_result=False)
+    self.replay_mocks()
+
+    self.update_and_expect_response(ResponseCode.ERROR)
+    self.verify_mocks()
+
   def test_job_does_not_exist(self):
     """Unable to update a job that does not exist."""
     old_configs = self.make_task_configs(5)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_command_hooks.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_command_hooks.py b/src/test/python/apache/aurora/client/cli/test_command_hooks.py
index 7c6f70c..6c6f6f5 100644
--- a/src/test/python/apache/aurora/client/cli/test_command_hooks.py
+++ b/src/test/python/apache/aurora/client/cli/test_command_hooks.py
@@ -137,7 +137,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
             fp.name])
 
       self.assert_create_job_called(api)
-      self.assert_scheduler_called(api, mock_query, 2)
+      self.assert_scheduler_called(api, mock_query, 1)
       assert command_hook.ran_pre
       assert command_hook.ran_post
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_create.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_create.py b/src/test/python/apache/aurora/client/cli/test_create.py
index 875573e..96dc592 100644
--- a/src/test/python/apache/aurora/client/cli/test_create.py
+++ b/src/test/python/apache/aurora/client/cli/test_create.py
@@ -57,13 +57,9 @@ class TestClientCreateCommand(AuroraClientCommandTest):
   def create_mock_status_query_result(cls, scheduleStatus):
     mock_query_result = cls.create_simple_success_response()
     mock_query_result.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
-    if scheduleStatus == ScheduleStatus.INIT:
-      # status query result for before job is launched.
-      mock_query_result.result.scheduleStatusResult.tasks = []
-    else:
-      mock_task_one = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
-      mock_task_two = cls.create_mock_task('hello', 1, 1004, scheduleStatus)
-      mock_query_result.result.scheduleStatusResult.tasks = [mock_task_one, mock_task_two]
+    mock_task_one = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
+    mock_task_two = cls.create_mock_task('hello', 1, 1004, scheduleStatus)
+    mock_query_result.result.scheduleStatusResult.tasks = [mock_task_one, mock_task_two]
     return mock_query_result
 
   @classmethod
@@ -99,13 +95,15 @@ class TestClientCreateCommand(AuroraClientCommandTest):
     # We'll patch out create_context, which will give us a fake context
     # object, and everything can be stubbed through that.
     mock_context = FakeAuroraCommandContext()
-    with patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context):
+    with contextlib.nested(
+        patch('time.sleep'),
+        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
       # After making the client, create sets up a job monitor.
       # The monitor uses TaskQuery to get the tasks. It's called at least twice:once before
       # the job is created, and once after. So we need to set up mocks for the query results.
       mock_query = self.create_mock_query()
       mock_context.add_expected_status_query_result(
-        self.create_mock_status_query_result(ScheduleStatus.INIT))
+        self.create_mock_status_query_result(ScheduleStatus.PENDING))
       mock_context.add_expected_status_query_result(
         self.create_mock_status_query_result(ScheduleStatus.RUNNING))
       api = mock_context.get_api('west')
@@ -133,8 +131,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
         patch('time.sleep'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
       mock_query = self.create_mock_query()
-      for result in [ScheduleStatus.INIT, ScheduleStatus.PENDING, ScheduleStatus.PENDING,
-          ScheduleStatus.RUNNING, ScheduleStatus.FINISHED]:
+      for result in [ScheduleStatus.PENDING, ScheduleStatus.PENDING, ScheduleStatus.RUNNING]:
         mock_context.add_expected_status_query_result(self.create_mock_status_query_result(result))
       api = mock_context.get_api('west')
       api.create_job.return_value = self.get_createjob_response()
@@ -147,7 +144,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
         # Now check that the right API calls got made.
         # Check that create_job was called exactly once, with an AuroraConfig parameter.
         self.assert_create_job_called(api)
-        self.assert_scheduler_called(api, mock_query, 4)
+        self.assert_scheduler_called(api, mock_query, 3)
 
   def test_create_job_failed(self):
     """Run a test of the "create" command against a mocked-out API:
@@ -172,8 +169,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
       # Check that create_job was called exactly once, with an AuroraConfig parameter.
       self.assert_create_job_called(api)
 
-      # getTasksStatus was called once, before the create_job
-      assert api.scheduler_proxy.getTasksStatus.call_count == 1
 
   def test_create_job_failed_invalid_config(self):
     """Run a test of the "create" command against a mocked-out API, with a configuration

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_kill.py b/src/test/python/apache/aurora/client/cli/test_kill.py
index cf5df64..e11fc81 100644
--- a/src/test/python/apache/aurora/client/cli/test_kill.py
+++ b/src/test/python/apache/aurora/client/cli/test_kill.py
@@ -19,6 +19,7 @@ import unittest
 
 from twitter.common.contextutil import temporary_file
 
+from apache.aurora.client.cli import EXIT_TIMEOUT
 from apache.aurora.client.cli.client import AuroraCommandLine
 from apache.aurora.client.cli.options import parse_instances
 from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
@@ -27,8 +28,12 @@ from apache.aurora.common.aurora_job_key import AuroraJobKey
 from twitter.common.contextutil import temporary_file
 
 from gen.apache.aurora.api.ttypes import (
+    AssignedTask,
     Identity,
+    ScheduleStatus,
     ScheduleStatusResult,
+    ScheduledTask,
+    TaskEvent,
     TaskQuery,
 )
 
@@ -55,11 +60,24 @@ class TestClientKillCommand(AuroraClientCommandTest):
   def assert_kill_job_called(cls, mock_api):
     assert mock_api.kill_job.call_count == 1
 
+  @classmethod
+  def assert_scheduler_called(cls, mock_api, mock_query, num_queries):
+    assert mock_api.scheduler_proxy.getTasksStatus.call_count == num_queries
+    mock_api.scheduler_proxy.getTasksStatus.assert_called_with(mock_query)
+
+  @classmethod
+  def get_expected_task_query(cls, instances=None):
+    instance_ids = frozenset(instances) if instances is not None else None
+    return TaskQuery(taskIds=None, jobName=cls.TEST_JOB, environment=cls.TEST_ENV,
+                     instanceIds=instance_ids, owner=Identity(role=cls.TEST_ROLE, user=None))
+
+
   def test_killall_job(self):
     """Test kill client-side API logic."""
     mock_context = FakeAuroraCommandContext()
     mock_scheduler_proxy = Mock()
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
 
@@ -67,6 +85,8 @@ class TestClientKillCommand(AuroraClientCommandTest):
       mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_call_result()
       api.kill_job.return_value = self.get_kill_job_response()
       mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+      mock_context.add_expected_status_query_result(self.create_status_call_result(
+          self.create_mock_task(ScheduleStatus.KILLED)))
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
@@ -76,12 +96,43 @@ class TestClientKillCommand(AuroraClientCommandTest):
       # Now check that the right API calls got made.
       assert api.kill_job.call_count == 1
       api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), None)
+      self.assert_scheduler_called(api, self.get_expected_task_query(), 2)
+
+  def test_killall_job_wait_until_timeout(self):
+    """Test kill client-side API logic."""
+    mock_context = FakeAuroraCommandContext()
+    mock_scheduler_proxy = Mock()
+    with contextlib.nested(
+        patch('time.sleep'),
+        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
+
+      api = mock_context.get_api('west')
+      mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_call_result()
+      api.kill_job.return_value = self.get_kill_job_response()
+      mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+      for _ in range(8):
+        mock_context.add_expected_status_query_result(self.create_status_call_result(
+            self.create_mock_task(ScheduleStatus.RUNNING)))
+
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        cmd = AuroraCommandLine()
+        assert EXIT_TIMEOUT == cmd.execute(
+            ['job', 'killall', '--no-batching', '--config=%s' % fp.name, 'west/bozo/test/hello'])
+
+      # Now check that the right API calls got made.
+      assert api.kill_job.call_count == 1
+      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), None)
+      self.assert_scheduler_called(api, self.get_expected_task_query(), 8)
 
   def test_killall_job(self):
     """Test kill client-side API logic."""
     mock_context = FakeAuroraCommandContext()
     mock_scheduler_proxy = Mock()
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
 
@@ -89,6 +140,8 @@ class TestClientKillCommand(AuroraClientCommandTest):
       api.kill_job.return_value = self.get_kill_job_response()
       mock_context.add_expected_status_query_result(self.create_status_call_result())
       mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+      mock_context.add_expected_status_query_result(self.create_status_call_result(
+          self.create_mock_task(ScheduleStatus.KILLED)))
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
@@ -97,17 +150,22 @@ class TestClientKillCommand(AuroraClientCommandTest):
 
       # Now check that the right API calls got made.
       assert api.kill_job.call_count == 4
-      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), [15, 16, 17, 18, 19])
+      instances = [15, 16, 17, 18, 19]
+      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), instances)
+      self.assert_scheduler_called(api, self.get_expected_task_query(instances), 6)
 
   def test_kill_job_with_instances_nobatching(self):
     """Test kill client-side API logic."""
     mock_context = FakeAuroraCommandContext()
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
       api = mock_context.get_api('west')
       self.setup_get_tasks_status_calls(api.scheduler_proxy)
       api.kill_job.return_value = self.get_kill_job_response()
+      mock_context.add_expected_status_query_result(self.create_status_call_result(
+          self.create_mock_task(ScheduleStatus.KILLED)))
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
@@ -116,19 +174,24 @@ class TestClientKillCommand(AuroraClientCommandTest):
 
       # Now check that the right API calls got made.
       assert api.kill_job.call_count == 1
-      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
-          [0, 2, 4, 5, 6])
+      instances = [0, 2, 4, 5, 6]
+      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), instances)
+      self.assert_scheduler_called(api, self.get_expected_task_query(instances), 2)
 
   def test_kill_job_with_instances_batched(self):
     """Test kill client-side API logic."""
     mock_context = FakeAuroraCommandContext()
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
       api = mock_context.get_api('west')
       status_result = self.create_status_call_result()
       mock_context.add_expected_status_query_result(status_result)
       api.kill_job.return_value = self.get_kill_job_response()
+      mock_context.add_expected_status_query_result(self.create_status_call_result(
+          self.create_mock_task(ScheduleStatus.KILLED)))
+
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
@@ -137,19 +200,25 @@ class TestClientKillCommand(AuroraClientCommandTest):
 
       # Now check that the right API calls got made.
       assert api.kill_job.call_count == 1
-      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
-          [0, 2, 4, 5, 6])
+      instances = [0, 2, 4, 5, 6]
+      api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'), instances)
+      # Expect total 3 calls (one from JobMonitor).
+      self.assert_scheduler_called(api, self.get_expected_task_query(instances), 3)
 
   def test_kill_job_with_instances_batched_large(self):
     """Test kill client-side API logic."""
     mock_context = FakeAuroraCommandContext()
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
       api = mock_context.get_api('west')
       status_result = self.create_status_call_result()
       mock_context.add_expected_status_query_result(status_result)
       api.kill_job.return_value = self.get_kill_job_response()
+      mock_context.add_expected_status_query_result(self.create_status_call_result(
+          self.create_mock_task(ScheduleStatus.KILLED)))
+
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
@@ -160,6 +229,8 @@ class TestClientKillCommand(AuroraClientCommandTest):
       assert api.kill_job.call_count == 3
       api.kill_job.assert_called_with(AuroraJobKey.from_path('west/bozo/test/hello'),
           [12, 13])
+      # Expect total 5 calls (3 from JobMonitor).
+      self.assert_scheduler_called(api, self.get_expected_task_query([12, 13]), 5)
 
   def test_kill_job_with_instances_batched_maxerrors(self):
     """Test kill client-side API logic."""
@@ -169,27 +240,24 @@ class TestClientKillCommand(AuroraClientCommandTest):
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
       api = mock_context.get_api('west')
       status_result = self.create_status_call_result()
-      failed_status_result  = self.create_error_response()
-      mock_context.add_expected_status_query_result(status_result)
-      mock_context.add_expected_status_query_result(failed_status_result)
-      mock_context.add_expected_status_query_result(failed_status_result)
-      mock_context.add_expected_status_query_result(status_result)
       mock_context.add_expected_status_query_result(status_result)
-      api.kill_job.return_value = self.get_kill_job_response()
+      api.kill_job.return_value = self.create_error_response()
+
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
         cmd = AuroraCommandLine()
         cmd.execute(['job', 'kill', '--max-total-failures=1', '--config=%s' % fp.name, 'west/bozo/test/hello/0,2,4-13'])
 
-      # Now check that the right API calls got made. We should have aborted after the third batch.
-      assert api.kill_job.call_count == 3
-
+      # Now check that the right API calls got made. We should have aborted after the second batch.
+      assert api.kill_job.call_count == 2
+      assert api.scheduler_proxy.getTasksStatus.call_count == 0
 
   def test_kill_job_with_empty_instances_batched(self):
     """Test kill client-side API logic."""
     mock_context = FakeAuroraCommandContext()
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
       api = mock_context.get_api('west')
@@ -197,7 +265,6 @@ class TestClientKillCommand(AuroraClientCommandTest):
       status_response = self.create_simple_success_response()
       schedule_status = Mock(spec=ScheduleStatusResult)
       status_response.result.scheduleStatusResult = schedule_status
-      mock_task_config = Mock()
       schedule_status.tasks = []
       mock_context.add_expected_status_query_result(status_response)
       api.kill_job.return_value = self.get_kill_job_response()
@@ -215,6 +282,7 @@ class TestClientKillCommand(AuroraClientCommandTest):
     """Test kill client-side API logic."""
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS)):
       mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_plugins.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_plugins.py b/src/test/python/apache/aurora/client/cli/test_plugins.py
index 2dab749..21c4400 100644
--- a/src/test/python/apache/aurora/client/cli/test_plugins.py
+++ b/src/test/python/apache/aurora/client/cli/test_plugins.py
@@ -149,7 +149,7 @@ class TestPlugins(AuroraClientCommandTest):
       # Now check that the right API calls got made.
       # Check that create_job was called exactly once, with an AuroraConfig parameter.
       self.assert_create_job_called(api)
-      self.assert_scheduler_called(api, mock_query, 2)
+      self.assert_scheduler_called(api, mock_query, 1)
       # Check that the plugin did its job.
       assert mock_context.bogosity == "maximum"
       assert mock_context.after == True
@@ -177,7 +177,7 @@ class TestPlugins(AuroraClientCommandTest):
         cmd.execute(['job', 'create', '--wait-until=RUNNING',
             'west/bozo/test/hello', fp.name])
       self.assert_create_job_called(api)
-      self.assert_scheduler_called(api, mock_query, 2)
+      self.assert_scheduler_called(api, mock_query, 1)
 
   def mock_print(self, str):
     for str in str.split('\n'):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/test_update.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_update.py b/src/test/python/apache/aurora/client/cli/test_update.py
index cf077e8..854d583 100644
--- a/src/test/python/apache/aurora/client/cli/test_update.py
+++ b/src/test/python/apache/aurora/client/cli/test_update.py
@@ -20,6 +20,7 @@ from twitter.common.contextutil import temporary_file
 
 from apache.aurora.client.api.updater import Updater
 from apache.aurora.client.api.health_check import StatusHealthCheck, Retriable
+from apache.aurora.client.api.job_monitor import JobMonitor
 from apache.aurora.client.api.quota_check import QuotaCheck
 from apache.aurora.client.cli import EXIT_INVALID_CONFIGURATION
 from apache.aurora.client.cli.client import AuroraCommandLine
@@ -184,6 +185,13 @@ class TestUpdateCommand(AuroraClientCommandTest):
     mock_quota_check = Mock(spec=QuotaCheck)
     mock_quota_check.validate_quota_from_requested.return_value = \
         cls.create_simple_success_response()
+    return mock_quota_check
+
+  @classmethod
+  def setup_job_monitor(cls):
+    mock_job_monitor = Mock(spec=JobMonitor)
+    mock_job_monitor.wait_until.return_value = True
+    return mock_job_monitor
 
   def test_updater_simple(self):
     # Test the client-side updater logic in its simplest case: everything succeeds,
@@ -191,6 +199,7 @@ class TestUpdateCommand(AuroraClientCommandTest):
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
     mock_health_check = self.setup_health_checks(mock_api)
     mock_quota_check = self.setup_quota_check()
+    mock_job_monitor = self.setup_job_monitor()
     self.setup_mock_scheduler_for_simple_update(mock_api)
     # This doesn't work, because:
     # - The mock_context stubs out the API.
@@ -200,7 +209,8 @@ class TestUpdateCommand(AuroraClientCommandTest):
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
         patch('apache.aurora.client.api.instance_watcher.StatusHealthCheck',
             return_value=mock_health_check),
-        patch('apache.aurora.client.api.quota_check.QuotaCheck', return_value=mock_quota_check),
+        patch('apache.aurora.client.api.updater.JobMonitor', return_value=mock_job_monitor),
+        patch('apache.aurora.client.api.updater.QuotaCheck', return_value=mock_quota_check),
         patch('time.time', side_effect=functools.partial(self.fake_time, self)),
         patch('time.sleep', return_value=None)):
       with temporary_file() as fp:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/cli/util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/util.py b/src/test/python/apache/aurora/client/cli/util.py
index 2225ab0..e17f256 100644
--- a/src/test/python/apache/aurora/client/cli/util.py
+++ b/src/test/python/apache/aurora/client/cli/util.py
@@ -21,8 +21,11 @@ from gen.apache.aurora.api.ttypes import (
     Response,
     ResponseCode,
     Result,
+    ScheduleStatus,
     ScheduleStatusResult,
     ScheduledTask,
+    TaskConfig,
+    TaskEvent,
 )
 
 from apache.aurora.client.cli.context import AuroraCommandContext
@@ -129,24 +132,33 @@ class AuroraClientCommandTest(unittest.TestCase):
     return mock_api_factory, mock_scheduler_client
 
   @classmethod
-  def create_status_call_result(cls):
+  def create_status_call_result(cls, mock_task=None):
     status_response = cls.create_simple_success_response()
     schedule_status = Mock(spec=ScheduleStatusResult)
     status_response.result.scheduleStatusResult = schedule_status
-    mock_task_config = Mock()
     # This should be a list of ScheduledTask's.
     schedule_status.tasks = []
-    for i in range(20):
-      task_status = Mock(spec=ScheduledTask)
-      task_status.assignedTask = Mock(spec=AssignedTask)
-      task_status.assignedTask.instanceId = i
-      task_status.assignedTask.taskId = "Task%s" % i
-      task_status.assignedTask.slaveId = "Slave%s" % i
-      task_status.slaveHost = "Slave%s" % i
-      task_status.assignedTask.task = mock_task_config
-      schedule_status.tasks.append(task_status)
+    if mock_task is None:
+      for i in range(20):
+        schedule_status.tasks.append(cls.create_mock_task(i))
+    else:
+      schedule_status.tasks.append(mock_task)
     return status_response
 
+  @classmethod
+  def create_mock_task(cls, instance_id, status=ScheduleStatus.RUNNING):
+    mock_task = Mock(spec=ScheduledTask)
+    mock_task.assignedTask = Mock(spec=AssignedTask)
+    mock_task.assignedTask.instanceId = instance_id
+    mock_task.assignedTask.taskId = "Task%s" % instance_id
+    mock_task.assignedTask.slaveId = "Slave%s" % instance_id
+    mock_task.assignedTask.task = Mock(spec=TaskConfig)
+    mock_task.slaveHost = "Slave%s" % instance_id
+    mock_task.status = status
+    mock_task_event = Mock(spec=TaskEvent)
+    mock_task_event.timestamp = 1000
+    mock_task.taskEvents = [mock_task_event]
+    return mock_task
 
   @classmethod
   def setup_get_tasks_status_calls(cls, scheduler):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/commands/test_create.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_create.py b/src/test/python/apache/aurora/client/commands/test_create.py
index e0ecb52..328b980 100644
--- a/src/test/python/apache/aurora/client/commands/test_create.py
+++ b/src/test/python/apache/aurora/client/commands/test_create.py
@@ -71,13 +71,9 @@ class TestClientCreateCommand(AuroraClientCommandTest):
   def create_mock_status_query_result(cls, scheduleStatus):
     mock_query_result = cls.create_simple_success_response()
     mock_query_result.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
-    if scheduleStatus == ScheduleStatus.INIT:
-      # status query result for before job is launched.
-      mock_query_result.result.scheduleStatusResult.tasks = []
-    else:
-      mock_task_one = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
-      mock_task_two = cls.create_mock_task('hello', 1, 1004, scheduleStatus)
-      mock_query_result.result.scheduleStatusResult.tasks = [mock_task_one, mock_task_two]
+    mock_task_one = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
+    mock_task_two = cls.create_mock_task('hello', 1, 1004, scheduleStatus)
+    mock_query_result.result.scheduleStatusResult.tasks = [mock_task_one, mock_task_two]
     return mock_query_result
 
   @classmethod
@@ -127,7 +123,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
       # the job is created, and once after. So we need to set up mocks for the query results.
       mock_query = self.create_mock_query()
       mock_scheduler_proxy.getTasksStatus.side_effect = [
-        self.create_mock_status_query_result(ScheduleStatus.INIT),
         self.create_mock_status_query_result(ScheduleStatus.RUNNING)
       ]
 
@@ -146,7 +141,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
       # Now check that the right API calls got made.
       # Check that create_job was called exactly once, with an AuroraConfig parameter.
       self.assert_create_job_called(mock_api)
-      self.assert_scheduler_called(mock_api, mock_query, 2)
+      self.assert_scheduler_called(mock_api, mock_query, 1)
       # make_client should have been called once.
       make_client.assert_called_with('west')
 
@@ -162,12 +157,12 @@ class TestClientCreateCommand(AuroraClientCommandTest):
         patch('twitter.common.app.get_options', return_value=mock_options)) as (sleep, make_client,
         options):
       mock_query = self.create_mock_query()
+      mock_options.wait_until = 'FINISHED'
       mock_query_results = [
-          self.create_mock_status_query_result(ScheduleStatus.INIT),
           self.create_mock_status_query_result(ScheduleStatus.PENDING),
           self.create_mock_status_query_result(ScheduleStatus.PENDING),
           self.create_mock_status_query_result(ScheduleStatus.RUNNING),
-          self.create_mock_status_query_result(ScheduleStatus.FINISHED)
+          self.create_mock_status_query_result(ScheduleStatus.FINISHED),
       ]
       mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
       mock_api.create_job.return_value = self.get_createjob_response()
@@ -193,11 +188,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
         patch('apache.aurora.client.commands.core.make_client', return_value=mock_api),
         patch('twitter.common.app.get_options', return_value=mock_options)) as (make_client,
         options):
-      mock_query = self.create_mock_query()
-      mock_query_results = [
-          self.create_mock_status_query_result(ScheduleStatus.INIT)
-      ]
-      mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
       mock_api.create_job.return_value = self.get_failed_createjob_response()
       # This is the real test: invoke create as if it had been called by the command line.
       with temporary_file() as fp:
@@ -209,9 +199,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
       # Check that create_job was called exactly once, with an AuroraConfig parameter.
       self.assert_create_job_called(mock_api)
 
-      # getTasksStatus was called once, before the create_job
-      assert mock_scheduler_proxy.getTasksStatus.call_count == 1
-      mock_scheduler_proxy.getTasksStatus.assert_called_with(mock_query)
       # make_client should have been called once.
       make_client.assert_called_with('west')
 
@@ -228,11 +215,9 @@ class TestClientCreateCommand(AuroraClientCommandTest):
         options):
       mock_query = self.create_mock_query()
       mock_query_results = [
-          self.create_mock_status_query_result(ScheduleStatus.INIT),
           self.create_mock_status_query_result(ScheduleStatus.PENDING),
           self.create_mock_status_query_result(ScheduleStatus.PENDING),
-          self.create_mock_status_query_result(ScheduleStatus.RUNNING),
-          self.create_mock_status_query_result(ScheduleStatus.FINISHED)
+          self.create_mock_status_query_result(ScheduleStatus.RUNNING)
       ]
       mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
       mock_api.create_job.return_value = self.get_createjob_response()
@@ -243,7 +228,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
 
       # Now check that the right API calls got made.
       self.assert_create_job_called(mock_api)
-      self.assert_scheduler_called(mock_api, mock_query, 4)
+      self.assert_scheduler_called(mock_api, mock_query, 3)
       # make_client should have been called once.
       make_client.assert_called_with('west')
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/commands/test_kill.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_kill.py b/src/test/python/apache/aurora/client/commands/test_kill.py
index db820a5..7f0b73c 100644
--- a/src/test/python/apache/aurora/client/commands/test_kill.py
+++ b/src/test/python/apache/aurora/client/commands/test_kill.py
@@ -26,9 +26,12 @@ from twitter.common.contextutil import temporary_file
 from apache.aurora.client.commands.util import AuroraClientCommandTest
 
 from gen.apache.aurora.api.ttypes import (
+    AssignedTask,
     Identity,
     ScheduleStatus,
+    ScheduledTask,
     ScheduleStatusResult,
+    TaskEvent,
     TaskQuery,
 )
 
@@ -69,11 +72,6 @@ class TestClientKillCommand(AuroraClientCommandTest):
     return mock_query_result
 
   @classmethod
-  def create_mock_query(cls):
-    return TaskQuery(owner=Identity(role=cls.TEST_ROLE), environment=cls.TEST_ENV,
-        jobName=cls.TEST_JOB)
-
-  @classmethod
   def get_kill_job_response(cls):
     return cls.create_simple_success_response()
 
@@ -81,6 +79,82 @@ class TestClientKillCommand(AuroraClientCommandTest):
   def assert_kill_job_called(cls, mock_api):
     assert mock_api.kill_job.call_count == 1
 
+  @classmethod
+  def get_expected_task_query(cls, instances=None):
+    """Helper to create the query that will be a parameter to job kill."""
+    instance_ids = frozenset(instances) if instances is not None else None
+    return TaskQuery(taskIds=None, jobName=cls.TEST_JOB, environment=cls.TEST_ENV,
+                     instanceIds=instance_ids, owner=Identity(role=cls.TEST_ROLE, user=None))
+
+  @classmethod
+  def create_mock_task(cls, task_id, instance_id, initial_time, status):
+    mock_task = Mock(spec=ScheduledTask)
+    mock_task.assignedTask = Mock(spec=AssignedTask)
+    mock_task.assignedTask.taskId = task_id
+    mock_task.assignedTask.instanceId = instance_id
+    mock_task.status = status
+    mock_task_event = Mock(spec=TaskEvent)
+    mock_task_event.timestamp = initial_time
+    mock_task.taskEvents = [mock_task_event]
+    return mock_task
+
+  @classmethod
+  def create_mock_status_query_result(cls, scheduleStatus):
+    mock_query_result = cls.create_simple_success_response()
+    mock_query_result.result.scheduleStatusResult = Mock(spec=ScheduleStatusResult)
+    task = cls.create_mock_task('hello', 0, 1000, scheduleStatus)
+    mock_query_result.result.scheduleStatusResult.tasks = [task]
+    return mock_query_result
+
+  @classmethod
+  def assert_scheduler_called(cls, mock_api, mock_query, num_queries):
+    assert mock_api.scheduler_proxy.getTasksStatus.call_count == num_queries
+    mock_api.scheduler_proxy.getTasksStatus.assert_called_with(mock_query)
+
+  def test_kill_job_tasks_not_killed_in_time(self):
+    """Test kill timed out waiting in job monitor."""
+    mock_options = self.setup_mock_options()
+    mock_config = Mock()
+    mock_config.hooks = []
+    mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
+    (mock_api, mock_scheduler_proxy) = self.create_mock_api()
+    mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+    mock_query_results = [
+        self.create_mock_status_query_result(ScheduleStatus.RUNNING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+    ]
+    mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
+    with contextlib.nested(
+        patch('time.sleep'),
+        patch('apache.aurora.client.factory.make_client', return_value=mock_api),
+        patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
+        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
+        patch('twitter.common.app.get_options', return_value=mock_options),
+        patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+      mock_sleep,
+      mock_api_patch,
+      mock_scheduler_proxy_class,
+      mock_clusters,
+      options,
+      mock_get_job_config):
+
+      with temporary_file() as fp:
+        fp.write(self.get_valid_config())
+        fp.flush()
+        self.assertRaises(
+            SystemExit, killall, ['west/mchucarroll/test/hello', fp.name], mock_options)
+
+      # Now check that the right API calls got made.
+      assert mock_scheduler_proxy.killTasks.call_count == 1
+      query = self.get_expected_task_query()
+      mock_scheduler_proxy.killTasks.assert_called_with(query, None)
+      self.assert_scheduler_called(mock_api, query, 8)
 
   def test_kill_job_noshards_fail(self):
     mock_options = self.setup_mock_options()
@@ -103,7 +177,6 @@ class TestClientKillCommand(AuroraClientCommandTest):
       # Now check that the right API calls got made.
       mock_api.kill_job.call_count == 0
 
-
   def test_simple_successful_killall_job(self):
     """Run a test of the "kill" command against a mocked-out API:
     Verifies that the kill command sends the right API RPCs, and performs the correct
@@ -113,13 +186,23 @@ class TestClientKillCommand(AuroraClientCommandTest):
     mock_config = Mock()
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
     mock_api.kill_job.return_value = self.get_kill_job_response()
+    mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+    mock_query_results = [
+        self.create_mock_status_query_result(ScheduleStatus.RUNNING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLED),
+    ]
+    mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.commands.core.make_client',
             return_value=mock_api),
         patch('twitter.common.app.get_options', return_value=mock_options),
         patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+            sleep,
             mock_make_client,
-            options, mock_get_job_config):
+            options,
+            mock_get_job_config):
 
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
@@ -131,15 +214,7 @@ class TestClientKillCommand(AuroraClientCommandTest):
       mock_api.kill_job.assert_called_with(
         AuroraJobKey(cluster=self.TEST_CLUSTER, role=self.TEST_ROLE, env=self.TEST_ENV,
             name=self.TEST_JOB), None, config=mock_config)
-
-
-
-  @classmethod
-  def get_expected_task_query(cls, shards=None):
-    """Helper to create the query that will be a parameter to job kill."""
-    instance_ids = frozenset(shards) if shards is not None else None
-    return TaskQuery(taskIds=None, jobName=cls.TEST_JOB, environment=cls.TEST_ENV,
-        instanceIds=instance_ids, owner=Identity(role=cls.TEST_ROLE, user=None))
+      self.assert_scheduler_called(mock_api, self.get_expected_task_query(), 3)
 
   def test_kill_job_api_level(self):
     """Test kill client-side API logic."""
@@ -149,16 +224,26 @@ class TestClientKillCommand(AuroraClientCommandTest):
     mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
     mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+    mock_query_results = [
+        self.create_mock_status_query_result(ScheduleStatus.RUNNING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLED),
+    ]
+    mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.factory.make_client', return_value=mock_api),
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
         patch('twitter.common.app.get_options', return_value=mock_options),
         patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+            mock_sleep,
             mock_api_patch,
             mock_scheduler_proxy_class,
             mock_clusters,
-            options, mock_get_job_config):
+            options,
+            mock_get_job_config):
+
       with temporary_file() as fp:
         fp.write(self.get_valid_config())
         fp.flush()
@@ -166,7 +251,9 @@ class TestClientKillCommand(AuroraClientCommandTest):
 
       # Now check that the right API calls got made.
       assert mock_scheduler_proxy.killTasks.call_count == 1
-      mock_scheduler_proxy.killTasks.assert_called_with(self.get_expected_task_query(), None)
+      query = self.get_expected_task_query()
+      mock_scheduler_proxy.killTasks.assert_called_with(query, None)
+      self.assert_scheduler_called(mock_api, query, 3)
 
   def test_kill_job_api_level_with_shards(self):
     """Test kill client-side API logic."""
@@ -177,12 +264,20 @@ class TestClientKillCommand(AuroraClientCommandTest):
     mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
     mock_scheduler_proxy.killTasks.return_value = self.get_kill_job_response()
+    mock_query_results = [
+        self.create_mock_status_query_result(ScheduleStatus.RUNNING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLING),
+        self.create_mock_status_query_result(ScheduleStatus.KILLED),
+    ]
+    mock_scheduler_proxy.getTasksStatus.side_effect = mock_query_results
     with contextlib.nested(
+        patch('time.sleep'),
         patch('apache.aurora.client.factory.make_client', return_value=mock_api),
         patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
         patch('twitter.common.app.get_options', return_value=mock_options),
         patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
+            mock_sleep,
             mock_api_factory_patch,
             mock_scheduler_proxy_class,
             mock_clusters,
@@ -196,3 +291,4 @@ class TestClientKillCommand(AuroraClientCommandTest):
       assert mock_scheduler_proxy.killTasks.call_count == 1
       query = self.get_expected_task_query([0, 1, 2, 3])
       mock_scheduler_proxy.killTasks.assert_called_with(query, None)
+      self.assert_scheduler_called(mock_api, query, 3)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/464d6ea5/src/test/python/apache/aurora/client/commands/test_update.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/commands/test_update.py b/src/test/python/apache/aurora/client/commands/test_update.py
index 6e145db..d51579f 100644
--- a/src/test/python/apache/aurora/client/commands/test_update.py
+++ b/src/test/python/apache/aurora/client/commands/test_update.py
@@ -25,6 +25,7 @@ from apache.aurora.client.commands.util import AuroraClientCommandTest
 from apache.aurora.client.api.updater import Updater
 from apache.aurora.client.api.health_check import StatusHealthCheck, Retriable
 from apache.aurora.client.api.quota_check import QuotaCheck
+from apache.aurora.client.api.job_monitor import JobMonitor
 from apache.aurora.client.hooks.hooked_api import HookedAuroraClientAPI
 from apache.aurora.config import AuroraConfig
 from twitter.common.contextutil import temporary_file
@@ -206,6 +207,13 @@ class TestUpdateCommand(AuroraClientCommandTest):
   def setup_quota_check(cls):
     mock_quota_check = Mock(spec=QuotaCheck)
     mock_quota_check.validate_quota_from_requested.return_value = cls.create_simple_success_response()
+    return mock_quota_check
+
+  @classmethod
+  def setup_job_monitor(cls):  # Builds the stub patched over apache.aurora.client.api.updater.JobMonitor.
+    mock_job_monitor = Mock(spec=JobMonitor)  # spec= rejects attributes that JobMonitor does not define.
+    mock_job_monitor.wait_until.return_value = True  # Every wait_until() call reports success.
+    return mock_job_monitor
 
   def test_updater_simple(self):
     # Test the client-side updater logic in its simplest case: everything succeeds, and no rolling
@@ -214,6 +222,7 @@ class TestUpdateCommand(AuroraClientCommandTest):
     (mock_api, mock_scheduler_proxy) = self.create_mock_api()
     mock_health_check = self.setup_health_checks(mock_api)
     mock_quota_check = self.setup_quota_check()
+    mock_job_monitor = self.setup_job_monitor()
 
     with contextlib.nested(
         patch('twitter.common.app.get_options', return_value=mock_options),
@@ -221,12 +230,13 @@ class TestUpdateCommand(AuroraClientCommandTest):
         patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
         patch('apache.aurora.client.api.instance_watcher.StatusHealthCheck',
             return_value=mock_health_check),
-        patch('apache.aurora.client.api.quota_check.QuotaCheck', return_value=mock_quota_check),
+        patch('apache.aurora.client.api.updater.QuotaCheck', return_value=mock_quota_check),
+        patch('apache.aurora.client.api.updater.JobMonitor', return_value=mock_job_monitor),
         patch('time.time', side_effect=functools.partial(self.fake_time, self)),
         patch('time.sleep', return_value=None)
 
     ) as (options, scheduler_proxy_class, test_clusters, mock_health_check_factory,
-          mock_quota_check_patch, time_patch, sleep_patch):
+          mock_quota_check_patch, mock_job_monitor_patch, time_patch, sleep_patch):
       self.setup_mock_scheduler_for_simple_update(mock_api)
       with temporary_file() as fp:
         fp.write(self.get_valid_config())