You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by me...@apache.org on 2017/05/02 16:48:25 UTC

aurora git commit: AURORA-1923 Aurora client should not automatically retry non-idempotent operations

Repository: aurora
Updated Branches:
  refs/heads/master 6a896df63 -> f1e25375d


AURORA-1923 Aurora client should not automatically retry non-idempotent operations

The Aurora client has a built-in mechanism to automatically retry thrift API operations if the connection with the scheduler times out, experiences a transport exception, or encounters a transient exception on the scheduler side.

Retrying thrift calls after a scheduler connection timeout or a transient exception (see AURORA-187) is safe. However, as Aurora has no concept of idempotency, its client can also retry non-idempotent operations upon encountering transport exceptions, which can lead to nondeterministic situations.

For example, if client requests go through a proxy to reach the scheduler, the client might consider a non-idempotent request failed and automatically retry it even though the original request has already been received and processed by the scheduler.

This patch changes Aurora client invocation semantics from "at least once" to "at most once" for non-idempotent operations.

Reviewed at https://reviews.apache.org/r/58850/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f1e25375
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f1e25375
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f1e25375

Branch: refs/heads/master
Commit: f1e25375def5a047da97d8bdfb47a3a9101568f6
Parents: 6a896df
Author: Mehrdad Nurolahzade <me...@apache.org>
Authored: Tue May 2 09:35:36 2017 -0700
Committer: Mehrdad Nurolahzade <mn...@twitter.com>
Committed: Tue May 2 09:35:36 2017 -0700

----------------------------------------------------------------------
 .../python/apache/aurora/client/api/__init__.py | 39 ++++++++----
 .../aurora/client/api/scheduler_client.py       | 23 ++++++-
 .../python/apache/aurora/client/cli/__init__.py |  7 ++
 .../apache/aurora/admin/test_maintenance.py     | 14 ++--
 src/test/python/apache/aurora/api_util.py       | 36 +++++++++++
 .../python/apache/aurora/client/api/test_api.py | 17 +++--
 .../aurora/client/api/test_scheduler_client.py  | 67 ++++++++++++++++++++
 .../apache/aurora/client/cli/test_status.py     |  6 +-
 .../apache/aurora/client/cli/test_task.py       |  9 ++-
 9 files changed, 185 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
index a4639db..7efafd3 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -88,7 +88,8 @@ class AuroraClientAPI(object):
     return self._scheduler_proxy.descheduleCronJob(jobkey.to_thrift())
 
   def populate_job_config(self, config):
-    return self._scheduler_proxy.populateJobConfig(config.job())
+    # read-only calls are retriable.
+    return self._scheduler_proxy.populateJobConfig(config.job(), retry=True)
 
   def start_cronjob(self, job_key):
     self._assert_valid_job_key(job_key)
@@ -98,7 +99,8 @@ class AuroraClientAPI(object):
 
   def get_jobs(self, role):
     log.info("Retrieving jobs for role %s" % role)
-    return self._scheduler_proxy.getJobs(role)
+    # read-only calls are retriable.
+    return self._scheduler_proxy.getJobs(role, retry=True)
 
   def add_instances(self, job_key, instance_id, count):
     key = InstanceKey(jobKey=job_key.to_thrift(), instanceId=instance_id)
@@ -113,7 +115,6 @@ class AuroraClientAPI(object):
     if instances is not None:
       log.info("Instances to be killed: %s" % instances)
       instances = frozenset([int(s) for s in instances])
-
     return self._scheduler_proxy.killTasks(job_key.to_thrift(), instances, message)
 
   def check_status(self, job_key):
@@ -130,14 +131,16 @@ class AuroraClientAPI(object):
 
   def query(self, query):
     try:
-      return self._scheduler_proxy.getTasksStatus(query)
+      # read-only calls are retriable.
+      return self._scheduler_proxy.getTasksStatus(query, retry=True)
     except SchedulerProxy.ThriftInternalError as e:
       raise self.ThriftInternalError(e.args[0])
 
   def query_no_configs(self, query):
     """Returns all matching tasks without TaskConfig.executorConfig set."""
     try:
-      return self._scheduler_proxy.getTasksWithoutConfigs(query)
+      # read-only calls are retriable.
+      return self._scheduler_proxy.getTasksWithoutConfigs(query, retry=True)
     except SchedulerProxy.ThriftInternalError as e:
       raise self.ThriftInternalError(e.args[0])
 
