You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/08/15 02:56:20 UTC
git commit: Implementing client job lock and start update APIs.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 040e71ef9 -> 8d5a62cfd
Implementing client job lock and start update APIs.
Bugs closed: AURORA-615
Reviewed at https://reviews.apache.org/r/24702/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/8d5a62cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/8d5a62cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/8d5a62cf
Branch: refs/heads/master
Commit: 8d5a62cfd063e437c3d117c6acb7d3eb431b3b62
Parents: 040e71e
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Aug 14 17:55:49 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Aug 14 17:55:49 2014 -0700
----------------------------------------------------------------------
.../python/apache/aurora/client/api/__init__.py | 65 +++++++-
.../apache/aurora/client/api/updater_util.py | 42 +++++
src/test/python/apache/aurora/client/api/BUILD | 76 ++++++----
.../python/apache/aurora/client/api/test_api.py | 152 +++++++++++++++++++
.../aurora/client/api/test_updater_util.py | 44 ++++++
5 files changed, 347 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
index 371137e..62de93b 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -24,9 +24,20 @@ from .restarter import Restarter
from .scheduler_client import SchedulerProxy
from .sla import Sla
from .updater import Updater
+from .updater_util import UpdaterConfig
from gen.apache.aurora.api.constants import LIVE_STATES
-from gen.apache.aurora.api.ttypes import Identity, ResourceAggregate, ResponseCode, TaskQuery
+from gen.apache.aurora.api.ttypes import (
+ Identity,
+ JobKey,
+ JobUpdateRequest,
+ Lock,
+ LockKey,
+ LockValidation,
+ ResourceAggregate,
+ ResponseCode,
+ TaskQuery
+)
class AuroraClientAPI(object):
@@ -36,6 +47,7 @@ class AuroraClientAPI(object):
class TypeError(Error, TypeError): pass
class ClusterMismatch(Error, ValueError): pass
class ThriftInternalError(Error): pass
+ class UpdateConfigError(Error): pass
def __init__(self, cluster, verbose=False, session_key_factory=make_session_key):
if not isinstance(cluster, Cluster):
@@ -84,9 +96,7 @@ class AuroraClientAPI(object):
def kill_job(self, job_key, instances=None, lock=None):
log.info("Killing tasks for job: %s" % job_key)
- if not isinstance(job_key, AuroraJobKey):
- raise TypeError('Expected type of job_key %r to be %s but got %s instead'
- % (job_key, AuroraJobKey.__name__, job_key.__class__.__name__))
+ self._assert_valid_job_key(job_key)
# Leave query.owner.user unset so the query doesn't filter jobs only submitted by a particular
# user.
@@ -134,6 +144,48 @@ class AuroraClientAPI(object):
return updater.update(instances)
+ def acquire_job_lock(self, job_key):
+ """Acquires an exclusive job lock preventing any simultaneous mutations (update, kill, etc.)
+
+ Arguments:
+ job_key - AuroraJobKey instance.
+
+ Returns the scheduler proxy's acquireLock response unchanged.
+ Raises self.TypeError if job_key is not an AuroraJobKey instance.
+ """
+ # Validate first so a malformed key never reaches the scheduler call.
+ self._assert_valid_job_key(job_key)
+ return self._scheduler_proxy.acquireLock(LockKey(job=job_key.to_thrift()))
+
+ def release_job_lock(self, lock):
+ """Releases the previously-acquired job lock.
+
+ Requires valid Lock instance. See cancel_update if release without lock handle is required.
+
+ Arguments:
+ lock - Previously acquired Lock instance.
+
+ Returns the scheduler proxy's releaseLock response unchanged.
+ Raises self.TypeError if lock is not a Lock instance.
+ """
+ self._assert_valid_lock(lock)
+ # NOTE(review): CHECKED presumably asks the scheduler to verify the supplied lock before
+ # releasing it -- confirm against the scheduler API.
+ return self._scheduler_proxy.releaseLock(lock, LockValidation.CHECKED)
+
+ def start_job_update(self, config, lock, instances=None):
+ """Requests Scheduler to start job update process.
+
+ Arguments:
+ config - AuroraConfig instance with update details.
+ lock - Job Lock instance to ensure exclusive job mutation access.
+ instances - Optional list of instances to restrict update to.
+
+ Returns the scheduler proxy's startJobUpdate response unchanged.
+ Raises self.UpdateConfigError if building the update settings raises ValueError.
+ """
+ try:
+ settings = UpdaterConfig(**config.update_config().get()).to_thrift_update_settings(instances)
+ except ValueError as e:
+ # Re-raise as the API's own error type, keeping the original message text.
+ raise self.UpdateConfigError(str(e))
+
+ request = JobUpdateRequest(
+ jobKey=JobKey(role=config.role(), environment=config.environment(), name=config.name()),
+ instanceCount=config.instances(),
+ settings=settings,
+ taskConfig=config.job().taskConfig
+ )
+
+ # NOTE(review): unlike release_job_lock, lock is forwarded here without _assert_valid_lock --
+ # confirm whether that is intentional.
+ return self._scheduler_proxy.startJobUpdate(request, lock)
+
def cancel_update(self, job_key):
"""Cancel the update represented by job_key. Returns whether or not the cancellation was
successful."""
@@ -227,6 +279,11 @@ class AuroraClientAPI(object):
min_instance_count,
hosts)
+  def _assert_valid_lock(self, lock):
+    """Raises self.TypeError unless lock is a Lock instance.
+
+    Arguments:
+    lock - object expected to be a thrift Lock.
+    """
+    if not isinstance(lock, Lock):
+      # The expected type named in the message must be Lock; it previously said AuroraJobKey.
+      raise self.TypeError('Invalid lock %r: expected %s but got %s'
+          % (lock, Lock.__name__, lock.__class__.__name__))
+
def _assert_valid_job_key(self, job_key):
if not isinstance(job_key, AuroraJobKey):
raise self.TypeError('Invalid job_key %r: expected %s but got %s'
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/main/python/apache/aurora/client/api/updater_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py
index cbb93e4..178c2fb 100644
--- a/src/main/python/apache/aurora/client/api/updater_util.py
+++ b/src/main/python/apache/aurora/client/api/updater_util.py
@@ -13,9 +13,13 @@
#
import collections
+from itertools import groupby
+from operator import itemgetter
from twitter.common import log
+from gen.apache.aurora.api.ttypes import JobUpdateSettings, Range
+
class UpdaterConfig(object):
"""
@@ -54,6 +58,44 @@ class UpdaterConfig(object):
self.rollback_on_failure = rollback_on_failure
self.wait_for_batch_completion = wait_for_batch_completion
+  @classmethod
+  def instances_to_ranges(cls, instances):
+    """Groups instance IDs into a set of contiguous integer ranges.
+
+    Every Range(first, last) represents a closed span of instance IDs. For example,
+    instances=[0,1,2,5,8,9] would result in the following set of ranges:
+    (Range(first=0, last=2), Range(first=5, last=5), Range(first=8, last=9))
+
+    Algorithm: http://stackoverflow.com/questions/3149440
+
+    Arguments:
+    instances - sorted list of instance IDs.
+
+    Returns a set of Range objects, or None when instances is None or empty.
+    """
+    if not instances:
+      return None
+
+    ranges = set()
+    # Within a contiguous run, (ID - position) stays constant, so grouping on that difference
+    # splits the sorted list into runs. The key takes the whole (position, ID) pair because
+    # tuple-unpacking lambda parameters are Python 2-only syntax.
+    for _, group in groupby(enumerate(instances), lambda pair: pair[1] - pair[0]):
+      # Build a real list: the result of map() cannot be indexed on Python 3.
+      range_seq = [instance_id for _, instance_id in group]
+      ranges.add(Range(first=range_seq[0], last=range_seq[-1]))
+    return ranges
+
+ def to_thrift_update_settings(self, instances=None):
+ """Converts UpdaterConfig into thrift JobUpdateSettings object.
+
+ Arguments:
+ instances - optional list of instances to update.
+
+ Returns a JobUpdateSettings populated from this config's fields.
+ """
+ # NOTE(review): restart_threshold and watch_secs are copied unchanged into fields whose names
+ # end in "Ms". If the config values are in seconds (as the name watch_secs suggests) they
+ # need converting to milliseconds -- confirm the units.
+ # NOTE(review): wait_for_batch_completion is not passed to JobUpdateSettings here.
+ return JobUpdateSettings(
+ updateGroupSize=self.batch_size,
+ maxPerInstanceFailures=self.max_per_instance_failures,
+ maxFailedInstances=self.max_total_failures,
+ maxWaitToInstanceRunningMs=self.restart_threshold,
+ minWaitInInstanceRunningMs=self.watch_secs,
+ rollbackOnFailure=self.rollback_on_failure,
+ # instances_to_ranges already returns None for empty input, so this guard is redundant.
+ updateOnlyTheseInstances=self.instances_to_ranges(instances) if instances else None)
+
class FailureThreshold(object):
def __init__(self, max_per_instance_failures, max_total_failures):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/test/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/BUILD b/src/test/python/apache/aurora/client/api/BUILD
index db5c223..b4f08c6 100644
--- a/src/test/python/apache/aurora/client/api/BUILD
+++ b/src/test/python/apache/aurora/client/api/BUILD
@@ -14,15 +14,27 @@
python_test_suite(name = 'all',
dependencies = [
+ pants(':api'),
pants(':disambiguator'),
+ pants(':instance_watcher'),
pants(':job_monitor'),
+ pants(':mux'),
+ pants(':quota_check'),
pants(':restarter'),
pants(':scheduler_client'),
- pants(':instance_watcher'),
- pants(':updater'),
- pants(':quota_check'),
pants(':sla'),
- pants(':mux')
+ pants(':updater'),
+ pants(':updater_util')
+ ],
+)
+
+python_tests(name = 'api',
+ sources = ['test_api.py'],
+ dependencies = [
+ pants('3rdparty/python:mock'),
+ pants('src/main/python/apache/aurora/client/api:api'),
+ pants('src/main/python/apache/aurora/client:config'),
+ pants('src/main/thrift/org/apache/aurora/gen:py-thrift')
],
)
@@ -47,41 +59,48 @@ python_tests(name = 'job_monitor',
],
)
-python_tests(name = 'restarter',
- sources = ['test_restarter.py'],
+python_tests(name = 'instance_watcher',
+ sources = ['test_instance_watcher.py', 'test_health_check.py'],
dependencies = [
pants('3rdparty/python:mox'),
- pants('src/main/python/apache/aurora/client/api:restarter'),
- pants('src/main/python/apache/aurora/common:aurora_job_key'),
+ pants('src/main/python/apache/aurora/client/api:instance_watcher'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
- pants('src/test/python/apache/aurora/client:fake_scheduler_proxy'),
- ],
+ ]
)
-python_tests(name = 'scheduler_client',
- sources = ['test_scheduler_client.py'],
+python_tests(name = 'mux',
+ sources = ['test_scheduler_mux.py'],
+ dependencies = [
+ pants('src/main/python/apache/aurora/client/api:scheduler_mux'),
+ ]
+)
+
+python_tests(name = 'quota_check',
+ sources = ['test_quota_check.py'],
dependencies = [
pants('3rdparty/python:mock'),
- pants('3rdparty/python:mox'),
- pants('src/main/python/apache/aurora/client/api:scheduler_client'),
+ pants('src/main/python/apache/aurora/client/api:quota_check'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
]
)
-python_tests(name = 'instance_watcher',
- sources = ['test_instance_watcher.py', 'test_health_check.py'],
+python_tests(name = 'restarter',
+ sources = ['test_restarter.py'],
dependencies = [
pants('3rdparty/python:mox'),
- pants('src/main/python/apache/aurora/client/api:instance_watcher'),
+ pants('src/main/python/apache/aurora/client/api:restarter'),
+ pants('src/main/python/apache/aurora/common:aurora_job_key'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
- ]
+ pants('src/test/python/apache/aurora/client:fake_scheduler_proxy'),
+ ],
)
-python_tests(name = 'quota_check',
- sources = ['test_quota_check.py'],
+python_tests(name = 'scheduler_client',
+ sources = ['test_scheduler_client.py'],
dependencies = [
pants('3rdparty/python:mock'),
- pants('src/main/python/apache/aurora/client/api:quota_check'),
+ pants('3rdparty/python:mox'),
+ pants('src/main/python/apache/aurora/client/api:scheduler_client'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
]
)
@@ -95,13 +114,6 @@ python_tests(name = 'sla',
]
)
-python_tests(name = 'mux',
- sources = ['test_scheduler_mux.py'],
- dependencies = [
- pants('src/main/python/apache/aurora/client/api:scheduler_mux'),
- ]
-)
-
python_tests(name = 'task_util',
sources = ['test_task_util.py'],
dependencies = [
@@ -121,3 +133,11 @@ python_tests(name = 'updater',
pants('src/test/python/apache/aurora/client:fake_scheduler_proxy'),
]
)
+
+python_tests(name = 'updater_util',
+ sources = ['test_updater_util.py'],
+ dependencies = [
+ pants('src/main/python/apache/aurora/client/api:api'),
+ pants('src/main/thrift/org/apache/aurora/gen:py-thrift')
+ ],
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/test/python/apache/aurora/client/api/test_api.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_api.py b/src/test/python/apache/aurora/client/api/test_api.py
new file mode 100644
index 0000000..96db25d
--- /dev/null
+++ b/src/test/python/apache/aurora/client/api/test_api.py
@@ -0,0 +1,152 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from mock import Mock
+
+from apache.aurora.client.api import AuroraClientAPI
+from apache.aurora.common.aurora_job_key import AuroraJobKey
+from apache.aurora.common.cluster import Cluster
+from apache.aurora.config import AuroraConfig
+
+from gen.apache.aurora.api.ttypes import (
+ JobKey,
+ JobUpdateRequest,
+ JobUpdateSettings,
+ Lock,
+ LockKey,
+ LockValidation,
+ Response,
+ ResponseCode,
+ Result,
+ TaskConfig
+)
+
+
+class TestJobUpdateApis(unittest.TestCase):
+ """Job update APIs tests.
+
+ Each test swaps the API's scheduler proxy for a Mock and checks the arguments of the
+ proxy call made by acquire_job_lock, release_job_lock or start_job_update.
+ """
+
+ # Keyword arguments handed to UpdaterConfig through the mocked config.update_config().get().
+ # The numeric values match those expected by create_update_settings below.
+ UPDATE_CONFIG = {
+ 'batch_size': 1,
+ 'restart_threshold': 50,
+ 'watch_secs': 50,
+ 'max_per_shard_failures': 2,
+ 'max_total_failures': 1,
+ 'rollback_on_failure': True,
+ 'wait_for_batch_completion': False,
+ }
+
+ @classmethod
+ def create_blank_response(cls, code, msg):
+ """Builds a Mock Response carrying the given response code and message."""
+ response = Mock(spec=Response)
+ response.responseCode = code
+ response.messageDEPRECATED = msg
+ response.result = Mock(spec=Result)
+ return response
+
+ @classmethod
+ def create_simple_success_response(cls):
+ """Builds a Mock Response with ResponseCode.OK."""
+ return cls.create_blank_response(ResponseCode.OK, 'OK')
+
+ @classmethod
+ def create_error_response(cls):
+ """Builds a Mock Response with ResponseCode.ERROR (not used by the tests in this class)."""
+ return cls.create_blank_response(ResponseCode.ERROR, 'ERROR')
+
+ @classmethod
+ def mock_api(cls):
+ """Returns (api, mock_proxy): an AuroraClientAPI whose _scheduler_proxy is a Mock."""
+ api = AuroraClientAPI(Cluster(name="foo"))
+ mock_proxy = Mock()
+ api._scheduler_proxy = mock_proxy
+ return api, mock_proxy
+
+ @classmethod
+ def create_update_settings(cls):
+ """Builds the JobUpdateSettings expected for UPDATE_CONFIG with no instance filter."""
+ return JobUpdateSettings(
+ updateGroupSize=1,
+ maxPerInstanceFailures=2,
+ maxFailedInstances=1,
+ maxWaitToInstanceRunningMs=50,
+ minWaitInInstanceRunningMs=50,
+ rollbackOnFailure=True)
+
+ @classmethod
+ def create_update_request(cls, task_config):
+ """Builds the JobUpdateRequest expected for the config produced by mock_job_config."""
+ return JobUpdateRequest(
+ jobKey=JobKey(role="role", environment="env", name="name"),
+ instanceCount=5,
+ settings=cls.create_update_settings(),
+ taskConfig=task_config)
+
+ @classmethod
+ def mock_job_config(cls, error=None):
+ """Builds a Mock AuroraConfig; when error is given, update_config() raises it."""
+ config = Mock(spec=AuroraConfig)
+ mock_get = Mock()
+ mock_get.get.return_value = cls.UPDATE_CONFIG
+ if error:
+ config.update_config.side_effect = error
+ else:
+ config.update_config.return_value = mock_get
+ mock_task_config = Mock()
+ mock_task_config.taskConfig = TaskConfig()
+ config.job.return_value = mock_task_config
+ config.role.return_value = "role"
+ config.environment.return_value = "env"
+ config.name.return_value = "name"
+ config.instances.return_value = 5
+ return config
+
+ def test_acquire_lock(self):
+ """Test successful job lock creation."""
+ job_key = AuroraJobKey("foo", "role", "env", "name")
+ api, mock_proxy = self.mock_api()
+ mock_proxy.acquireLock.return_value = self.create_simple_success_response()
+ api.acquire_job_lock(job_key)
+ mock_proxy.acquireLock.assert_called_once_with(LockKey(job=job_key.to_thrift()))
+
+ def test_acquire_lock_fails_validation(self):
+ """Test acquire_job_lock fails with invalid job key."""
+ api, mock_proxy = self.mock_api()
+ self.assertRaises(AuroraClientAPI.TypeError, api.acquire_job_lock, "invalid job key")
+
+ def test_release_lock(self):
+ """Test successful lock release."""
+ lock = Lock()
+ api, mock_proxy = self.mock_api()
+ mock_proxy.releaseLock.return_value = self.create_simple_success_response()
+ api.release_job_lock(lock)
+ mock_proxy.releaseLock.assert_called_once_with(lock, LockValidation.CHECKED)
+
+ def test_release_lock_fails_validation(self):
+ """Test release_job_lock fails with invalid lock."""
+ api, mock_proxy = self.mock_api()
+ # NOTE(review): this asserts the base Error, while test_acquire_lock_fails_validation asserts
+ # AuroraClientAPI.TypeError for the analogous check -- consider narrowing for consistency.
+ self.assertRaises(AuroraClientAPI.Error, api.release_job_lock, "invalid lock")
+
+ def test_start_job_update(self):
+ """Test successful job update start."""
+ api, mock_proxy = self.mock_api()
+ lock = Lock()
+ task_config = TaskConfig()
+ mock_proxy.startJobUpdate.return_value = self.create_simple_success_response()
+
+ api.start_job_update(self.mock_job_config(), lock)
+ mock_proxy.startJobUpdate.assert_called_once_with(self.create_update_request(task_config), lock)
+
+ def test_start_job_update_fails_parse_update_config(self):
+ """Test start_job_update fails to parse invalid UpdateConfig."""
+ api, mock_proxy = self.mock_api()
+
+ # The mocked update_config() raises ValueError, which start_job_update is expected to
+ # surface as UpdateConfigError.
+ self.assertRaises(
+ AuroraClientAPI.UpdateConfigError,
+ api.start_job_update,
+ self.mock_job_config(error=ValueError()), None)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/test/python/apache/aurora/client/api/test_updater_util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_updater_util.py b/src/test/python/apache/aurora/client/api/test_updater_util.py
new file mode 100644
index 0000000..fe3ac49
--- /dev/null
+++ b/src/test/python/apache/aurora/client/api/test_updater_util.py
@@ -0,0 +1,44 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from apache.aurora.client.api import UpdaterConfig
+
+from gen.apache.aurora.api.ttypes import Range
+
+
+class TestRangeConversion(unittest.TestCase):
+ """Job instance ID to range conversion.
+
+ Exercises UpdaterConfig.instances_to_ranges with multi-run, single-element, None and
+ empty inputs.
+ """
+
+ def test_multiple_ranges(self):
+ """Test multiple ranges."""
+ # Results are compared by repr rather than as Range objects -- presumably to avoid relying
+ # on Range equality/hashing; confirm.
+ ranges = [repr(e) for e in UpdaterConfig.instances_to_ranges([1, 2, 3, 5, 7, 8])]
+ assert 3 == len(ranges), "Wrong number of ranges:%s" % len(ranges)
+ assert repr(Range(first=1, last=3)) in ranges, "Missing range [1,3]"
+ assert repr(Range(first=5, last=5)) in ranges, "Missing range [5,5]"
+ assert repr(Range(first=7, last=8)) in ranges, "Missing range [7,8]"
+
+ def test_one_element(self):
+ """Test one ID in the list."""
+ ranges = [repr(e) for e in UpdaterConfig.instances_to_ranges([1])]
+ assert 1 == len(ranges), "Wrong number of ranges:%s" % len(ranges)
+ assert repr(Range(first=1, last=1)) in ranges, "Missing range [1,1]"
+
+ def test_none_list(self):
+ """Test None list produces None result."""
+ assert UpdaterConfig.instances_to_ranges(None) is None, "Result must be None."
+
+ def test_empty_list(self):
+ """Test empty list produces None result."""
+ assert UpdaterConfig.instances_to_ranges([]) is None, "Result must be None."