You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/08/15 02:56:20 UTC

git commit: Implementing client job lock and start update APIs.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 040e71ef9 -> 8d5a62cfd


Implementing client job lock and start update APIs.

Bugs closed: AURORA-615

Reviewed at https://reviews.apache.org/r/24702/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/8d5a62cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/8d5a62cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/8d5a62cf

Branch: refs/heads/master
Commit: 8d5a62cfd063e437c3d117c6acb7d3eb431b3b62
Parents: 040e71e
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Aug 14 17:55:49 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Aug 14 17:55:49 2014 -0700

----------------------------------------------------------------------
 .../python/apache/aurora/client/api/__init__.py |  65 +++++++-
 .../apache/aurora/client/api/updater_util.py    |  42 +++++
 src/test/python/apache/aurora/client/api/BUILD  |  76 ++++++----
 .../python/apache/aurora/client/api/test_api.py | 152 +++++++++++++++++++
 .../aurora/client/api/test_updater_util.py      |  44 ++++++
 5 files changed, 347 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
index 371137e..62de93b 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -24,9 +24,20 @@ from .restarter import Restarter
 from .scheduler_client import SchedulerProxy
 from .sla import Sla
 from .updater import Updater
+from .updater_util import UpdaterConfig
 
 from gen.apache.aurora.api.constants import LIVE_STATES
-from gen.apache.aurora.api.ttypes import Identity, ResourceAggregate, ResponseCode, TaskQuery
+from gen.apache.aurora.api.ttypes import (
+    Identity,
+    JobKey,
+    JobUpdateRequest,
+    Lock,
+    LockKey,
+    LockValidation,
+    ResourceAggregate,
+    ResponseCode,
+    TaskQuery
+)
 
 
 class AuroraClientAPI(object):
@@ -36,6 +47,7 @@ class AuroraClientAPI(object):
   class TypeError(Error, TypeError): pass
   class ClusterMismatch(Error, ValueError): pass
   class ThriftInternalError(Error): pass
+  class UpdateConfigError(Error): pass
 
   def __init__(self, cluster, verbose=False, session_key_factory=make_session_key):
     if not isinstance(cluster, Cluster):
@@ -84,9 +96,7 @@ class AuroraClientAPI(object):
 
   def kill_job(self, job_key, instances=None, lock=None):
     log.info("Killing tasks for job: %s" % job_key)
-    if not isinstance(job_key, AuroraJobKey):
-      raise TypeError('Expected type of job_key %r to be %s but got %s instead'
-          % (job_key, AuroraJobKey.__name__, job_key.__class__.__name__))
+    self._assert_valid_job_key(job_key)
 
     # Leave query.owner.user unset so the query doesn't filter jobs only submitted by a particular
     # user.
@@ -134,6 +144,48 @@ class AuroraClientAPI(object):
 
     return updater.update(instances)
 
+  def acquire_job_lock(self, job_key):
+    """Acquires an exclusive job lock preventing any simultaneous mutations (update, kill, etc.).
+
+    Arguments:
+    job_key - AuroraJobKey instance.
+    """
+    self._assert_valid_job_key(job_key)
+    return self._scheduler_proxy.acquireLock(LockKey(job=job_key.to_thrift()))
+
+  def release_job_lock(self, lock):
+    """Releases the previously-acquired job lock.
+
+    Requires a valid Lock instance. See cancel_update if a release without a lock handle is required.
+
+    Arguments:
+    lock - Previously acquired Lock instance.
+    """
+    self._assert_valid_lock(lock)
+    return self._scheduler_proxy.releaseLock(lock, LockValidation.CHECKED)
+
+  def start_job_update(self, config, lock, instances=None):
+    """Requests the Scheduler to start the job update process.
+
+    Arguments:
+    config - AuroraConfig instance with update details.
+    lock - Job Lock instance to ensure exclusive job mutation access.
+    instances - Optional list of instances to restrict update to.
+    """
+    try:
+      settings = UpdaterConfig(**config.update_config().get()).to_thrift_update_settings(instances)
+    except ValueError as e:  # re-raised as the API-level UpdateConfigError with the same message
+      raise self.UpdateConfigError(str(e))
+
+    request = JobUpdateRequest(
+        jobKey=JobKey(role=config.role(), environment=config.environment(), name=config.name()),
+        instanceCount=config.instances(),
+        settings=settings,
+        taskConfig=config.job().taskConfig
+    )
+
+    return self._scheduler_proxy.startJobUpdate(request, lock)  # NOTE(review): lock not validated here
+
   def cancel_update(self, job_key):
     """Cancel the update represented by job_key. Returns whether or not the cancellation was
        successful."""