@@ -167,7 +170,9 @@ class AuroraClientAPI(object):
     """
     request = self._job_update_request(config, instances, metadata)
     log.info("Starting update for: %s" % config.name())
-    return self._scheduler_proxy.startJobUpdate(request, message)
+    # retrying starting a job update is safe, client and scheduler reconcile state if the
+    # job update is in progress (AURORA-1711).
+    return self._scheduler_proxy.startJobUpdate(request, message, retry=True)
 
   def pause_job_update(self, update_key, message):
     """Requests Scheduler to pause active job update.
@@ -225,7 +230,8 @@ class AuroraClientAPI(object):
     """
     request = self._job_update_request(config, instances)
     log.debug("Requesting job update diff details for: %s" % config.name())
-    return self._scheduler_proxy.getJobUpdateDiff(request)
+    # read-only calls are retriable.
+    return self._scheduler_proxy.getJobUpdateDiff(request, retry=True)
 
   def query_job_updates(
       self,
@@ -246,13 +252,15 @@ class AuroraClientAPI(object):
     Returns response object with all matching job update summaries.
     """
     # TODO(wfarner): Consider accepting JobUpdateQuery in this function instead of kwargs.
+    # read-only calls are retriable.
     return self._scheduler_proxy.getJobUpdateSummaries(
         JobUpdateQuery(
             role=role,
             jobKey=job_key.to_thrift() if job_key else None,
             user=user,
             updateStatuses=update_statuses,
-            key=update_key))
+            key=update_key),
+        retry=True)
 
   def get_job_update_details(self, key):
     """Gets JobUpdateDetails for the specified job update ID.
@@ -267,7 +275,8 @@ class AuroraClientAPI(object):
                            % (key, JobUpdateKey.__name__, key.__class__.__name__))
 
     query = JobUpdateQuery(key=key)
-    return self._scheduler_proxy.getJobUpdateDetails(key, query)
+    # read-only calls are retriable.
+    return self._scheduler_proxy.getJobUpdateDetails(key, query, retry=True)
 
   def restart(self, job_key, instances, restart_settings):
     """Perform a rolling restart of the job.
@@ -291,7 +300,8 @@ class AuroraClientAPI(object):
 
   def maintenance_status(self, hosts):
     log.info("Maintenance status for: %s" % hosts.hostNames)
-    return self._scheduler_proxy.maintenanceStatus(hosts)
+    # read-only calls are retriable.
+    return self._scheduler_proxy.maintenanceStatus(hosts, retry=True)
 
   def end_maintenance(self, hosts):
     log.info("Ending maintenance for: %s" % hosts.hostNames)
@@ -299,7 +309,8 @@ class AuroraClientAPI(object):
 
   def get_quota(self, role):
     log.info("Getting quota for: %s" % role)
