You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/07/23 19:08:12 UTC
[2/2] git commit: Implementing parallel updater.
Implementing parallel updater.
The updater now spawns up to batch_size threads, each
processing one instance at a time.
All scheduler calls are multiplexed through the SchedulerMux, which issues
batched kill/add/restart/status calls.
Bugs closed: AURORA-350
Reviewed at https://reviews.apache.org/r/21440/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e1c0ade2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e1c0ade2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e1c0ade2
Branch: refs/heads/master
Commit: e1c0ade29d232282a82550949f5f5370ce2666ab
Parents: 2b78aff
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed Jul 23 10:07:41 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Wed Jul 23 10:07:41 2014 -0700
----------------------------------------------------------------------
src/main/python/apache/aurora/client/api/BUILD | 34 ++
.../aurora/client/api/error_handling_thread.py | 75 +++
.../aurora/client/api/instance_watcher.py | 36 +-
.../apache/aurora/client/api/job_monitor.py | 38 +-
.../aurora/client/api/scheduler_client.py | 51 +-
.../apache/aurora/client/api/scheduler_mux.py | 121 +++++
src/main/python/apache/aurora/client/api/sla.py | 6 +-
.../apache/aurora/client/api/task_util.py | 101 ++++
.../python/apache/aurora/client/api/updater.py | 495 +++++++++++++------
.../apache/aurora/client/api/updater_util.py | 4 +-
src/main/python/apache/aurora/client/base.py | 8 +-
src/test/python/apache/aurora/client/api/BUILD | 17 +
.../aurora/client/api/test_instance_watcher.py | 27 +-
.../aurora/client/api/test_job_monitor.py | 22 +-
.../aurora/client/api/test_scheduler_mux.py | 72 +++
.../apache/aurora/client/api/test_task_util.py | 83 ++++
.../apache/aurora/client/api/test_updater.py | 299 +++++------
.../apache/aurora/client/cli/test_create.py | 6 +-
.../apache/aurora/client/cli/test_kill.py | 20 +-
.../apache/aurora/client/cli/test_restart.py | 10 +-
.../apache/aurora/client/cli/test_update.py | 42 +-
.../python/apache/aurora/client/cli/util.py | 2 +-
.../aurora/client/commands/test_create.py | 4 +-
.../apache/aurora/client/commands/test_kill.py | 8 +-
.../aurora/client/commands/test_restart.py | 8 +-
.../aurora/client/commands/test_update.py | 46 +-
.../apache/aurora/client/commands/util.py | 2 +-
.../aurora/client/fake_scheduler_proxy.py | 7 +
.../apache/aurora/e2e/http/http_example.aurora | 4 +-
.../aurora/e2e/http/http_example_updated.aurora | 13 +-
.../sh/org/apache/aurora/e2e/test_end_to_end.sh | 2 +-
.../org/apache/aurora/e2e/test_end_to_end_v2.sh | 2 +-
32 files changed, 1182 insertions(+), 483 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/BUILD b/src/main/python/apache/aurora/client/api/BUILD
index c205a7d..70ad38e 100644
--- a/src/main/python/apache/aurora/client/api/BUILD
+++ b/src/main/python/apache/aurora/client/api/BUILD
@@ -55,6 +55,7 @@ python_library(
name = 'job_monitor',
sources = ['job_monitor.py'],
dependencies = [
+ pants(':task_util'),
pants('3rdparty/python:twitter.common.log'),
pants('3rdparty/python:twitter.common.quantity'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
@@ -88,10 +89,43 @@ python_library(
)
python_library(
+ name = 'error_handling_thread',
+ sources = ['error_handling_thread.py'],
+ dependencies = [
+ pants('3rdparty/python:twitter.common.decorators'),
+ pants('3rdparty/python:twitter.common.quantity'),
+ pants('3rdparty/python:twitter.common.log'),
+ ]
+)
+
+python_library(
+ name = 'scheduler_mux',
+ sources = ['scheduler_mux.py'],
+ dependencies = [
+ pants(':error_handling_thread'),
+ pants('3rdparty/python:twitter.common.quantity'),
+ pants('3rdparty/python:twitter.common.log'),
+ ]
+)
+
+python_library(
+ name = 'task_util',
+ sources = ['task_util.py'],
+ dependencies = [
+ pants(':scheduler_mux'),
+ pants('3rdparty/python:twitter.common.log'),
+ pants('src/main/python/apache/aurora/client:base'),
+ pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
+ ]
+)
+
+python_library(
name = 'instance_watcher',
sources = ['instance_watcher.py', 'health_check.py'],
dependencies = [
pants(':scheduler_client'),
+ pants(':scheduler_mux'),
+ pants(':task_util'),
pants('3rdparty/python:twitter.common.lang'),
pants('3rdparty/python:twitter.common.log'),
pants('src/main/python/apache/aurora/common:http_signaler'),
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/error_handling_thread.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/error_handling_thread.py b/src/main/python/apache/aurora/client/api/error_handling_thread.py
new file mode 100644
index 0000000..530715a
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/error_handling_thread.py
@@ -0,0 +1,75 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+import traceback
+from threading import Thread
+
+from twitter.common.decorators import identify_thread
+
+try:
+ from Queue import Queue
+except ImportError:
+ from queue import Queue
+
+
+class ExecutionError(Exception):
+ """Unhandled thread error wrapper. Raised on the calling thread."""
+ pass
+
+
+class ErrorHandlingThread(Thread):
+ """A thread that helps with unhandled exceptions by re-raising errors
+ with the parent thread upon completion."""
+
+ def __init__(self, *args, **kw):
+ super(ErrorHandlingThread, self).__init__(*args, **kw)
+ self.__real_run, self.run = self.run, self._excepting_run
+ self.__errors = Queue()
+
+ @identify_thread
+ def _excepting_run(self, *args, **kw):
+ try:
+ self.__real_run(*args, **kw)
+ self.__errors.put(None)
+ except Exception:
+ try:
+ e_type, e_val, e_tb = sys.exc_info()
+ self.__errors.put(ExecutionError(
+ 'Unhandled error while running worker thread. '
+ 'Original error details: %s' % traceback.format_exception(e_type, e_val, e_tb)))
+ except: # noqa
+ # This appears to be the only way to avoid nasty "interpreter shutdown" errors when
+ # dealing with daemon threads. While not ideal, there is nothing else we could do here
+ # if the sys.exc_info() call fails.
+ pass
+
+ def join_and_raise(self):
+ """Waits for completion and re-raises any exception on a caller thread."""
+ error = self.__errors.get(timeout=sys.maxint) # Timeout for interruptibility.
+ if error is not None:
+ raise error
+
+
+def spawn_worker(target, *args, **kwargs):
+ """Creates and starts a new daemon worker thread.
+
+ Arguments:
+ target -- target method.
+
+ Returns thread handle.
+ """
+ thread = ErrorHandlingThread(target=target, *args, **kwargs)
+ thread.daemon = True
+ thread.start()
+ return thread
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/instance_watcher.py b/src/main/python/apache/aurora/client/api/instance_watcher.py
index d2ad6fd..fe2f551 100644
--- a/src/main/python/apache/aurora/client/api/instance_watcher.py
+++ b/src/main/python/apache/aurora/client/api/instance_watcher.py
@@ -13,10 +13,12 @@
#
import time
+from threading import Event
from twitter.common import log
from .health_check import StatusHealthCheck
+from .task_util import StatusMuxHelper
from gen.apache.aurora.api.ttypes import Identity, ResponseCode, ScheduleStatus, TaskQuery
@@ -42,7 +44,9 @@ class InstanceWatcher(object):
restart_threshold,
watch_secs,
health_check_interval_seconds,
- clock=time):
+ clock=time,
+ terminating_event=None,
+ scheduler_mux=None):
self._scheduler = scheduler
self._job_key = job_key
@@ -50,6 +54,8 @@ class InstanceWatcher(object):
self._watch_secs = watch_secs
self._health_check_interval_seconds = health_check_interval_seconds
self._clock = clock
+ self._terminating = terminating_event or Event()
+ self._status_helper = StatusMuxHelper(self._scheduler, self._create_query, scheduler_mux)
def watch(self, instance_ids, health_check=None):
"""Watches a set of instances and detects failures based on a delegated health check.
@@ -91,8 +97,8 @@ class InstanceWatcher(object):
instance_id, self._restart_threshold))
instance_states[instance_id] = Instance(finished=True)
- while True:
- running_tasks = self._get_tasks_by_instance_id(instance_ids)
+ while not self._terminating.is_set():
+ running_tasks = self._status_helper.get_tasks(instance_ids)
now = self._clock.time()
tasks_by_instance = dict((task.assignedTask.instanceId, task) for task in running_tasks)
for instance_id in instance_ids:
@@ -119,27 +125,17 @@ class InstanceWatcher(object):
return set([s_id for s_id in instance_ids if s_id not in instance_states
or not instance_states[s_id].healthy])
- self._clock.sleep(self._health_check_interval_seconds)
+ self._terminating.wait(self._health_check_interval_seconds)
- def _get_tasks_by_instance_id(self, instance_ids):
- log.debug('Querying instance statuses.')
+ def terminate(self):
+ """Requests immediate termination of the watch cycle."""
+ self._terminating.set()
+
+ def _create_query(self, instance_ids):
query = TaskQuery()
query.owner = Identity(role=self._job_key.role)
query.environment = self._job_key.environment
query.jobName = self._job_key.name
query.statuses = set([ScheduleStatus.RUNNING])
-
query.instanceIds = instance_ids
- try:
- resp = self._scheduler.getTasksWithoutConfigs(query)
- except IOError as e:
- log.error('IO Exception during scheduler call: %s' % e)
- return []
-
- tasks = []
- if resp.responseCode == ResponseCode.OK:
- tasks = resp.result.scheduleStatusResult.tasks
-
- log.debug('Response from scheduler: %s (message: %s)'
- % (ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.messageDEPRECATED))
- return tasks
+ return query
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/job_monitor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/job_monitor.py b/src/main/python/apache/aurora/client/api/job_monitor.py
index 6c26cb9..756093d 100644
--- a/src/main/python/apache/aurora/client/api/job_monitor.py
+++ b/src/main/python/apache/aurora/client/api/job_monitor.py
@@ -12,12 +12,14 @@
# limitations under the License.
#
-import time
+from threading import Event
from thrift.transport import TTransport
from twitter.common import log
from twitter.common.quantity import Amount, Time
+from .task_util import StatusMuxHelper
+
from gen.apache.aurora.api.constants import LIVE_STATES, TERMINAL_STATES
from gen.apache.aurora.api.ttypes import Identity, TaskQuery
@@ -34,28 +36,24 @@ class JobMonitor(object):
def terminal(cls, status):
return status in TERMINAL_STATES
- def __init__(self, scheduler, job_key, clock=time,
- min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL):
+ def __init__(self, scheduler, job_key, terminating_event=None,
+ min_poll_interval=MIN_POLL_INTERVAL, max_poll_interval=MAX_POLL_INTERVAL,
+ scheduler_mux=None):
self._scheduler = scheduler
self._job_key = job_key
- self._clock = clock
self._min_poll_interval = min_poll_interval
self._max_poll_interval = max_poll_interval
+ self._terminating = terminating_event or Event()
+ self._status_helper = StatusMuxHelper(self._scheduler, self.create_query, scheduler_mux)
- def iter_query(self, query):
- try:
- res = self._scheduler.getTasksWithoutConfigs(query)
- except TTransport.TTransportException as e:
- log.error('Failed to query tasks from scheduler: %s' % e)
- return
- if res is None or res.result is None:
- return
- for task in res.result.scheduleStatusResult.tasks:
+ def iter_tasks(self, instances):
+ tasks = self._status_helper.get_tasks(instances)
+ for task in tasks:
yield task
- def states(self, query):
+ def states(self, instance_ids):
states = {}
- for task in self.iter_query(query):
+ for task in self.iter_tasks(instance_ids):
status, instance_id = task.status, task.assignedTask.instanceId
first_timestamp = task.taskEvents[0].timestamp
if instance_id not in states or first_timestamp > states[instance_id][0]:
@@ -81,11 +79,17 @@ class JobMonitor(object):
Returns: True if predicate is met or False if timeout has expired.
"""
poll_interval = self._min_poll_interval
- while not all(predicate(state) for state in self.states(self.create_query(instances)).values()):
+ while not self._terminating.is_set() and not all(predicate(state) for state
+ in self.states(instances).values()):
+
if with_timeout and poll_interval >= self._max_poll_interval:
return False
- self._clock.sleep(poll_interval.as_(Time.SECONDS))
+ self._terminating.wait(poll_interval.as_(Time.SECONDS))
poll_interval = min(self._max_poll_interval, 2 * poll_interval)
return True
+
+ def terminate(self):
+ """Requests immediate termination of the wait cycle."""
+ self._terminating.set()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_client.py b/src/main/python/apache/aurora/client/api/scheduler_client.py
index e911135..4b20efa 100644
--- a/src/main/python/apache/aurora/client/api/scheduler_client.py
+++ b/src/main/python/apache/aurora/client/api/scheduler_client.py
@@ -204,6 +204,8 @@ class SchedulerProxy(object):
self._session_key_factory = session_key_factory
self._client = self._scheduler_client = None
self.verbose = verbose
+ self._lock = threading.RLock()
+ self._terminating = threading.Event()
def with_scheduler(method):
"""Decorator magic to make sure a connection is made to the scheduler"""
@@ -216,6 +218,11 @@ class SchedulerProxy(object):
def invalidate(self):
self._client = self._scheduler_client = None
+ def terminate(self):
+ """Requests immediate termination of any retry attempts and invalidates client."""
+ self._terminating.set()
+ self.invalidate()
+
@with_scheduler
def client(self):
return self._client
@@ -268,24 +275,30 @@ class SchedulerProxy(object):
@functools.wraps(method)
def method_wrapper(*args):
- start = time.time()
- while (time.time() - start) < self.RPC_MAXIMUM_WAIT.as_(Time.SECONDS):
- auth_args = () if method_name in self.UNAUTHENTICATED_RPCS else (self.session_key(),)
- try:
- method = getattr(self.client(), method_name)
- if not callable(method):
- return method
- return method(*(args + auth_args))
- except (TTransport.TTransportException, self.TimeoutError) as e:
- log.warning('Connection error with scheduler: %s, reconnecting...' % e)
- self.invalidate()
- time.sleep(self.RPC_RETRY_INTERVAL.as_(Time.SECONDS))
- except Exception as e:
- # Take any error that occurs during the RPC call, and transform it
- # into something clients can handle.
- raise self.ThriftInternalError("Error during thrift call %s to %s: %s" %
- (method_name, self.cluster.name, e))
- raise self.TimeoutError('Timed out attempting to issue %s to %s' % (
- method_name, self.cluster.name))
+ with self._lock:
+ start = time.time()
+ while not self._terminating.is_set() and (
+ time.time() - start) < self.RPC_MAXIMUM_WAIT.as_(Time.SECONDS):
+
+ auth_args = () if method_name in self.UNAUTHENTICATED_RPCS else (self.session_key(),)
+ try:
+ method = getattr(self.client(), method_name)
+ if not callable(method):
+ return method
+ return method(*(args + auth_args))
+ except (TTransport.TTransportException, self.TimeoutError) as e:
+ if not self._terminating:
+ log.warning('Connection error with scheduler: %s, reconnecting...' % e)
+ self.invalidate()
+ self._terminating.wait(self.RPC_RETRY_INTERVAL.as_(Time.SECONDS))
+ except Exception as e:
+ # Take any error that occurs during the RPC call, and transform it
+ # into something clients can handle.
+ if not self._terminating:
+ raise self.ThriftInternalError("Error during thrift call %s to %s: %s" %
+ (method_name, self.cluster.name, e))
+ if not self._terminating:
+ raise self.TimeoutError('Timed out attempting to issue %s to %s' % (
+ method_name, self.cluster.name))
return method_wrapper
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/scheduler_mux.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_mux.py b/src/main/python/apache/aurora/client/api/scheduler_mux.py
new file mode 100644
index 0000000..0832a13
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/scheduler_mux.py
@@ -0,0 +1,121 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import threading
+from collections import defaultdict, namedtuple
+
+from twitter.common.quantity import Amount, Time
+
+from .error_handling_thread import spawn_worker
+
+try:
+ from Queue import Queue, Empty
+except ImportError:
+ from queue import Queue, Empty
+
+
+class SchedulerMux(object):
+ """Multiplexes scheduler RPC requests on a dedicated worker thread."""
+
+ class Error(Exception):
+ """Call error wrapper."""
+ pass
+
+ OK_RESULT = 1
+ DEFAULT_WAIT_TIMEOUT = Amount(1, Time.SECONDS)
+ DEFAULT_JOIN_TIMEOUT = Amount(5, Time.SECONDS)
+ DEFAULT_RPC_TIMEOUT = Amount(120, Time.SECONDS)
+ WORK_ITEM = namedtuple('WorkItem', ['completion_queue', 'command', 'data', 'aggregator'])
+
+ def __init__(self, wait_timeout=DEFAULT_WAIT_TIMEOUT):
+ self.__queue = Queue()
+ self.__terminating = threading.Event()
+ self.__wait_timeout = wait_timeout
+ self.__worker = spawn_worker(self.__monitor)
+
+ def __monitor(self):
+ """Main body of the multiplexer thread.
+
+ This method repeatedly polls the worker queue for new calls, and then
+ dispatches them in batches to the scheduler.
+ Callers are notified when their requests complete."""
+
+ requests_by_command = defaultdict(list)
+ while not self.__terminating.is_set():
+ try:
+ work_item = self.__queue.get(timeout=self.__wait_timeout.as_(Time.SECONDS))
+ requests_by_command[work_item.command].append(work_item)
+ except Empty:
+ self.__call_and_notify(requests_by_command)
+ requests_by_command = defaultdict(list)
+
+ def __call_and_notify(self, requests_by_command):
+ """Batch executes scheduler requests and notifies on completion.
+
+ Takes a set of RPC requests grouped by command type, dispatches them to the scheduler,
+ and then waits for the batched calls to complete. When a call is completed, its callers
+ will be notified via the completion queue."""
+
+ for command, work_items in requests_by_command.items():
+ request = [item.data for item in work_items]
+ request = work_items[0].aggregator(request) if work_items[0].aggregator else request
+ result_status = self.OK_RESULT
+ result_data = None
+ try:
+ result_data = command(request)
+ except (self.Error, Exception) as e:
+ result_status = e
+
+ for work_item in work_items:
+ work_item.completion_queue.put((result_status, result_data))
+
+ def _enqueue(self, completion_queue, command, data, aggregator):
+ """Queues up a scheduler call for a delayed (batched) completion.
+
+ Arguments:
+ completion_queue -- completion queue to notify caller on completion.
+ command -- callback signature accepting a list of data.
+ data -- single request data object to be batched with other similar requests.
+ aggregator -- callback function for data aggregation.
+ """
+ self.__queue.put(self.WORK_ITEM(completion_queue, command, data, aggregator))
+
+ def terminate(self):
+ """Requests the SchedulerMux to terminate."""
+ self.__terminating.set()
+ self.__worker.join(timeout=self.DEFAULT_JOIN_TIMEOUT.as_(Time.SECONDS))
+
+ def enqueue_and_wait(self, command, data, aggregator=None, timeout=DEFAULT_RPC_TIMEOUT):
+ """Queues up the scheduler call and waits for completion.
+
+ Arguments:
+ command -- scheduler command to run.
+ data -- data to query scheduler for.
+ aggregator -- callback function for data aggregation.
+ timeout -- amount of time to wait for completion.
+
+ Returns the aggregated command call response. Response data decomposition is up to the caller.
+ """
+ try:
+ completion_queue = Queue()
+ self._enqueue(completion_queue, command, data, aggregator)
+ result = completion_queue.get(timeout=timeout.as_(Time.SECONDS))
+ result_status = result[0]
+ if result_status != self.OK_RESULT and not self.__terminating.is_set():
+ if isinstance(result_status, self.Error):
+ raise result_status
+ else:
+ raise self.Error('Unknown error: %s' % result_status)
+ return result[1]
+ except Empty:
+ raise self.Error('Failed to complete operation within %s' % timeout)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/sla.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/sla.py b/src/main/python/apache/aurora/client/api/sla.py
index 50befea..b9b6468 100644
--- a/src/main/python/apache/aurora/client/api/sla.py
+++ b/src/main/python/apache/aurora/client/api/sla.py
@@ -16,7 +16,9 @@ import math
import time
from collections import defaultdict, namedtuple
-from apache.aurora.client.base import DEFAULT_GROUPING, group_hosts, log_response
+from twitter.common import log
+
+from apache.aurora.client.base import DEFAULT_GROUPING, format_response, group_hosts
from apache.aurora.common.aurora_job_key import AuroraJobKey
from gen.apache.aurora.api.constants import LIVE_STATES
@@ -323,7 +325,7 @@ class Sla(object):
def _get_tasks(self, task_query):
resp = self._scheduler.getTasksWithoutConfigs(task_query)
- log_response(resp)
+ log.info(format_response(resp))
if resp.responseCode != ResponseCode.OK:
return []
return resp.result.scheduleStatusResult.tasks
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/task_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/task_util.py b/src/main/python/apache/aurora/client/api/task_util.py
new file mode 100644
index 0000000..b5244ee
--- /dev/null
+++ b/src/main/python/apache/aurora/client/api/task_util.py
@@ -0,0 +1,101 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from itertools import chain
+
+from twitter.common import log
+
+from apache.aurora.client.base import format_response
+
+from .scheduler_mux import SchedulerMux
+
+from gen.apache.aurora.api.ttypes import ResponseCode
+
+
+class StatusMuxHelper(object):
+ """Handles mux/demux logic of the getTasksWithoutConfigs RPC."""
+
+ def __init__(self, scheduler, query_factory, scheduler_mux=None):
+ self._scheduler = scheduler
+ self._query_factory = query_factory
+ self._scheduler_mux = scheduler_mux
+
+ def get_tasks(self, instance_ids=None):
+ """Routes call to either immediate direct or multiplexed threaded execution.
+
+ Arguments:
+ instance_ids -- optional list of instance IDs to query for.
+
+ Returns a list of tasks.
+ """
+ log.debug('Querying instance statuses: %s' % instance_ids)
+
+ if self._scheduler_mux is not None:
+ return self._get_tasks_multiplexed(instance_ids)
+ else:
+ return self._get_tasks(self._query_factory(instance_ids))
+
+ def _get_tasks_multiplexed(self, instance_ids=None):
+ """Gets tasks via SchedulerMux.
+
+ Arguments:
+ instance_ids -- optional list of instance IDs to query for.
+
+ Returns a list of tasks.
+ """
+ tasks = []
+ include_ids = lambda id: id in instance_ids if instance_ids is not None else True
+
+ log.debug('Batch getting task status: %s' % instance_ids)
+ try:
+ unfiltered_tasks = self._scheduler_mux.enqueue_and_wait(
+ self._get_tasks,
+ instance_ids if instance_ids else [],
+ self._create_aggregated_query)
+ tasks = [task for task in unfiltered_tasks if include_ids(task.assignedTask.instanceId)]
+ except SchedulerMux.Error as e:
+ log.error('Failed to query status for instances %s. Reason: %s' % (instance_ids, e))
+
+ log.debug('Done batch getting task status: %s' % instance_ids)
+ return tasks
+
+ def _get_tasks(self, query):
+ """Gets tasks directly via SchedulerProxy.
+
+ Arguments:
+ query -- TaskQuery instance.
+
+ Returns a list of tasks.
+ """
+ try:
+ resp = self._scheduler.getTasksWithoutConfigs(query)
+ except IOError as e:
+ log.error('IO Exception during scheduler call: %s' % e)
+ return []
+
+ tasks = []
+ if resp.responseCode == ResponseCode.OK:
+ tasks = resp.result.scheduleStatusResult.tasks
+
+ log.debug(format_response(resp))
+ return tasks
+
+ def _create_aggregated_query(self, instance_id_lists):
+ """Aggregates multiple instance_id lists into a single list.
+
+ Arguments:
+ instance_id_lists -- list of lists of int.
+ """
+ instance_ids = list(chain.from_iterable(instance_id_lists))
+ log.debug('Aggregated instance ids to query status: %s' % instance_ids)
+ return self._query_factory(instance_ids)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/updater.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater.py b/src/main/python/apache/aurora/client/api/updater.py
index c592651..05b4c0c 100644
--- a/src/main/python/apache/aurora/client/api/updater.py
+++ b/src/main/python/apache/aurora/client/api/updater.py
@@ -13,17 +13,22 @@
#
import json
+import signal
from collections import namedtuple
from difflib import unified_diff
+from threading import Lock as threading_lock
from thrift.protocol import TJSONProtocol
from thrift.TSerialization import serialize
from twitter.common import log
+from twitter.common.quantity import Amount, Time
+from .error_handling_thread import ExecutionError, spawn_worker
from .instance_watcher import InstanceWatcher
from .job_monitor import JobMonitor
from .quota_check import CapacityRequest, QuotaCheck
from .scheduler_client import SchedulerProxy
+from .scheduler_mux import SchedulerMux
from .updater_util import FailureThreshold, UpdaterConfig
from gen.apache.aurora.api.constants import ACTIVE_STATES
@@ -39,22 +44,29 @@ from gen.apache.aurora.api.ttypes import (
TaskQuery
)
-InstanceState = namedtuple('InstanceState', ['instance_id', 'is_updated'])
+try:
+ from Queue import Queue, Empty
+except ImportError:
+ from queue import Queue, Empty
-OperationConfigs = namedtuple('OperationConfigs', ['from_config', 'to_config'])
-
+class Updater(object):
+ """Performs an update command using a collection of parallel threads.
+ The number of parallel threads used is determined by the UpdateConfig.batch_size."""
-InstanceConfigs = namedtuple(
- 'InstanceConfigs',
- ['remote_config_map', 'local_config_map', 'instances_to_process']
-)
+ class Error(Exception):
+ """Updater error wrapper."""
+ pass
+ RPC_COMPLETION_TIMEOUT_SECS = Amount(120, Time.SECONDS)
-class Updater(object):
- """Update the instances of a job in batches."""
+ OPERATION_CONFIGS = namedtuple('OperationConfigs', ['from_config', 'to_config'])
+ INSTANCE_CONFIGS = namedtuple(
+ 'InstanceConfigs',
+ ['remote_config_map', 'local_config_map', 'instances_to_process']
+ )
- class Error(Exception): pass
+ INSTANCE_DATA = namedtuple('InstanceData', ['instance_id', 'operation_configs'])
def __init__(self,
config,
@@ -62,30 +74,43 @@ class Updater(object):
scheduler=None,
instance_watcher=None,
quota_check=None,
- job_monitor=None):
+ job_monitor=None,
+ scheduler_mux=None,
+ rpc_completion_timeout=RPC_COMPLETION_TIMEOUT_SECS):
self._config = config
self._job_key = JobKey(role=config.role(), environment=config.environment(), name=config.name())
self._health_check_interval_seconds = health_check_interval_seconds
self._scheduler = scheduler or SchedulerProxy(config.cluster())
self._quota_check = quota_check or QuotaCheck(self._scheduler)
- self._job_monitor = job_monitor or JobMonitor(self._scheduler, self._config.job_key())
+ self._scheduler_mux = scheduler_mux or SchedulerMux()
+ self._job_monitor = job_monitor or JobMonitor(
+ self._scheduler,
+ self._config.job_key(),
+ scheduler_mux=self._scheduler_mux)
+ self._rpc_completion_timeout = rpc_completion_timeout
try:
self._update_config = UpdaterConfig(**config.update_config().get())
except ValueError as e:
raise self.Error(str(e))
self._lock = None
+ self._thread_lock = threading_lock()
+ self.failure_threshold = FailureThreshold(
+ self._update_config.max_per_instance_failures,
+ self._update_config.max_total_failures
+ )
self._watcher = instance_watcher or InstanceWatcher(
self._scheduler,
self._job_key,
self._update_config.restart_threshold,
self._update_config.watch_secs,
- self._health_check_interval_seconds)
+ self._health_check_interval_seconds,
+ scheduler_mux=self._scheduler_mux)
+ self._terminating = False
def _start(self):
"""Starts an update by applying an exclusive lock on a job being updated.
- Returns:
- Response instance from the scheduler call.
+ Returns Response instance from the scheduler call.
"""
resp = self._scheduler.acquireLock(LockKey(job=self._job_key))
if resp.responseCode == ResponseCode.OK:
@@ -95,8 +120,7 @@ class Updater(object):
def _finish(self):
"""Finishes an update by removing an exclusive lock on an updated job.
- Returns:
- Response instance from the scheduler call.
+ Returns Response instance from the scheduler call.
"""
resp = self._scheduler.releaseLock(self._lock, LockValidation.CHECKED)
@@ -106,96 +130,261 @@ class Updater(object):
log.error('There was an error finalizing the update: %s' % resp.messageDEPRECATED)
return resp
+ def int_handler(self, *args):
+ """Ensures keyboard interrupt exception is raised on a main thread."""
+ raise KeyboardInterrupt()
+
def _update(self, instance_configs):
- """Drives execution of the update logic. Performs a batched update/rollback for all instances
- affected by the current update request.
+ """Drives execution of the update logic.
+
+ Performs instance updates in parallel using a number of threads bound by
+ the batch_size config option. If the failure threshold is exceeded, every
+ instance picked up by a worker is reverted (unless rollback_on_failure is disabled).
Arguments:
instance_configs -- list of instance update configurations to go through.
- Returns the set of instances that failed to update.
+ Returns True if the update finished within the failure thresholds, False otherwise
+ (the update failed, whether or not it was rolled back).
"""
- failure_threshold = FailureThreshold(
- self._update_config.max_per_instance_failures,
- self._update_config.max_total_failures
- )
+ # Register signal handler to ensure KeyboardInterrupt is received by a main thread.
+ signal.signal(signal.SIGINT, self.int_handler)
+
+ instances_to_update = [
+ self.INSTANCE_DATA(
+ instance_id,
+ self.OPERATION_CONFIGS(
+ from_config=instance_configs.remote_config_map,
+ to_config=instance_configs.local_config_map))
+ for instance_id in instance_configs.instances_to_process
+ ]
- instance_operation = OperationConfigs(
- from_config=instance_configs.remote_config_map,
- to_config=instance_configs.local_config_map
- )
+ log.info('Instances to update: %s' % instance_configs.instances_to_process)
+ update_queue = self._update_instances_in_parallel(self._update_instance, instances_to_update)
+
+ if self._is_failed_update(quiet=False):
+ if not self._update_config.rollback_on_failure:
+ log.info('Rollback on failure is disabled in config. Aborting rollback')
+ # Explicit False instead of a bare return: callers treat the result as a bool.
+ return False
+
+ # Instances still sitting in update_queue were never touched; only the rest are reverted.
+ rollback_ids = self._get_rollback_ids(instance_configs.instances_to_process, update_queue)
+ instances_to_revert = [
+ self.INSTANCE_DATA(
+ instance_id,
+ self.OPERATION_CONFIGS(
+ from_config=instance_configs.local_config_map,
+ to_config=instance_configs.remote_config_map))
+ for instance_id in rollback_ids
+ ]
- remaining_instances = [
- InstanceState(instance_id, is_updated=False)
- for instance_id in instance_configs.instances_to_process
- ]
+ log.info('Reverting update for: %s' % rollback_ids)
+ self._update_instances_in_parallel(self._revert_instance, instances_to_revert)
+
+ # A rollback never resets the failure threshold, so a rolled-back update reports False.
+ return not self._is_failed_update()
+
+ def _update_instances_in_parallel(self, target, instances_to_update):
+ """Processes instance updates in parallel and waits for completion.
+
+ Spawns batch_size worker threads that drain a shared queue by calling target.
+ If joining a worker raises, or the user interrupts the update, all outstanding
+ activity is terminated before the exception is re-raised.
+
+ Arguments:
+ target -- target method to handle instance update.
+ instances_to_update -- list of InstanceData with update details.
+
+ Returns Queue with non-updated instance data.
+ """
+ log.info('Processing in parallel with %s worker thread(s)' % self._update_config.batch_size)
+ instance_queue = Queue()
+ for instance_to_update in instances_to_update:
+ instance_queue.put(instance_to_update)
+
+ try:
+ threads = []
+ for _ in range(self._update_config.batch_size):
+ threads.append(spawn_worker(target, kwargs={'instance_queue': instance_queue}))
+
+ for thread in threads:
+ thread.join_and_raise()
+ except (Exception, KeyboardInterrupt):
+ # KeyboardInterrupt (raised on the main thread by int_handler) is not an Exception
+ # subclass. It must also set the terminating flag, otherwise the workers keep
+ # pulling instances off the queue after the user aborted the update.
+ self._terminate()
+ raise
+
+ return instance_queue
+
+ def _terminate(self):
+ """Attempts to terminate all outstanding activities.
+
+ Only the first call has an effect. It sets the _terminating flag, which the worker
+ loops in _update_instance/_revert_instance poll before taking more work. It then
+ terminates the scheduler, job monitor, scheduler mux and instance watcher.
+ """
+ if not self._terminating:
+ log.info('Cleaning up')
+ self._terminating = True
+ self._scheduler.terminate()
+ self._job_monitor.terminate()
+ self._scheduler_mux.terminate()
+ self._watcher.terminate()
+
+ def _update_instance(self, instance_queue):
+ """Works through the instance_queue and performs instance updates (one at a time).
+
+ Each instance is first killed/added as needed and then watched. If the watcher
+ reports failures that are still within the per-instance failure limit, the
+ instance is restarted and watched again.
+
+ Arguments:
+ instance_queue -- Queue of InstanceData to update.
+ """
+ while not self._terminating and not self._is_failed_update():
+ try:
+ instance_data = instance_queue.get_nowait()
+ except Empty:
+ return
+
+ update = True
+ restart = False
+ # Parenthesized: 'and' binds tighter than 'or', so without the parentheses the
+ # termination/failure checks would not guard the initial kill/add.
+ while (update or restart) and not self._terminating and not self._is_failed_update():
+ instances_to_watch = []
+ if update:
+ instances_to_watch += self._kill_and_add_instance(instance_data)
+ update = False
+ else:
+ instances_to_watch += self._request_restart_instance(instance_data)
+
+ if instances_to_watch:
+ failed_instances = self._watcher.watch(instances_to_watch)
+ restart = self._is_restart_needed(failed_instances)
+
+ def _revert_instance(self, instance_queue):
+ """Works through the instance_queue and performs instance rollbacks (one at a time).
+
+ Unlike _update_instance, a failed rollback is only logged. The instance is not
+ restarted, failure counts are not updated, and the loop stops only on termination
+ or when the queue is empty.
+
+ Arguments:
+ instance_queue -- Queue of InstanceData to revert.
+ """
+ while not self._terminating:
+ try:
+ instance_data = instance_queue.get_nowait()
+ except Empty:
+ return
+
+ log.info('Reverting instance: %s' % instance_data.instance_id)
+ instances_to_watch = self._kill_and_add_instance(instance_data)
+ # watch() returns the failed instance IDs; a non-empty result means the rollback failed.
+ if instances_to_watch and self._watcher.watch(instances_to_watch):
+ log.error('Rollback failed for instance: %s' % instance_data.instance_id)
+
+ def _kill_and_add_instance(self, instance_data):
+ """Acquires update instructions and performs required kill/add/kill+add sequence.
+
+ Arguments:
+ instance_data -- InstanceData to update.
+
+ Returns the list of added instance IDs (empty if the instance was unchanged or only killed).
+ """
+ log.info('Examining instance: %s' % instance_data.instance_id)
+ to_kill, to_add = self._create_kill_add_lists(
+ [instance_data.instance_id],
+ instance_data.operation_configs)
+ if not to_kill and not to_add:
+ log.info('Skipping unchanged instance: %s' % instance_data.instance_id)
+ return to_add
+
+ # The kill blocks until the old task is terminal, so the add never overlaps it.
+ if to_kill:
+ self._request_kill_instance(instance_data)
+ if to_add:
+ self._request_add_instance(instance_data)
+
+ return to_add
+
+ def _request_kill_instance(self, instance_data):
+ """Instructs the scheduler to kill instance and waits for completion.
- log.info('Starting job update.')
- while remaining_instances and not failure_threshold.is_failed_update():
- batch_instances = remaining_instances[0:self._update_config.batch_size]
- remaining_instances = list(set(remaining_instances) - set(batch_instances))
- instances_to_restart = [s.instance_id for s in batch_instances if s.is_updated]
- instances_to_update = [s.instance_id for s in batch_instances if not s.is_updated]
+ Arguments:
+ instance_data -- InstanceData to kill.
+
+ Raises Error if the job monitor does not observe a terminal state before its timeout.
+ """
+ log.info('Killing instance: %s' % instance_data.instance_id)
+ self._enqueue_and_wait(instance_data, self._kill_instances)
+ # The kill RPC only requests termination; block until the task is observed terminal.
+ result = self._job_monitor.wait_until(
+ JobMonitor.terminal,
+ [instance_data.instance_id],
+ with_timeout=True)
+
+ if not result:
+ raise self.Error('Instance %s was not killed in time' % instance_data.instance_id)
+ log.info('Killed: %s' % instance_data.instance_id)
+
+ def _request_add_instance(self, instance_data):
+ """Instructs the scheduler to add instance.
+
+ Returns once the add call has completed. It does not wait for the new task to be
+ healthy; callers watch the returned IDs separately.
+
+ Arguments:
+ instance_data -- InstanceData to add.
+ """
+ log.info('Adding instance: %s' % instance_data.instance_id)
+ self._enqueue_and_wait(instance_data, self._add_instances)
+ log.info('Added: %s' % instance_data.instance_id)
+
+ def _request_restart_instance(self, instance_data):
+ """Instructs the scheduler to restart instance.
+
+ Arguments:
+ instance_data -- InstanceData to restart.
+
+ Returns a single-element list holding the restarted instance ID (ready to be watched).
+ """
+ log.info('Restarting instance: %s' % instance_data.instance_id)
+ self._enqueue_and_wait(instance_data, self._restart_instances)
+ log.info('Restarted: %s' % instance_data.instance_id)
+ return [instance_data.instance_id]
- instances_to_watch = []
- if instances_to_restart:
- instances_to_watch += self._restart_instances(instances_to_restart)
+ def _enqueue_and_wait(self, instance_data, command):
+ """Queues up the scheduler call and waits for completion.
- if instances_to_update:
- instances_to_watch += self._update_instances(instances_to_update, instance_operation)
+ Arguments:
+ instance_data -- InstanceData to query scheduler for.
+ command -- scheduler command to run. It receives a list of InstanceData, which the
+ mux may batch together with concurrent requests from other workers.
+
+ Raises Error (wrapping SchedulerMux.Error) if the mux reports a failure or timeout.
+ """
+ try:
+ self._scheduler_mux.enqueue_and_wait(
+ command,
+ instance_data,
+ timeout=self._rpc_completion_timeout)
+ except SchedulerMux.Error as e:
+ raise self.Error('Failed to complete instance %s operation. Reason: %s'
+ % (instance_data.instance_id, e))
+
+ def _is_failed_update(self, quiet=True):
+ """Verifies the update status in a thread-safe manner.
+
+ Arguments:
+ quiet -- Whether the logging should be suppressed in case of a failed update. Default True.
- failed_instances = self._watcher.watch(instances_to_watch) if instances_to_watch else set()
+ Returns True if update failed, False otherwise.
+ """
+ # failure_threshold is shared by all worker threads; reads take the same lock as the
+ # failure-count updates in _is_restart_needed.
+ with self._thread_lock:
+ return self.failure_threshold.is_failed_update(log_errors=not quiet)
+
+ def _is_restart_needed(self, failed_instances):
+ """Checks if there are any failed instances recoverable via restart.
+
+ Records the failures in the shared failure threshold (under the thread lock).
+
+ Arguments:
+ failed_instances -- Failed instance IDs.
- if failed_instances:
- log.error('Failed instances: %s' % failed_instances)
+ Returns True if every failed instance may still be retried via restart; False when
+ there are no failures or any instance exceeded its per-instance failure limit.
+ """
+ if not failed_instances:
+ return False
- unretryable_instances = failure_threshold.update_failure_counts(failed_instances)
+ log.info('Failed instances: %s' % failed_instances)
+
+ with self._thread_lock:
+ unretryable_instances = self.failure_threshold.update_failure_counts(failed_instances)
if unretryable_instances:
log.warn('Not restarting failed instances %s, which exceeded '
'maximum allowed instance failure limit of %s' %
(unretryable_instances, self._update_config.max_per_instance_failures))
- retryable_instances = list(set(failed_instances) - set(unretryable_instances))
- remaining_instances += [
- InstanceState(instance_id, is_updated=True) for instance_id in retryable_instances
- ]
- remaining_instances.sort(key=lambda tup: tup.instance_id)
-
- if failure_threshold.is_failed_update():
- untouched_instances = [s.instance_id for s in remaining_instances if not s.is_updated]
- instances_to_rollback = list(
- set(instance_configs.instances_to_process) - set(untouched_instances)
- )
- self._rollback(instances_to_rollback, instance_configs)
-
- return not failure_threshold.is_failed_update()
+ return not unretryable_instances
- def _rollback(self, instances_to_rollback, instance_configs):
- """Performs a rollback operation for the failed instances.
+ def _get_rollback_ids(self, update_list, update_queue):
+ """Gets a list of instance ids to rollback.
Arguments:
- instances_to_rollback -- instance ids to rollback.
- instance_configs -- instance configuration to use for rollback.
+ update_list -- original list of instances intended for update.
+ update_queue -- untouched instances not processed during update (drained by this call).
+
+ Returns instance IDs to rollback, sorted in descending order.
"""
- if not self._update_config.rollback_on_failure:
- log.info('Rollback on failure is disabled in config. Aborting rollback')
- return
-
- log.info('Reverting update for %s' % instances_to_rollback)
- instance_operation = OperationConfigs(
- from_config=instance_configs.local_config_map,
- to_config=instance_configs.remote_config_map
- )
- instances_to_rollback.sort(reverse=True)
- failed_instances = []
- while instances_to_rollback:
- batch_instances = instances_to_rollback[0:self._update_config.batch_size]
- instances_to_rollback = list(set(instances_to_rollback) - set(batch_instances))
- instances_to_rollback.sort(reverse=True)
- instances_to_watch = self._update_instances(batch_instances, instance_operation)
- failed_instances += self._watcher.watch(instances_to_watch)
-
- if failed_instances:
- log.error('Rollback failed for instances: %s' % failed_instances)
+ # Called after all workers have been joined, so draining the queue here is race-free.
+ untouched_ids = []
+ while not update_queue.empty():
+ untouched_ids.append(update_queue.get_nowait().instance_id)
+
+ return sorted(set(update_list) - set(untouched_ids), reverse=True)
def _hashable(self, element):
if isinstance(element, (list, set)):
@@ -240,8 +429,8 @@ class Updater(object):
if from_config and to_config:
diff_output = self._diff_configs(from_config, to_config)
if diff_output:
- log.debug('Task configuration changed for instance [%s]:\n%s' % (
- instance_id, diff_output))
+ log.debug('Task configuration changed for instance [%s]:\n%s'
+ % (instance_id, diff_output))
to_kill.append(instance_id)
to_add.append(instance_id)
elif from_config and not to_config:
@@ -253,68 +442,46 @@ class Updater(object):
return to_kill, to_add
- def _update_instances(self, instance_ids, operation_configs):
- """Applies kill/add actions for the specified batch instances.
+ def _kill_instances(self, instance_data):
+ """Instructs the scheduler to batch-kill instances.
+
+ Only issues the kill request. Waiting for the tasks to reach a terminal state is
+ left to the caller (see _request_kill_instance).
+
Arguments:
- instance_ids -- current batch of IDs to process.
- operation_configs -- OperationConfigs with update details.
-
- Returns a list of added instances.
+ instance_data -- list of InstanceData to kill.
"""
- log.info('Examining instances: %s' % instance_ids)
-
- to_kill, to_add = self._create_kill_add_lists(instance_ids, operation_configs)
+ instance_ids = [data.instance_id for data in instance_data]
+ log.debug('Batch killing instances: %s' % instance_ids)
+ query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids))
+ self._check_and_log_response(self._scheduler.killTasks(query, self._lock))
+ log.debug('Done batch killing instances: %s' % instance_ids)
- unchanged = list(set(instance_ids) - set(to_kill + to_add))
- if unchanged:
- log.info('Skipping unchanged instances: %s' % unchanged)
-
- self._kill_instances(to_kill)
- self._add_instances(to_add, operation_configs.to_config)
- return to_add
-
- def _kill_instances(self, instance_ids):
- """Instructs the scheduler to kill instances and waits for completion.
+ def _add_instances(self, instance_data):
+ """Instructs the scheduler to batch-add instances.
Arguments:
- instance_ids -- list of IDs to kill.
+ instance_data -- list of InstanceData to add.
"""
- if instance_ids:
- log.info('Killing instances: %s' % instance_ids)
- query = self._create_task_query(instanceIds=frozenset(int(s) for s in instance_ids))
- self._check_and_log_response(self._scheduler.killTasks(query, self._lock))
- res = self._job_monitor.wait_until(JobMonitor.terminal, instance_ids, with_timeout=True)
- if not res:
- raise self.Error('Tasks were not killed in time.')
- log.info('Instances killed')
-
- def _add_instances(self, instance_ids, to_config):
- """Instructs the scheduler to add instances.
+ instance_ids = [data.instance_id for data in instance_data]
+ to_config = instance_data[0].operation_configs.to_config
+ # NOTE(review): the first instance's taskConfig is applied to every instance in the
+ # batch, which assumes all batched instances share one target config. Confirm this
+ # holds for rollbacks, where to_config is the remote config map.
- Arguments:
- instance_ids -- list of IDs to add.
- to_config -- OperationConfigs with update details.
- """
- if instance_ids:
- log.info('Adding instances: %s' % instance_ids)
- add_config = AddInstancesConfig(
- key=self._job_key,
- taskConfig=to_config[instance_ids[0]], # instance_ids will always have at least 1 item.
- instanceIds=frozenset(int(s) for s in instance_ids))
- self._check_and_log_response(self._scheduler.addInstances(add_config, self._lock))
- log.info('Instances added')
-
- def _restart_instances(self, instance_ids):
- """Instructs the scheduler to restart instances.
+ log.debug('Batch adding instances: %s' % instance_ids)
+ add_config = AddInstancesConfig(
+ key=self._job_key,
+ taskConfig=to_config[instance_ids[0]], # instance_ids will always have at least 1 item.
+ instanceIds=frozenset(int(s) for s in instance_ids))
+ self._check_and_log_response(self._scheduler.addInstances(add_config, self._lock))
+ log.debug('Done batch adding instances: %s' % instance_ids)
+
+ def _restart_instances(self, instance_data):
+ """Instructs the scheduler to batch-restart instances.
Arguments:
- instance_ids -- set of instances to be restarted by the scheduler.
+ instance_data -- list of InstanceData to restart.
"""
- log.info('Restarting instances: %s' % instance_ids)
+ instance_ids = [data.instance_id for data in instance_data]
+ log.debug('Batch restarting instances: %s' % instance_ids)
+ # restartShards takes the plain instance IDs rather than a task query.
resp = self._scheduler.restartShards(self._job_key, instance_ids, self._lock)
self._check_and_log_response(resp)
- return instance_ids
+ log.debug('Done batch restarting instances: %s' % instance_ids)
def _validate_quota(self, instance_configs):
"""Validates job update will not exceed quota for production tasks.
@@ -323,7 +490,7 @@ class Updater(object):
Returns Response.OK if quota check was successful.
"""
- instance_operation = OperationConfigs(
+ instance_operation = self.OPERATION_CONFIGS(
from_config=instance_configs.remote_config_map,
to_config=instance_configs.local_config_map
)
@@ -389,7 +556,7 @@ class Updater(object):
# Populate local config map
local_config_map = dict.fromkeys(job_config_instances, local_task_config)
- return InstanceConfigs(remote_config_map, local_config_map, instances_to_process)
+ return self.INSTANCE_CONFIGS(remote_config_map, local_config_map, instances_to_process)
def _get_existing_tasks(self):
"""Loads all existing tasks from the scheduler.
@@ -436,6 +603,7 @@ class Updater(object):
def update(self, instances=None):
"""Performs the job update, blocking until it completes.
+
A rollback will be performed if the update was considered a failure based on the
update configuration.
@@ -444,33 +612,36 @@ class Updater(object):
Returns a response object with update result status.
"""
- resp = self._start()
- if resp.responseCode != ResponseCode.OK:
- return resp
-
try:
- # Handle cron jobs separately from other jobs.
- if self._replace_template_if_cron():
- log.info('Cron template updated, next run will reflect changes')
- return self._finish()
- else:
- try:
- instance_configs = self._get_update_instructions(instances)
- self._check_and_log_response(self._validate_quota(instance_configs))
- except self.Error as e:
- # Safe to release the lock acquired above as no job mutation has happened yet.
- self._finish()
- return self._failed_response('Unable to start job update: %s' % e)
-
- if not self._update(instance_configs):
- log.warn('Update failures threshold reached')
- self._finish()
- return self._failed_response('Update reverted')
- else:
- log.info('Update successful')
+ resp = self._start()
+ if resp.responseCode != ResponseCode.OK:
+ return resp
+
+ try:
+ # Handle cron jobs separately from other jobs.
+ if self._replace_template_if_cron():
+ log.info('Cron template updated, next run will reflect changes')
return self._finish()
- except self.Error as e:
- return self._failed_response('Aborting update without rollback! Fatal error: %s' % e)
+ else:
+ try:
+ instance_configs = self._get_update_instructions(instances)
+ self._check_and_log_response(self._validate_quota(instance_configs))
+ except self.Error as e:
+ # Safe to release the lock acquired above as no job mutation has happened yet.
+ self._finish()
+ return self._failed_response('Unable to start job update: %s' % e)
+
+ if not self._update(instance_configs):
+ log.warn('Update failures threshold reached')
+ self._finish()
+ return self._failed_response('Update reverted')
+ else:
+ log.info('Update successful')
+ return self._finish()
+ # NOTE(review): Exception already subsumes self.Error, and ExecutionError too if it
+ # subclasses Exception; confirm before simplifying the tuple. _finish() is not called
+ # on this path, so the lock acquired by _start() is not released here.
+ except (self.Error, ExecutionError, Exception) as e:
+ return self._failed_response('Aborting update without rollback! Fatal error: %s' % e)
+ # Shut down the scheduler mux on every exit path, including propagated exceptions.
+ finally:
+ self._scheduler_mux.terminate()
@classmethod
def cancel_update(cls, scheduler, job_key):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/api/updater_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py
index 6b689c1..c5f8f23 100644
--- a/src/main/python/apache/aurora/client/api/updater_util.py
+++ b/src/main/python/apache/aurora/client/api/updater_util.py
@@ -75,11 +75,11 @@ class FailureThreshold(object):
return exceeded_failure_count_instances
- def is_failed_update(self):
+ def is_failed_update(self, log_errors=True):
total_failed_instances = self._exceeded_instance_fail_count()
is_failed = total_failed_instances > self._max_total_failures
- if is_failed:
+ if is_failed and log_errors:
log.error('%s failed instances observed, maximum allowed is %s' % (total_failed_instances,
self._max_total_failures))
for instance, failure_count in self._failures_by_instance.items():
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/main/python/apache/aurora/client/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/base.py b/src/main/python/apache/aurora/client/base.py
index 663a247..0c8e97e 100644
--- a/src/main/python/apache/aurora/client/base.py
+++ b/src/main/python/apache/aurora/client/base.py
@@ -40,13 +40,13 @@ def die(msg):
sys.exit(1)
-def log_response(resp):
- log.info('Response from scheduler: %s (message: %s)'
- % (ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.messageDEPRECATED))
+# NOTE(review): this replaces the public log_response(); confirm no other module still imports it.
+def format_response(resp):
+ """Returns a one-line description of a scheduler Response: response code name and message."""
+ return 'Response from scheduler: %s (message: %s)' % (
+ ResponseCode._VALUES_TO_NAMES[resp.responseCode], resp.messageDEPRECATED)
def check_and_log_response(resp):
- log_response(resp)
+ log.info(format_response(resp))
if resp.responseCode != ResponseCode.OK:
if resp.responseCode == ResponseCode.LOCK_ERROR:
log.info(LOCKED_WARNING)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/BUILD b/src/test/python/apache/aurora/client/api/BUILD
index 804195b..db5c223 100644
--- a/src/test/python/apache/aurora/client/api/BUILD
+++ b/src/test/python/apache/aurora/client/api/BUILD
@@ -22,6 +22,7 @@ python_test_suite(name = 'all',
pants(':updater'),
pants(':quota_check'),
pants(':sla'),
+ pants(':mux'),
+ pants(':task_util'),
],
)
@@ -94,6 +95,22 @@ python_tests(name = 'sla',
]
)
+python_tests(name = 'mux',
+ sources = ['test_scheduler_mux.py'],
+ dependencies = [
+ # NOTE(review): test_scheduler_mux.py also imports twitter.common.quantity directly; it is
+ # presumably only available transitively via :scheduler_mux. Consider declaring it here.
+ pants('src/main/python/apache/aurora/client/api:scheduler_mux'),
+ ]
+)
+
+python_tests(name = 'task_util',
+ # test_task_util.py exercises StatusMuxHelper from the task_util library.
+ sources = ['test_task_util.py'],
+ dependencies = [
+ pants('3rdparty/python:mock'),
+ pants('src/main/python/apache/aurora/client/api:task_util'),
+ pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
+ ]
+)
+
python_tests(name = 'updater',
sources = ['test_updater.py'],
dependencies = [
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_instance_watcher.py b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
index 93b43e8..ae1b24b 100644
--- a/src/test/python/apache/aurora/client/api/test_instance_watcher.py
+++ b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
@@ -47,6 +47,21 @@ class FakeClock(object):
self._now_seconds += seconds
+class FakeEvent(object):
+ """Test double for threading.Event whose wait() advances the FakeClock instead of blocking."""
+
+ def __init__(self, clock):
+ self._clock = clock
+ self._is_set = False
+
+ def wait(self, seconds):
+ self._clock.sleep(seconds)
+
+ def is_set(self):
+ return self._is_set
+
+ def set(self):
+ self._is_set = True
+
+
def find_expected_cycles(period, sleep_secs):
+ """Expected number of watch cycles: ceil(period / sleep_secs) + 1."""
+ # NOTE(review): under Python 2 without `from __future__ import division`, int / int floors
+ # before ceil() is applied; confirm this module's imports.
return ceil(period / sleep_secs) + 1
@@ -61,6 +76,7 @@ class InstanceWatcherTest(unittest.TestCase):
self._env = 'test'
self._name = 'jimbob'
self._clock = FakeClock()
+ self._event = FakeEvent(self._clock)
self._scheduler = mox.MockObject(scheduler_client)
job_key = JobKey(name=self._name, environment=self._env, role=self._role)
self._health_check = mox.MockObject(HealthCheck)
@@ -69,7 +85,8 @@ class InstanceWatcherTest(unittest.TestCase):
self.RESTART_THRESHOLD,
self.WATCH_SECS,
health_check_interval_seconds=3,
- clock=self._clock)
+ clock=self._clock,
+ terminating_event=self._event)
def get_tasks_status_query(self, instance_ids):
query = TaskQuery()
@@ -212,3 +229,11 @@ class InstanceWatcherTest(unittest.TestCase):
self.replay_mocks()
self.assert_watch_result([2])
self.verify_mocks()
+
+ def test_terminated_exits_immediately(self):
+ """A terminated instance watcher returns None from watch() without calling the scheduler."""
+ self.replay_mocks()
+ self._watcher.terminate()
+ result = self._watcher.watch([], self._health_check)
+ assert result is None, ('Expected instances None : Returned instances (%s)' % result)
+ self.verify_mocks()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_job_monitor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_job_monitor.py b/src/test/python/apache/aurora/client/api/test_job_monitor.py
index 3cc876f..5b26539 100644
--- a/src/test/python/apache/aurora/client/api/test_job_monitor.py
+++ b/src/test/python/apache/aurora/client/api/test_job_monitor.py
@@ -32,17 +32,26 @@ from gen.apache.aurora.api.ttypes import (
)
-class FakeClock(object):
- def sleep(self, seconds):
+class FakeEvent(object):
+ """Stand-in for threading.Event whose wait() returns immediately."""
+
+ def __init__(self):
+ self._is_set = False
+
+ def wait(self, seconds):
pass
+ def is_set(self):
+ return self._is_set
+
+ def set(self):
+ self._is_set = True
+
class JobMonitorTest(unittest.TestCase):
def setUp(self):
self._scheduler = Mock()
self._job_key = AuroraJobKey('cl', 'johndoe', 'test', 'test_job')
- self._clock = FakeClock()
+ # Passed to JobMonitor as terminating_event; setting it simulates termination.
+ self._event = FakeEvent()
def create_task(self, status, id):
return ScheduledTask(
@@ -108,6 +117,11 @@ class JobMonitorTest(unittest.TestCase):
self.create_task(ScheduleStatus.RUNNING, '3'),
])
- monitor = JobMonitor(self._scheduler, self._job_key, clock=self._clock)
+ monitor = JobMonitor(self._scheduler, self._job_key, terminating_event=self._event)
assert not monitor.wait_until(monitor.terminal, with_timeout=True)
self.expect_task_status()
+
+ def test_terminated_exits_immediately(self):
+ """A pre-set terminating event makes wait_until() return a truthy result."""
+ self._event.set()
+ monitor = JobMonitor(self._scheduler, self._job_key, terminating_event=self._event)
+ assert monitor.wait_until(monitor.terminal)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_mux.py b/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
new file mode 100644
index 0000000..021175c
--- /dev/null
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_mux.py
@@ -0,0 +1,72 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import time
+import unittest
+
+from twitter.common.quantity import Amount, Time
+
+from apache.aurora.client.api.scheduler_mux import SchedulerMux
+
+
+class SchedulerMuxTest(unittest.TestCase):
+ """Tests for SchedulerMux.enqueue_and_wait success, error wrapping and timeout handling.
+
+ A single mux instance is shared by all tests (created in setUpClass).
+ """
+
+ DATA = [1, 2, 3]
+ MUX = None
+
+ @classmethod
+ def setUpClass(cls):
+ cls.MUX = SchedulerMux(wait_timeout=Amount(10, Time.MILLISECONDS))
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.MUX.terminate()
+
+ @classmethod
+ def error_command(cls, data):
+ raise SchedulerMux.Error('expected')
+
+ @classmethod
+ def unknown_error_command(cls, data):
+ raise Exception('expected')
+
+ @classmethod
+ def timeout_command(cls, data):
+ # Sleeps longer than the 1 second timeout used in test_timeout.
+ time.sleep(2)
+
+ def test_success(self):
+ assert [self.DATA] == self.MUX.enqueue_and_wait(lambda d: d, self.DATA)
+
+ def test_failure(self):
+ try:
+ self.MUX.enqueue_and_wait(self.error_command, self.DATA)
+ except SchedulerMux.Error as e:
+ # str(e) instead of the deprecated BaseException.message.
+ assert 'expected' in str(e)
+ else:
+ self.fail('Expected SchedulerMux.Error')
+
+ def test_unknown_failure(self):
+ try:
+ self.MUX.enqueue_and_wait(self.unknown_error_command, self.DATA)
+ except SchedulerMux.Error as e:
+ assert 'Unknown error' in str(e)
+ else:
+ self.fail('Expected SchedulerMux.Error')
+
+ def test_timeout(self):
+ try:
+ self.MUX.enqueue_and_wait(self.timeout_command, self.DATA, timeout=Amount(1, Time.SECONDS))
+ except SchedulerMux.Error as e:
+ # Previously a bare expression: the membership check was evaluated and discarded.
+ assert 'Failed to complete operation' in str(e)
+ else:
+ self.fail('Expected SchedulerMux.Error')
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_task_util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_task_util.py b/src/test/python/apache/aurora/client/api/test_task_util.py
new file mode 100644
index 0000000..582c708
--- /dev/null
+++ b/src/test/python/apache/aurora/client/api/test_task_util.py
@@ -0,0 +1,83 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from mock import Mock
+
+from apache.aurora.client.api.scheduler_mux import SchedulerMux
+from apache.aurora.client.api.task_util import StatusMuxHelper
+
+from gen.apache.aurora.api.ttypes import (
+ AssignedTask,
+ Response,
+ ResponseCode,
+ Result,
+ ScheduledTask,
+ ScheduleStatusResult,
+ TaskQuery
+)
+
+
+class TaskUtilTest(unittest.TestCase):
+ """Tests StatusMuxHelper.get_tasks with and without a scheduler mux."""
+ INSTANCES = [1]
+
+ @classmethod
+ def create_query(cls, instances):
+ query = TaskQuery()
+ query.instanceIds = set(instances)
+ return query
+
+ @classmethod
+ def create_mux_helper(cls, scheduler, query, scheduler_mux=None):
+ return StatusMuxHelper(scheduler, query, scheduler_mux=scheduler_mux)
+
+ @classmethod
+ def create_tasks(cls):
+ return [ScheduledTask(assignedTask=AssignedTask(instanceId=index)) for index in cls.INSTANCES]
+
+ @classmethod
+ def mock_mux(cls, tasks):
+ mux = Mock(spec=SchedulerMux)
+ mux.enqueue_and_wait.return_value = tasks
+ return mux
+
+ @classmethod
+ def mock_scheduler(cls, response_code=None):
+ scheduler = Mock()
+ response_code = ResponseCode.OK if response_code is None else response_code
+ resp = Response(responseCode=response_code, messageDEPRECATED='test')
+ resp.result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=cls.create_tasks()))
+ scheduler.getTasksWithoutConfigs.return_value = resp
+ return scheduler
+
+ def test_no_mux_run(self):
+ """Without a mux, get_tasks queries the scheduler directly with the built query."""
+ scheduler = self.mock_scheduler()
+ # The helper is given the query factory (create_query), not a TaskQuery instance.
+ helper = self.create_mux_helper(scheduler, self.create_query)
+ tasks = helper.get_tasks(self.INSTANCES)
+
+ scheduler.getTasksWithoutConfigs.assert_called_once_with(self.create_query(self.INSTANCES))
+ assert 1 == len(tasks)
+
+ def test_mux_run(self):
+ """With a mux, get_tasks delegates to enqueue_and_wait with the helper's callbacks."""
+ expected_tasks = self.create_tasks()
+ mux = self.mock_mux(expected_tasks)
+ helper = self.create_mux_helper(None, self.create_query, scheduler_mux=mux)
+ tasks = helper.get_tasks(self.INSTANCES)
+
+ mux.enqueue_and_wait.assert_called_once_with(
+ helper._get_tasks,
+ self.INSTANCES,
+ helper._create_aggregated_query)
+ assert 1 == len(tasks)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/api/test_updater.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_updater.py b/src/test/python/apache/aurora/client/api/test_updater.py
index 48f82c8..7020712 100644
--- a/src/test/python/apache/aurora/client/api/test_updater.py
+++ b/src/test/python/apache/aurora/client/api/test_updater.py
@@ -22,6 +22,7 @@ from pytest import raises
from apache.aurora.client.api.instance_watcher import InstanceWatcher
from apache.aurora.client.api.job_monitor import JobMonitor
from apache.aurora.client.api.quota_check import CapacityRequest, QuotaCheck
+from apache.aurora.client.api.scheduler_mux import SchedulerMux
from apache.aurora.client.api.updater import Updater
from apache.aurora.client.fake_scheduler_proxy import FakeSchedulerProxy
from apache.aurora.common.aurora_job_key import AuroraJobKey
@@ -38,6 +39,7 @@ from gen.apache.aurora.api.ttypes import (
JobConfiguration,
JobKey,
LimitConstraint,
+ Lock,
LockKey,
LockValidation,
Metadata,
@@ -102,9 +104,25 @@ class FakeConfig(object):
return self.job_config.instanceCount
class FakeSchedulerMux(object):
  """Synchronous stand-in for SchedulerMux used by the updater tests.

  Each enqueued command is executed immediately on the caller's thread with a
  one-element batch; nothing is queued or aggregated.
  """

  def __init__(self):
    # When True, enqueue_and_wait() raises after running the command.
    self._raise_error = False

  def enqueue_and_wait(self, command, data, timeout=None):
    """Run ``command`` on the single-item batch ``[data]``.

    ``timeout`` is accepted for interface compatibility and ignored.

    Raises:
      SchedulerMux.Error: after the command has run, if raise_error() was called.
    """
    single_batch = [data]
    command(single_batch)
    if not self._raise_error:
      return
    raise SchedulerMux.Error("expected")

  def terminate(self):
    """No-op: this fake holds nothing that needs shutting down."""

  def raise_error(self):
    """Make every subsequent enqueue_and_wait() call raise SchedulerMux.Error."""
    self._raise_error = True
+
+
class UpdaterTest(TestCase):
UPDATE_CONFIG = {
- 'batch_size': 3,
+ 'batch_size': 1,
'restart_threshold': 50,
'watch_secs': 50,
'max_per_shard_failures': 0,
@@ -112,6 +130,7 @@ class UpdaterTest(TestCase):
'rollback_on_failure': True,
}
+
def setUp(self):
self._role = 'mesos'
self._name = 'jimbob'
@@ -121,6 +140,7 @@ class UpdaterTest(TestCase):
self._lock = 'test_lock'
self._instance_watcher = MockObject(InstanceWatcher)
self._job_monitor = MockObject(JobMonitor)
+ self._scheduler_mux = FakeSchedulerMux()
self._scheduler = MockObject(scheduler_client)
self._scheduler_proxy = FakeSchedulerProxy('test-cluster', self._scheduler, self._session_key)
self._quota_check = MockObject(QuotaCheck)
@@ -149,10 +169,17 @@ class UpdaterTest(TestCase):
self._scheduler_proxy,
self._instance_watcher,
self._quota_check,
- self._job_monitor)
+ self._job_monitor,
+ self._scheduler_mux)
+
def expect_terminate(self):
  """Record terminate() on the job monitor, then on the instance watcher.

  Order matters: the calls are recorded in this exact sequence.
  """
  for component in (self._job_monitor, self._instance_watcher):
    component.terminate()