@@ -227,6 +279,11 @@ class AuroraClientAPI(object):
         min_instance_count,
         hosts)
 
+  def _assert_valid_lock(self, lock):
+    if not isinstance(lock, Lock):  # anything other than a thrift Lock is rejected
+      raise self.TypeError('Invalid lock %r: expected %s but got %s'
+                           % (lock, Lock.__name__, lock.__class__.__name__))
+
   def _assert_valid_job_key(self, job_key):
     if not isinstance(job_key, AuroraJobKey):
       raise self.TypeError('Invalid job_key %r: expected %s but got %s'

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/main/python/apache/aurora/client/api/updater_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py
index cbb93e4..178c2fb 100644
--- a/src/main/python/apache/aurora/client/api/updater_util.py
+++ b/src/main/python/apache/aurora/client/api/updater_util.py
@@ -13,9 +13,13 @@
 #
 
 import collections
+from itertools import groupby
+from operator import itemgetter
 
 from twitter.common import log
 
+from gen.apache.aurora.api.ttypes import JobUpdateSettings, Range
+
 
 class UpdaterConfig(object):
   """
@@ -54,6 +58,44 @@ class UpdaterConfig(object):
     self.rollback_on_failure = rollback_on_failure
     self.wait_for_batch_completion = wait_for_batch_completion
 
+  @classmethod
+  def instances_to_ranges(cls, instances):
+    """Groups instance IDs into a set of contiguous integer ranges.
+
+    Every Range(first, last) represents a closed span of instance IDs, i.e. both
+    ends are inclusive. For example,
+    instances=[0,1,2,5,8,9] would result in the following set of ranges:
+    (Range(first=0, last=2), Range(first=5, last=5), Range(first=8, last=9))
+
+    Algorithm: http://stackoverflow.com/questions/3149440
+
+    Arguments:
+    instances - sorted list of instance IDs; None or an empty list yields None.
+    """
+    if not instances:
+      return None
+
+    ranges = set()
+    for _, group in groupby(enumerate(instances), lambda(position, element): position - element):
+      range_seq = map(itemgetter(1), group)
+      ranges.add(Range(first=range_seq[0], last=range_seq[-1]))
+    return ranges
+
+  def to_thrift_update_settings(self, instances=None):
+    """Converts UpdaterConfig into a thrift JobUpdateSettings object.
+
+    Arguments:
+    instances - optional list of instances to update.
+    """
+    return JobUpdateSettings(
+        updateGroupSize=self.batch_size,
+        maxPerInstanceFailures=self.max_per_instance_failures,
+        maxFailedInstances=self.max_total_failures,
+        maxWaitToInstanceRunningMs=self.restart_threshold,  # NOTE(review): *Ms field; confirm units
+        minWaitInInstanceRunningMs=self.watch_secs,  # NOTE(review): seconds passed as ms? confirm
+        rollbackOnFailure=self.rollback_on_failure,
+        updateOnlyTheseInstances=self.instances_to_ranges(instances) if instances else None)
+
 
 class FailureThreshold(object):
   def __init__(self, max_per_instance_failures, max_total_failures):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/test/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/BUILD b/src/test/python/apache/aurora/client/api/BUILD
index db5c223..b4f08c6 100644
--- a/src/test/python/apache/aurora/client/api/BUILD
+++ b/src/test/python/apache/aurora/client/api/BUILD
@@ -14,15 +14,27 @@
 
 python_test_suite(name = 'all',
   dependencies = [
+    pants(':api'),
     pants(':disambiguator'),
+    pants(':instance_watcher'),
     pants(':job_monitor'),
+    pants(':mux'),
+    pants(':quota_check'),
     pants(':restarter'),
     pants(':scheduler_client'),
-    pants(':instance_watcher'),
-    pants(':updater'),
-    pants(':quota_check'),
     pants(':sla'),
-    pants(':mux')
+    pants(':updater'),
+    pants(':updater_util')
+  ],
+)
+
+python_tests(name = 'api',
+  sources = ['test_api.py'],
+  dependencies = [
+    pants('3rdparty/python:mock'),
+    pants('src/main/python/apache/aurora/client/api:api'),
+    pants('src/main/python/apache/aurora/client:config'),
+    pants('src/main/thrift/org/apache/aurora/gen:py-thrift')
   ],
 )
 
@@ -47,41 +59,48 @@ python_tests(name = 'job_monitor',
   ],
 )
 
-python_tests(name = 'restarter',
-  sources = ['test_restarter.py'],
+python_tests(name = 'instance_watcher',
+  sources = ['test_instance_watcher.py', 'test_health_check.py'],
   dependencies = [
     pants('3rdparty/python:mox'),
-    pants('src/main/python/apache/aurora/client/api:restarter'),
-    pants('src/main/python/apache/aurora/common:aurora_job_key'),
+    pants('src/main/python/apache/aurora/client/api:instance_watcher'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
-    pants('src/test/python/apache/aurora/client:fake_scheduler_proxy'),
-  ],
+  ]
 )
 
-python_tests(name = 'scheduler_client',
-  sources = ['test_scheduler_client.py'],
+python_tests(name = 'mux',
+  sources = ['test_scheduler_mux.py'],
+  dependencies = [
+    pants('src/main/python/apache/aurora/client/api:scheduler_mux'),
+  ]
+)
+
+python_tests(name = 'quota_check',
+  sources = ['test_quota_check.py'],
   dependencies = [
     pants('3rdparty/python:mock'),
-    pants('3rdparty/python:mox'),
-    pants('src/main/python/apache/aurora/client/api:scheduler_client'),
+    pants('src/main/python/apache/aurora/client/api:quota_check'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ]
 )
 
-python_tests(name = 'instance_watcher',
-  sources = ['test_instance_watcher.py', 'test_health_check.py'],
+python_tests(name = 'restarter',
+  sources = ['test_restarter.py'],
   dependencies = [
     pants('3rdparty/python:mox'),
-    pants('src/main/python/apache/aurora/client/api:instance_watcher'),
+    pants('src/main/python/apache/aurora/client/api:restarter'),
+    pants('src/main/python/apache/aurora/common:aurora_job_key'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
-  ]
+    pants('src/test/python/apache/aurora/client:fake_scheduler_proxy'),
+  ],
 )
 
-python_tests(name = 'quota_check',
-  sources = ['test_quota_check.py'],
+python_tests(name = 'scheduler_client',
+  sources = ['test_scheduler_client.py'],
   dependencies = [
     pants('3rdparty/python:mock'),
-    pants('src/main/python/apache/aurora/client/api:quota_check'),
+    pants('3rdparty/python:mox'),
+    pants('src/main/python/apache/aurora/client/api:scheduler_client'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ]
 )
@@ -95,13 +114,6 @@ python_tests(name = 'sla',
   ]
 )
 
-python_tests(name = 'mux',
-  sources = ['test_scheduler_mux.py'],
-  dependencies = [
-    pants('src/main/python/apache/aurora/client/api:scheduler_mux'),
-  ]
-)
-
 python_tests(name = 'task_util',
   sources = ['test_task_util.py'],
   dependencies = [
@@ -121,3 +133,11 @@ python_tests(name = 'updater',
     pants('src/test/python/apache/aurora/client:fake_scheduler_proxy'),
   ]
 )
+
+python_tests(name = 'updater_util',
+  sources = ['test_updater_util.py'],
+  dependencies = [
+    pants('src/main/python/apache/aurora/client/api:api'),
+    pants('src/main/thrift/org/apache/aurora/gen:py-thrift')
+  ],
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/test/python/apache/aurora/client/api/test_api.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_api.py b/src/test/python/apache/aurora/client/api/test_api.py
new file mode 100644
index 0000000..96db25d
--- /dev/null
+++ b/src/test/python/apache/aurora/client/api/test_api.py
@@ -0,0 +1,152 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from mock import Mock
+
+from apache.aurora.client.api import AuroraClientAPI
+from apache.aurora.common.aurora_job_key import AuroraJobKey
+from apache.aurora.common.cluster import Cluster
+from apache.aurora.config import AuroraConfig
+
+from gen.apache.aurora.api.ttypes import (
+    JobKey,
+    JobUpdateRequest,
+    JobUpdateSettings,
+    Lock,
+    LockKey,
+    LockValidation,
+    Response,
+    ResponseCode,
+    Result,
+    TaskConfig
+)
+
+
+class TestJobUpdateApis(unittest.TestCase):
+  """Job update APIs tests."""
+
+  UPDATE_CONFIG = {
+      'batch_size': 1,
+      'restart_threshold': 50,
+      'watch_secs': 50,
+      'max_per_shard_failures': 2,
+      'max_total_failures': 1,
+      'rollback_on_failure': True,
+      'wait_for_batch_completion': False,
+  }
+
+  @classmethod
+  def create_blank_response(cls, code, msg):
+    response = Mock(spec=Response)
+    response.responseCode = code
+    response.messageDEPRECATED = msg
+    response.result = Mock(spec=Result)
+    return response
+
+  @classmethod
+  def create_simple_success_response(cls):
+    return cls.create_blank_response(ResponseCode.OK, 'OK')
+
+  @classmethod
+  def create_error_response(cls):
+    return cls.create_blank_response(ResponseCode.ERROR, 'ERROR')
+
+  @classmethod
+  def mock_api(cls):
+    api = AuroraClientAPI(Cluster(name="foo"))
+    mock_proxy = Mock()
+    api._scheduler_proxy = mock_proxy
+    return api, mock_proxy
+
+  @classmethod
+  def create_update_settings(cls):
+    return JobUpdateSettings(
+        updateGroupSize=1,
+        maxPerInstanceFailures=2,
+        maxFailedInstances=1,
+        maxWaitToInstanceRunningMs=50,
+        minWaitInInstanceRunningMs=50,
+        rollbackOnFailure=True)
+
+  @classmethod
+  def create_update_request(cls, task_config):
+    return JobUpdateRequest(
+        jobKey=JobKey(role="role", environment="env", name="name"),
+        instanceCount=5,
+        settings=cls.create_update_settings(),
+        taskConfig=task_config)
+
+  @classmethod
+  def mock_job_config(cls, error=None):
+    config = Mock(spec=AuroraConfig)
+    mock_get = Mock()
+    mock_get.get.return_value = cls.UPDATE_CONFIG
+    if error:
+      config.update_config.side_effect = error
+    else:
+      config.update_config.return_value = mock_get
+    mock_task_config = Mock()
+    mock_task_config.taskConfig = TaskConfig()
+    config.job.return_value = mock_task_config
+    config.role.return_value = "role"
+    config.environment.return_value = "env"
+    config.name.return_value = "name"
+    config.instances.return_value = 5
+    return config
+
+  def test_acquire_lock(self):
+    """Test successful job lock creation."""
+    job_key = AuroraJobKey("foo", "role", "env", "name")
+    api, mock_proxy = self.mock_api()
+    mock_proxy.acquireLock.return_value = self.create_simple_success_response()
+    api.acquire_job_lock(job_key)
+    mock_proxy.acquireLock.assert_called_once_with(LockKey(job=job_key.to_thrift()))
+
+  def test_acquire_lock_fails_validation(self):
+    """Test acquire_job_lock fails with invalid job key."""
+    api, mock_proxy = self.mock_api()
+    self.assertRaises(AuroraClientAPI.TypeError, api.acquire_job_lock, "invalid job key")
+
+  def test_release_lock(self):
+    """Test successful lock release."""
+    lock = Lock()
+    api, mock_proxy = self.mock_api()
+    mock_proxy.releaseLock.return_value = self.create_simple_success_response()
+    api.release_job_lock(lock)
+    mock_proxy.releaseLock.assert_called_once_with(lock, LockValidation.CHECKED)
+
+  def test_release_lock_fails_validation(self):
+    """Test release_job_lock fails with invalid lock."""
+    api, mock_proxy = self.mock_api()
+    self.assertRaises(AuroraClientAPI.Error, api.release_job_lock, "invalid lock")
+
+  def test_start_job_update(self):
+    """Test successful job update start."""
+    api, mock_proxy = self.mock_api()
+    lock = Lock()
+    task_config = TaskConfig()
+    mock_proxy.startJobUpdate.return_value = self.create_simple_success_response()
+
+    api.start_job_update(self.mock_job_config(), lock)
+    mock_proxy.startJobUpdate.assert_called_once_with(self.create_update_request(task_config), lock)
+
+  def test_start_job_update_fails_parse_update_config(self):
+    """Test start_job_update fails to parse invalid UpdateConfig."""
+    api, mock_proxy = self.mock_api()
+
+    self.assertRaises(
+        AuroraClientAPI.UpdateConfigError,
+        api.start_job_update,
+        self.mock_job_config(error=ValueError()), None)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/8d5a62cf/src/test/python/apache/aurora/client/api/test_updater_util.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_updater_util.py b/src/test/python/apache/aurora/client/api/test_updater_util.py
new file mode 100644
index 0000000..fe3ac49
--- /dev/null
+++ b/src/test/python/apache/aurora/client/api/test_updater_util.py
@@ -0,0 +1,44 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from apache.aurora.client.api import UpdaterConfig
+
+from gen.apache.aurora.api.ttypes import Range
+
+
+class TestRangeConversion(unittest.TestCase):
+  """Job instance ID to range conversion."""
+
+  def test_multiple_ranges(self):
+    """Test multiple ranges."""
+    ranges = [repr(e) for e in UpdaterConfig.instances_to_ranges([1, 2, 3, 5, 7, 8])]
+    assert 3 == len(ranges), "Wrong number of ranges:%s" % len(ranges)
+    assert repr(Range(first=1, last=3)) in ranges, "Missing range [1,3]"
+    assert repr(Range(first=5, last=5)) in ranges, "Missing range [5,5]"
+    assert repr(Range(first=7, last=8)) in ranges, "Missing range [7,8]"
+
+  def test_one_element(self):
+    """Test one ID in the list."""
+    ranges = [repr(e) for e in UpdaterConfig.instances_to_ranges([1])]
+    assert 1 == len(ranges), "Wrong number of ranges:%s" % len(ranges)
+    assert repr(Range(first=1, last=1)) in ranges, "Missing range [1,1]"
+
+  def test_none_list(self):
+    """Test None list produces None result."""
+    assert UpdaterConfig.instances_to_ranges(None) is None, "Result must be None."
+
+  def test_empty_list(self):
+    """Test empty list produces None result."""
+    assert UpdaterConfig.instances_to_ranges([]) is None, "Result must be None."