-    return self._scheduler_proxy.getQuota(role)
+    # read-only calls are retriable.
+    return self._scheduler_proxy.getQuota(role, retry=True)
 
   def set_quota(self, role, cpu, ram, disk):
     log.info("Setting quota for user:%s cpu:%f ram:%d disk: %d"
@@ -313,7 +324,8 @@ class AuroraClientAPI(object):
 
   def get_tier_configs(self):
     log.debug("Getting tier configurations")
-    return self._scheduler_proxy.getTierConfigs()
+    # read-only calls are retriable.
+    return self._scheduler_proxy.getTierConfigs(retry=True)
 
   def force_task_state(self, task_id, status):
     log.info("Requesting that task %s transition to state %s" % (task_id, status))
@@ -329,7 +341,8 @@ class AuroraClientAPI(object):
     return self._scheduler_proxy.stageRecovery(backup_id)
 
   def query_recovery(self, query):
-    return self._scheduler_proxy.queryRecovery(query)
+    # read-only calls are retriable.
+    return self._scheduler_proxy.queryRecovery(query, retry=True)
 
   def delete_recovery_tasks(self, query):
     return self._scheduler_proxy.deleteRecoveryTasks(query)

http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/main/python/apache/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_client.py b/src/main/python/apache/aurora/client/api/scheduler_client.py
index 9bbfece..e6ed0b0 100644
--- a/src/main/python/apache/aurora/client/api/scheduler_client.py
+++ b/src/main/python/apache/aurora/client/api/scheduler_client.py
@@ -235,6 +235,7 @@ class SchedulerProxy(object):
   class AuthError(Error): pass
   class APIVersionError(Error): pass
   class ThriftInternalError(Error): pass
+  class NotRetriableError(Error): pass
 
   def __init__(self, cluster, verbose=False, **kwargs):
     self.cluster = cluster
@@ -302,12 +303,12 @@ class SchedulerProxy(object):
       return method
 
     @functools.wraps(method)
-    def method_wrapper(*args):
+    def method_wrapper(*args, **kwargs):
+      retry = kwargs.get('retry', False)
       with self._lock:
         start = time.time()
         while not self._terminating.is_set() and (
             time.time() - start) < self.RPC_MAXIMUM_WAIT.as_(Time.SECONDS):
-
           try:
             method = getattr(self.client(), method_name)
             if not callable(method):
@@ -321,7 +322,23 @@ class SchedulerProxy(object):
           except TRequestsTransport.AuthError as e:
             log.error(self.scheduler_client().get_failed_auth_message())
             raise self.AuthError(e)
-          except (TTransport.TTransportException, self.TimeoutError, self.TransientError) as e:
+          except TTransport.TTransportException as e:
+            # Client does not know if the request has been received and processed by
+            # the scheduler, therefore the call is retried if it is idempotent.
+            if not self._terminating.is_set():
+              if retry:
+                log.warning('Transport error communicating with scheduler: %s, retrying...' % e)
+                self.invalidate()
+                self._terminating.wait(self.RPC_RETRY_INTERVAL.as_(Time.SECONDS))
+              else:
+                raise self.NotRetriableError('Transport error communicating with scheduler during '
+                                             'non-idempotent operation: %s, not retrying' % e)
+          except (self.TimeoutError, self.TransientError) as e:
+            # If it is TimeoutError then the connection with scheduler could not
+            # be established, therefore the call did not go through.
+            # If it is TransientError then the scheduler could not process the call
+            # because its storage is not in READY state.
+            # In both cases, the call can be safely retried.
             if not self._terminating.is_set():
               log.warning('Connection error with scheduler: %s, reconnecting...' % e)
               self.invalidate()

http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/main/python/apache/aurora/client/cli/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/__init__.py b/src/main/python/apache/aurora/client/cli/__init__.py
index c1c5454..a28a3bb 100644
--- a/src/main/python/apache/aurora/client/cli/__init__.py
+++ b/src/main/python/apache/aurora/client/cli/__init__.py
@@ -40,6 +40,7 @@ import pkg_resources
 from twitter.common.lang import AbstractClass, Compatibility
 
 from apache.aurora.client.api import AuroraClientAPI
+from apache.aurora.client.api.scheduler_client import SchedulerProxy
 
 from .command_hooks import GlobalCommandHookRegistry
 from .options import CommandOption
@@ -319,6 +320,12 @@ class CommandLine(AbstractClass):
     except Context.CommandError as c:
       context.print_err(c.msg)
       return c.code
+    except SchedulerProxy.NotRetriableError as e:
+      context.print_err(e.message)
+      return EXIT_NETWORK_ERROR
+    except SchedulerProxy.TimeoutError as e:
+      context.print_err(e.message)
+      return EXIT_TIMEOUT
     except AuroraClientAPI.Error as e:
       # TODO(wfarner): Generalize this error type in the contract of noun and verb implementations.
       context.print_err("Fatal error running command: %s" % e.message)

http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/test/python/apache/aurora/admin/test_maintenance.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/admin/test_maintenance.py b/src/test/python/apache/aurora/admin/test_maintenance.py
index cccc1e5..ca0239b 100644
--- a/src/test/python/apache/aurora/admin/test_maintenance.py
+++ b/src/test/python/apache/aurora/admin/test_maintenance.py
@@ -107,14 +107,16 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
       host_activate([self.TEST_CLUSTER])
 
       mock_scheduler_proxy.endMaintenance.assert_called_with(Hosts(set(self.HOSTNAMES)))
-      mock_scheduler_proxy.maintenanceStatus.assert_called_with(Hosts(set(self.HOSTNAMES)))
+      mock_scheduler_proxy.maintenanceStatus.assert_called_with(
+        Hosts(set(self.HOSTNAMES)),
+        retry=True)
 
   def test_perform_maintenance_hosts(self):
     mock_options = self.make_mock_options()
     mock_options.post_drain_script = 'callback'
     mock_options.grouping = 'by_host'
 
-    def host_status_results(hostnames):
+    def host_status_results(hostnames, retry=True):
       if isinstance(hostnames, Hosts):
         return self.create_drained_status_result(hostnames)
       return self.create_maintenance_status_result()
@@ -150,7 +152,7 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
     mock_options.post_drain_script = None
     mock_options.grouping = 'by_host'
 
-    def host_status_results(hostnames):
+    def host_status_results(hostnames, retry=True):
       if isinstance(hostnames, Hosts):
         return self.create_drained_status_result(hostnames)
       return self.create_maintenance_status_result()
@@ -247,7 +249,7 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
     mock_options.post_drain_script = None
     mock_options.grouping = 'by_host'
 
-    def host_status_results(hostnames):
+    def host_status_results(hostnames, retry=True):
       if isinstance(hostnames, Hosts):
         return self.create_drained_status_result(hostnames)
       return self.create_maintenance_status_result()
@@ -321,4 +323,6 @@ class TestMaintenanceCommands(AuroraClientCommandTest):
         patch('twitter.common.app.get_options', return_value=mock_options)):
       host_status([self.TEST_CLUSTER])
 