def expect_watch_instances(self, instance_ids, failed_instances=None):
  """Record one InstanceWatcher.watch() expectation per instance.

  Every expectation is for a single-instance list. It returns a set holding
  that instance when the instance appears in ``failed_instances``, and an
  empty set otherwise.

  Args:
    instance_ids: instances to expect watch() calls for, in order.
    failed_instances: instances whose watch() reports failure; none if omitted.
      (None instead of a mutable ``[]`` default.)
  """
  failed_set = set(failed_instances or ())
  for i in instance_ids:
    failed = {i} if i in failed_set else set()
    # Watch only this instance. Passing the whole input list here would record
    # len(instance_ids) identical multi-instance watches instead of one per instance.
    self._instance_watcher.watch([i]).AndReturn(failed)
def expect_populate(self, job_config, response_code=None):
response_code = ResponseCode.OK if response_code is None else response_code
@@ -182,38 +209,60 @@ class UpdaterTest(TestCase):
self._scheduler.replaceCronTemplate(job_config, self._lock, self._session_key).AndReturn(resp)
def expect_restart(self, instance_ids, response_code=None):
  """Record one restartShards() expectation per instance.

  Args:
    instance_ids: instances to restart; each gets its own single-element call.
    response_code: ResponseCode returned by every call; OK when None.
  """
  code = ResponseCode.OK if response_code is None else response_code
  for instance_id in instance_ids:
    reply = Response(responseCode=code, messageDEPRECATED='test')
    self._scheduler.restartShards(
        self._job_key,
        [instance_id],
        self._lock,
        self._session_key).AndReturn(reply)