-      mock_scheduler_proxy.maintenanceStatus.assert_called_with(Hosts(set(self.HOSTNAMES)))
+      mock_scheduler_proxy.maintenanceStatus.assert_called_with(
+        Hosts(set(self.HOSTNAMES)),
+        retry=True)

http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/test/python/apache/aurora/api_util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/api_util.py b/src/test/python/apache/aurora/api_util.py
index f4935b5..bd6e3a6 100644
--- a/src/test/python/apache/aurora/api_util.py
+++ b/src/test/python/apache/aurora/api_util.py
@@ -115,5 +115,41 @@ class SchedulerProxyApiSpec(SchedulerThriftApiSpec, SchedulerProxy):
   A concrete definition of the API provided by SchedulerProxy.
   """
 
+  def getTasksStatus(self, query, retry=True):
+    pass
+
+  def getTasksWithoutConfigs(self, query, retry=True):
+    pass
+
+  def getJobs(self, ownerRole, retry=True):
+    pass
+
+  def getQuota(self, ownerRole, retry=True):
+    pass
+
+  def populateJobConfig(self, description, retry=True):
+    pass
+
+  def getJobUpdateSummaries(self, jobUpdateQuery, retry=True):
+    pass
+
+  def getJobUpdateDetails(self, key, query, retry=True):
+    pass
+
+  def getJobUpdateDiff(self, request, retry=True):
+    pass
+
+  def getTierConfigs(self, retry=True):
+    pass
+
+  def queryRecovery(self, query, retry=True):
+    pass
+
+  def maintenanceStatus(self, hosts, retry=True):
+    pass
+
+  def startJobUpdate(self, request, message, retry=True):
+    pass
+
   def url(self):
     pass

http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/test/python/apache/aurora/client/api/test_api.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_api.py b/src/test/python/apache/aurora/client/api/test_api.py
index f1579b5..9d6d9de 100644
--- a/src/test/python/apache/aurora/client/api/test_api.py
+++ b/src/test/python/apache/aurora/client/api/test_api.py
@@ -21,7 +21,7 @@ from apache.aurora.common.cluster import Cluster
 from apache.aurora.config import AuroraConfig
 from apache.aurora.config.schema.base import UpdateConfig
 
-from ...api_util import SchedulerThriftApiSpec
+from ...api_util import SchedulerProxyApiSpec
 
 from gen.apache.aurora.api.ttypes import (
     InstanceKey,
@@ -75,7 +75,7 @@ class TestJobUpdateApis(unittest.TestCase):
   @classmethod
   def mock_api(cls):
     api = AuroraClientAPI(Cluster(name="foo"), 'test-client')
-    mock_proxy = create_autospec(spec=SchedulerThriftApiSpec, spec_set=True, instance=True)
+    mock_proxy = create_autospec(spec=SchedulerProxyApiSpec, spec_set=True, instance=True)
     api._scheduler_proxy = mock_proxy
     return api, mock_proxy
 
@@ -131,7 +131,8 @@ class TestJobUpdateApis(unittest.TestCase):
     api.start_job_update(self.mock_job_config(), instances=None, message='hello')
     mock_proxy.startJobUpdate.assert_called_once_with(
         self.create_update_request(task_config),
-        'hello')
+        'hello',
+        retry=True)
 
   def test_start_job_update_fails_parse_update_config(self):
     """Test start_job_update fails to parse invalid UpdateConfig."""
@@ -150,7 +151,9 @@ class TestJobUpdateApis(unittest.TestCase):
     mock_proxy.getJobUpdateDiff.return_value = self.create_simple_success_response()
 
     api.get_job_update_diff(self.mock_job_config(), instances=None)
-    mock_proxy.getJobUpdateDiff.assert_called_once_with(self.create_update_request(task_config))
+    mock_proxy.getJobUpdateDiff.assert_called_once_with(
+      self.create_update_request(task_config),
+      retry=True)
 
   def test_pause_job_update(self):
     """Test successful job update pause."""
@@ -176,14 +179,14 @@ class TestJobUpdateApis(unittest.TestCase):
         jobKey=job_key.to_thrift(),
         updateStatuses={JobUpdateStatus.ROLLING_FORWARD})
     api.query_job_updates(job_key=job_key, update_statuses=query.updateStatuses)
-    mock_proxy.getJobUpdateSummaries.assert_called_once_with(query)
+    mock_proxy.getJobUpdateSummaries.assert_called_once_with(query, retry=True)
 
   def test_query_job_updates_no_filter(self):
     """Test querying job updates with no filter args."""
     api, mock_proxy = self.mock_api()
     query = JobUpdateQuery()
     api.query_job_updates()
-    mock_proxy.getJobUpdateSummaries.assert_called_once_with(query)
+    mock_proxy.getJobUpdateSummaries.assert_called_once_with(query, retry=True)
 
   def test_get_job_update_details(self):
     """Test getting job update details."""
@@ -191,7 +194,7 @@ class TestJobUpdateApis(unittest.TestCase):
     key = JobUpdateKey(job=JobKey(role="role", environment="env", name="name"), id="id")
     api.get_job_update_details(key)
     query = JobUpdateQuery(key=key)
-    mock_proxy.getJobUpdateDetails.assert_called_once_with(key, query)
+    mock_proxy.getJobUpdateDetails.assert_called_once_with(key, query, retry=True)
 
   def test_set_quota(self):
     """Test setting quota."""

http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index 59c651c..b2fd4d9 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -483,6 +483,73 @@ class TestSchedulerClient(unittest.TestCase):
 
   @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
               spec=scheduler_client.SchedulerClient)
+  @mock.patch('threading._Event.wait')
+  def test_performBackup_retriable_errors(self, mock_wait, mock_client):
+    mock_scheduler_client = mock.create_autospec(
+        spec=scheduler_client.SchedulerClient,
+        spec_set=False,
+        instance=True)
+    mock_thrift_client = mock.create_autospec(spec=AuroraAdmin.Client, instance=True)
+    mock_thrift_client.performBackup.side_effect = [
+      Response(responseCode=ResponseCode.ERROR_TRANSIENT),
+      scheduler_client.SchedulerProxy.TimeoutError,
+      Response(responseCode=ResponseCode.OK)]
+
+    mock_scheduler_client.get_thrift_client.return_value = mock_thrift_client
+    mock_client.get.return_value = mock_scheduler_client
+
+    proxy = scheduler_client.SchedulerProxy(Cluster(name='local'))
+    proxy.performBackup()
+
+    assert mock_thrift_client.performBackup.call_count == 3
+    assert mock_wait.call_count == 2
+
+  @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
+              spec=scheduler_client.SchedulerClient)
+  @mock.patch('threading._Event.wait')
+  def test_performBackup_transport_exception(self, mock_wait, mock_client):
+    mock_scheduler_client = mock.create_autospec(
+      spec=scheduler_client.SchedulerClient,
+      spec_set=False,
+      instance=True)
+
+    mock_thrift_client = mock.create_autospec(spec=AuroraAdmin.Client, instance=True)
+    mock_thrift_client.performBackup.side_effect = TTransport.TTransportException('error')
+    mock_scheduler_client.get_thrift_client.return_value = mock_thrift_client
+    mock_client.get.return_value = mock_scheduler_client
+
+    proxy = scheduler_client.SchedulerProxy(Cluster(name='local'))
+    with pytest.raises(scheduler_client.SchedulerProxy.NotRetriableError):
+      proxy.performBackup()
+
+    assert mock_thrift_client.performBackup.call_count == 1
+    assert not mock_wait.called
+
+  @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
+              spec=scheduler_client.SchedulerClient)
+  @mock.patch('threading._Event.wait')
+  def test_getTierConfigs_transport_exception(self, mock_wait, mock_client):
+    mock_scheduler_client = mock.create_autospec(
+      spec=scheduler_client.SchedulerClient,
+      spec_set=False,
+      instance=True)
+
+    mock_thrift_client = mock.create_autospec(spec=AuroraAdmin.Client, instance=True)
+    mock_thrift_client.getTierConfigs.side_effect = [
+      TTransport.TTransportException('error'),
+      Response(responseCode=ResponseCode.OK)
+    ]
+    mock_scheduler_client.get_thrift_client.return_value = mock_thrift_client
+    mock_client.get.return_value = mock_scheduler_client
+
+    proxy = scheduler_client.SchedulerProxy(Cluster(name='local'))
+    proxy.getTierConfigs(retry=True)
+
+    assert mock_thrift_client.getTierConfigs.call_count == 2
+    assert mock_wait.call_count == 1
+
+  @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
+              spec=scheduler_client.SchedulerClient)
   def test_unknown_connection_error(self, client):
     mock_scheduler_client = mock.create_autospec(spec=scheduler_client.SchedulerClient,
                                                  instance=True)

http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/test/python/apache/aurora/client/cli/test_status.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_status.py b/src/test/python/apache/aurora/client/cli/test_status.py
index 14ef360..b0b7f96 100644
--- a/src/test/python/apache/aurora/client/cli/test_status.py
+++ b/src/test/python/apache/aurora/client/cli/test_status.py
@@ -222,7 +222,8 @@ class TestJobStatus(AuroraClientCommandTest):
       cmd = AuroraCommandLine()
       cmd.execute(['job', 'status', 'west/bozo/test/hello'])
       mock_scheduler_proxy.getTasksWithoutConfigs.assert_called_with(
-          TaskQuery(jobKeys=[JobKey(role='bozo', environment='test', name='hello')]))
+          TaskQuery(jobKeys=[JobKey(role='bozo', environment='test', name='hello')]),
+          retry=True)
 
   def test_successful_status_output_no_metadata(self):
     """Test the status command more deeply: in a request with a fully specified