+
def expect_kill(self, instance_ids, response_code=None, monitor_result=True, skip_monitor=False):
  """Record per-instance killTasks() expectations, each followed by its JobMonitor wait.

  Args:
    instance_ids: instances to kill; one killTasks() call is expected per instance.
    response_code: ResponseCode returned by every killTasks() call; OK when None.
    monitor_result: value returned by each JobMonitor.wait_until() expectation.
    skip_monitor: when True, no JobMonitor.wait_until() call is expected.
  """
  # Resolve the default once; it does not change between iterations.
  response_code = ResponseCode.OK if response_code is None else response_code
  for i in instance_ids:
    response = Response(responseCode=response_code, messageDEPRECATED='test')
    query = TaskQuery(
        owner=Identity(role=self._job_key.role),
        environment=self._job_key.environment,
        jobName=self._job_key.name,
        statuses=ACTIVE_STATES,
        instanceIds=frozenset([int(i)]))
    self._scheduler.killTasks(query, self._lock, self._session_key).AndReturn(response)
    # Pair each single-instance kill with a wait on that same instance. The old
    # code recorded one combined wait after all kills, which does not match the
    # per-instance flow expect_kill_instances() records. expect_job_monitor()
    # itself skips non-OK responses and skip_monitor.
    self.expect_job_monitor(response_code, [i], monitor_result, skip_monitor)
+
def expect_job_monitor(self, response_code, instance_ids, monitor_result=True, skip=False):
  """Record a JobMonitor.wait_until(terminal) expectation unless it should be skipped.

  No expectation is recorded when ``skip`` is set or ``response_code`` is not
  ResponseCode.OK. Otherwise the wait returns ``monitor_result``.
  """
  wait_expected = not skip and response_code == ResponseCode.OK
  if wait_expected:
    self._job_monitor.wait_until(
        JobMonitor.terminal, instance_ids, with_timeout=True).AndReturn(monitor_result)
def expect_add(self, instance_ids, task_config, response_code=None):
  """Record one addInstances() expectation per instance.

  Args:
    instance_ids: instances to add; each gets an AddInstancesConfig for that id only.
    task_config: task configuration placed in every AddInstancesConfig.
    response_code: ResponseCode returned by every call; OK when None.
  """
  code = ResponseCode.OK if response_code is None else response_code
  for instance_id in instance_ids:
    reply = Response(responseCode=code, messageDEPRECATED='test')
    config = AddInstancesConfig(
        key=self._job_key,
        taskConfig=task_config,
        instanceIds=frozenset([int(instance_id)]))
    self._scheduler.addInstances(config, self._lock, self._session_key).AndReturn(reply)
+
def expect_update_instances(self, instance_ids, task_config):
  """For each instance in turn, record kill, then add with task_config, then watch."""
  for instance_id in instance_ids:
    self.expect_kill([instance_id])
    self.expect_add([instance_id], task_config)
    self.expect_watch_instances([instance_id])