@@ -343,7 +344,8 @@ class TestJobStatus(AuroraClientCommandTest):
       cmd = AuroraCommandLine()
       cmd.execute(['job', 'status', 'west/bozo/test/hello'])
       mock_scheduler_proxy.getTasksWithoutConfigs.assert_called_with(
-          TaskQuery(jobKeys=[JobKey(role='bozo', environment='test', name='hello')]))
+          TaskQuery(jobKeys=[JobKey(role='bozo', environment='test', name='hello')]),
+          retry=True)
 
   def test_status_wildcard(self):
     """Test status using a wildcard. It should first call api.get_jobs, and then do a

http://git-wip-us.apache.org/repos/asf/aurora/blob/f1e25375/src/test/python/apache/aurora/client/cli/test_task.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_task.py b/src/test/python/apache/aurora/client/cli/test_task.py
index 610414f..390993f 100644
--- a/src/test/python/apache/aurora/client/cli/test_task.py
+++ b/src/test/python/apache/aurora/client/cli/test_task.py
@@ -94,7 +94,8 @@ class TestRunCommand(AuroraClientCommandTest):
           jobKeys=[JobKey(role='bozo', environment='test', name='hello')],
           statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
               ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING]),
-          instanceIds=instances))
+          instanceIds=instances),
+          retry=True)
 
       # The mock status call returns 3 three ScheduledTasks, so three commands should have been run
       assert mock_subprocess.call_count == 3
@@ -147,7 +148,8 @@ class TestSshCommand(AuroraClientCommandTest):
           jobKeys=[JobKey(role='bozo', environment='test', name='hello')],
           instanceIds=set([1]),
           statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
-              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])))
+              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])),
+          retry=True)
       mock_subprocess.assert_called_with(['ssh', '-t', '-v', 'bozo@slavehost',
           'cd /slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/'
           'slaverun/sandbox;ls'])
@@ -174,7 +176,8 @@ class TestSshCommand(AuroraClientCommandTest):
           jobKeys=[JobKey(role='bozo', environment='test', name='hello')],
           instanceIds=None,
           statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
-              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])))
+              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])),
+          retry=True)
       mock_subprocess.assert_called_with(['ssh', '-t', '-v', 'bozo@slavehost',
           'cd /slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/'
           'slaverun/sandbox;bash'])