+
def expect_add_instances(self, instance_ids, task_config):
  """For each instance in turn, record an add with task_config followed by a watch."""
  for instance_id in instance_ids:
    self.expect_add([instance_id], task_config)
    self.expect_watch_instances([instance_id])
+
def expect_kill_instances(self, instance_ids):
  """Record a separate single-instance kill for each instance, in the given order."""
  for instance_id in instance_ids:
    self.expect_kill([instance_id])
def expect_start(self, response_code=None):
response_code = ResponseCode.OK if response_code is None else response_code
@@ -288,10 +337,7 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(0, 4)
- self.expect_add([3, 4, 5], new_config)
- self.expect_watch_instances([3, 4, 5])
- self.expect_add([6], new_config)
- self.expect_watch_instances([6])
+ self.expect_add_instances([3, 4, 5, 6], new_config)
self.expect_finish()
self.replay_mocks()
@@ -343,8 +389,7 @@ class UpdaterTest(TestCase):
self.expect_populate(job_config)
self.expect_quota_check(1, 0, prod=False)
self.expect_kill([0])
- self.expect_add([0, 1, 2], new_config)
- self.expect_watch_instances([0, 1, 2])
+ self.expect_add_instances([0, 1, 2], new_config)
self.expect_finish()
self.replay_mocks()
@@ -361,9 +406,7 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(7, 0)
- self.expect_kill([3, 4, 5])
- self.expect_kill([6, 7, 8])
- self.expect_kill([9])
+ self.expect_kill_instances([3, 4, 5, 6, 7, 8, 9])
self.expect_finish()
self.replay_mocks()
@@ -381,13 +424,8 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(3, 7)
- self.expect_kill([0, 1, 2])
- self.expect_add([0, 1, 2], new_config)
- self.expect_watch_instances([0, 1, 2])
- self.expect_add([3, 4, 5], new_config)
- self.expect_watch_instances([3, 4, 5])
- self.expect_add([6], new_config)
- self.expect_watch_instances([6])
+ self.expect_update_instances([0, 1, 2], new_config)
+ self.expect_add_instances([3, 4, 5, 6], new_config)
self.expect_finish()
self.replay_mocks()
@@ -405,12 +443,8 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(10, 1)
- self.expect_kill([0, 1, 2])
- self.expect_add([0], new_config)
- self.expect_watch_instances([0])
- self.expect_kill([3, 4, 5])
- self.expect_kill([6, 7, 8])
- self.expect_kill([9])
+ self.expect_update_instances([0], new_config)
+ self.expect_kill_instances([1, 2, 3, 4, 5, 6, 7, 8, 9])
self.expect_finish()
self.replay_mocks()
@@ -428,12 +462,7 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(5, 5)
- self.expect_kill([0, 1, 2])
- self.expect_add([0, 1, 2], new_config)
- self.expect_watch_instances([0, 1, 2])
- self.expect_kill([3, 4])
- self.expect_add([3, 4], new_config)
- self.expect_watch_instances([3, 4])
+ self.expect_update_instances([0, 1, 2, 3, 4], new_config)
self.expect_finish()
self.replay_mocks()
@@ -450,8 +479,7 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(0, 2)
- self.expect_add([3, 4], new_config)
- self.expect_watch_instances([3, 4])
+ self.expect_add_instances([3, 4], new_config)
self.expect_finish()
self.replay_mocks()
@@ -468,8 +496,7 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(6, 0)
- self.expect_kill([4, 5, 6])
- self.expect_kill([7, 8, 9])
+ self.expect_kill_instances([4, 5, 6, 7, 8, 9])
self.expect_finish()
self.replay_mocks()
@@ -487,15 +514,14 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(3, 3)
- self.expect_kill([2, 3, 4])
- self.expect_add([2, 3, 4], new_config)
- self.expect_watch_instances([2, 3, 4])
+ self.expect_update_instances([2, 3, 4], new_config)
self.expect_finish()
self.replay_mocks()
self.update_and_expect_ok(instances=[2, 3, 4])
self.verify_mocks()
+
def test_patch_hole_with_instance_option(self):
"""Patching an instance ID gap created by a terminated update."""
old_configs = self.make_task_configs(8)
@@ -506,8 +532,7 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs, [2, 3])
self.expect_populate(job_config)
self.expect_quota_check(0, 2)
- self.expect_add([2, 3], new_config)
- self.expect_watch_instances([2, 3])
+ self.expect_add_instances([2, 3], new_config)
self.expect_finish()
self.replay_mocks()
@@ -533,7 +558,7 @@ class UpdaterTest(TestCase):
def test_update_rollback(self):
"""Update process failures exceed total allowable count and update is rolled back."""
update_config = self.UPDATE_CONFIG.copy()
- update_config.update(max_total_failures=2, max_per_shard_failures=1)
+ update_config.update(max_per_shard_failures=1)
self.init_updater(update_config)
old_configs = self.make_task_configs(10)
@@ -545,55 +570,13 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(10, 10)
- self.expect_kill([0, 1, 2])
- self.expect_add([0, 1, 2], new_config)
- self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
- self.expect_restart([0, 1, 2])
- self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
- self.expect_kill([2, 1, 0])
- self.expect_add([2, 1, 0], old_configs[0])
- self.expect_watch_instances([2, 1, 0])
- self.expect_finish()
- self.replay_mocks()
-
- self.update_and_expect_response(ResponseCode.ERROR)
- self.verify_mocks()
-
- def test_update_rollback_sorted(self):
- """Rolling back with a batch of 1 should still be correctly sorted in reverse"""
- update_config = self.UPDATE_CONFIG.copy()
- update_config.update(max_total_failures=0, max_per_shard_failures=1, batch_size=1)
- self.init_updater(update_config)
-
- old_configs = self.make_task_configs(5)
- new_config = deepcopy(old_configs[0])
- new_config.priority = 5
- job_config = self.make_job_config(new_config, 5)
- self._config.job_config = job_config
- self.expect_start()
- self.expect_get_tasks(old_configs)
- self.expect_populate(job_config)
- self.expect_quota_check(5, 5)
- self.expect_kill([0])
- self.expect_add([0], new_config)
- self.expect_watch_instances([0])
- self.expect_kill([1])
- self.expect_add([1], new_config)
- self.expect_watch_instances([1])
+ self.expect_update_instances([0, 1], new_config)
self.expect_kill([2])
self.expect_add([2], new_config)
self.expect_watch_instances([2], failed_instances=[2])
self.expect_restart([2])
self.expect_watch_instances([2], failed_instances=[2])
- self.expect_kill([2])
- self.expect_add([2], old_configs[0])
- self.expect_watch_instances([2])
- self.expect_kill([1])
- self.expect_add([1], old_configs[0])
- self.expect_watch_instances([1])
- self.expect_kill([0])
- self.expect_add([0], old_configs[0])
- self.expect_watch_instances([0])
+ self.expect_update_instances([2, 1, 0], old_configs[0])
self.expect_finish()
self.replay_mocks()
@@ -615,14 +598,13 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(6, 6)
- self.expect_kill([0, 1, 2])
- self.expect_add([0, 1, 2], new_config)
- self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
- self.expect_restart([0, 1, 2])
- self.expect_watch_instances([0, 1, 2])
- self.expect_kill([3, 4, 5])
- self.expect_add([3, 4, 5], new_config)
- self.expect_watch_instances([3, 4, 5])
+ self.expect_update_instances([0, 1], new_config)
+ self.expect_kill([2])
+ self.expect_add([2], new_config)
+ self.expect_watch_instances([2], failed_instances=[2])
+ self.expect_restart([2])
+ self.expect_watch_instances([2])
+ self.expect_update_instances([3, 4, 5], new_config)
self.expect_finish()
self.replay_mocks()
@@ -695,7 +677,9 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(5, 5)
- self.expect_kill([0, 1, 2], response_code=ResponseCode.INVALID_REQUEST)
+ self._scheduler_mux.raise_error()
+ self.expect_kill([0], skip_monitor=True)
+ self.expect_terminate()
self.replay_mocks()
self.update_and_expect_response(ResponseCode.ERROR)
@@ -712,7 +696,8 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(5, 5)
- self.expect_kill([0, 1, 2], monitor_result=False)
+ self.expect_kill([0], monitor_result=False)
+ self.expect_terminate()
self.replay_mocks()
self.update_and_expect_response(ResponseCode.ERROR)
@@ -763,50 +748,15 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(10, 10)
- self.expect_kill([0, 1, 2])
- self.expect_add([0, 1, 2], new_config)
- self.expect_watch_instances([0, 1, 2], failed_instances=[0])
- self.expect_restart([0])
- self.expect_kill([3, 4])
- self.expect_add([3, 4], new_config)
- self.expect_watch_instances([0, 3, 4], failed_instances=[0])
- self.expect_restart([0])
- self.expect_kill([5, 6])
- self.expect_add([5, 6], new_config)
- self.expect_watch_instances([0, 5, 6], failed_instances=[0])
- self.expect_kill([7, 8, 9])
- self.expect_add([7, 8, 9], new_config)
- self.expect_watch_instances([7, 8, 9])
- self.expect_finish()
- self.replay_mocks()
-
- self.update_and_expect_ok()
- self.verify_mocks()
-
- def test_failed_unretryable_do_not_cause_rollback(self):
- """Update process still succeeds if failed instances in last batch are within allowed limit."""
- update_config = self.UPDATE_CONFIG.copy()
- update_config.update(max_total_failures=1, max_per_shard_failures=2)
- self.init_updater(update_config)
-
- old_configs = self.make_task_configs(5)
- new_config = deepcopy(old_configs[0])
- new_config.priority = 5
- job_config = self.make_job_config(new_config, 5)
- self._config.job_config = job_config
- self.expect_start()
- self.expect_get_tasks(old_configs)
- self.expect_populate(job_config)
- self.expect_quota_check(5, 5)
- self.expect_kill([0, 1, 2])
- self.expect_add([0, 1, 2], new_config)
- self.expect_watch_instances([0, 1, 2], failed_instances=[0])
- self.expect_restart([0])
- self.expect_kill([3, 4])
- self.expect_add([3, 4], new_config)
- self.expect_watch_instances([0, 3, 4], failed_instances=[0])
- self.expect_restart([0])
- self.expect_watch_instances([0], failed_instances=[0])
+ self.expect_update_instances([0, 1], new_config)
+ self.expect_kill([2])
+ self.expect_add([2], new_config)
+ self.expect_watch_instances([2], failed_instances=[2])
+ self.expect_restart([2])
+ self.expect_watch_instances([2], failed_instances=[2])
+ self.expect_restart([2])
+ self.expect_watch_instances([2], failed_instances=[2])
+ self.expect_update_instances([3, 4, 5, 6, 7, 8, 9], new_config)
self.expect_finish()
self.replay_mocks()
@@ -839,7 +789,7 @@ class UpdaterTest(TestCase):
def test_update_no_rollback(self):
"""Update process failures exceed total allowable count and update is not rolled back."""
update_config = self.UPDATE_CONFIG.copy()
- update_config.update(max_total_failures=2, max_per_shard_failures=1, rollback_on_failure=False)
+ update_config.update(max_total_failures=1, max_per_shard_failures=1, rollback_on_failure=False)
self.init_updater(update_config)
old_configs = self.make_task_configs(10)
@@ -851,11 +801,16 @@ class UpdaterTest(TestCase):
self.expect_get_tasks(old_configs)
self.expect_populate(job_config)
self.expect_quota_check(10, 10)
- self.expect_kill([0, 1, 2])
- self.expect_add([0, 1, 2], new_config)
- self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
- self.expect_restart([0, 1, 2])
- self.expect_watch_instances([0, 1, 2], failed_instances=[0, 1, 2])
+ self.expect_kill([0])
+ self.expect_add([0], new_config)
+ self.expect_watch_instances([0], failed_instances=[0])
+ self.expect_restart([0])
+ self.expect_watch_instances([0], failed_instances=[0])
+ self.expect_kill([1])
+ self.expect_add([1], new_config)
+ self.expect_watch_instances([1], failed_instances=[1])
+ self.expect_restart([1])
+ self.expect_watch_instances([1], failed_instances=[1])
self.expect_finish()
self.replay_mocks()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e1c0ade2/src/test/python/apache/aurora/client/cli/test_create.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_create.py b/src/test/python/apache/aurora/client/cli/test_create.py
index af548ae..ca635bd 100644
--- a/src/test/python/apache/aurora/client/cli/test_create.py
+++ b/src/test/python/apache/aurora/client/cli/test_create.py
@@ -95,7 +95,9 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# object, and everything can be stubbed through that.
mock_context = FakeAuroraCommandContext()
with contextlib.nested(
- patch('time.sleep'),
+ # TODO(maxim): Patching threading.Event with all possible namespace/patch/mock
+ # combinations did not produce the desired effect. Investigate why (AURORA-510)
+ patch('threading._Event.wait'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
# After making the client, create sets up a job monitor.
# The monitor uses TaskQuery to get the tasks. It's called at least twice:once before
@@ -127,7 +129,7 @@ class TestClientCreateCommand(AuroraClientCommandTest):
"""
mock_context = FakeAuroraCommandContext()
with contextlib.nested(
- patch('time.sleep'),
+ patch('threading._Event.wait'),
patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
mock_query = self.create_mock_query()
for result in [ScheduleStatus.PENDING, ScheduleStatus.PENDING, ScheduleStatus.RUNNING]: