You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2013/12/10 01:35:37 UTC

git commit: Enable integration with hooks on a failed update.

Updated Branches:
  refs/heads/master 1b6f1b8fe -> de69dc59d


Enable integration with hooks on a failed update.

Adding support for err hooks when update is rolled back or failed. Hooks use thrift ResponseCode to detect operation failure, so updater has to simulate an ERROR response code to properly support error hooks.

Also, failed_instances no longer tell the truth as far as update status goes. Converted to use failure count check to determine update success.

Testing Done:
./pants src/test/python/twitter/aurora/client/api:all

Reviewed at https://reviews.apache.org/r/16139/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/de69dc59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/de69dc59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/de69dc59

Branch: refs/heads/master
Commit: de69dc59dd8eb12803e227090cef9a395aa358f3
Parents: 1b6f1b8
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Mon Dec 9 16:34:10 2013 -0800
Committer: Maxim Khutornenko <mk...@twitter.com>
Committed: Mon Dec 9 16:34:10 2013 -0800

----------------------------------------------------------------------
 .../python/twitter/aurora/client/api/updater.py | 45 +++++-----
 .../twitter/aurora/client/api/test_updater.py   | 90 ++++++++++++++------
 2 files changed, 81 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/de69dc59/src/main/python/twitter/aurora/client/api/updater.py
----------------------------------------------------------------------
diff --git a/src/main/python/twitter/aurora/client/api/updater.py b/src/main/python/twitter/aurora/client/api/updater.py
index 24c811b..32bd6a1 100644
--- a/src/main/python/twitter/aurora/client/api/updater.py
+++ b/src/main/python/twitter/aurora/client/api/updater.py
@@ -27,8 +27,6 @@ class Updater(object):
   """Update the instances of a job in batches."""
 
   class Error(Exception): pass
-  class InvalidConfigError(Error): pass
-  class InvalidStateError(Error): pass
 
   InstanceState = namedtuple('InstanceState', ['instance_id', 'is_updated'])
   OperationConfigs = namedtuple('OperationConfigs', ['from_config', 'to_config'])
@@ -45,7 +43,7 @@ class Updater(object):
     try:
       self._update_config = UpdaterConfig(**config.update_config().get())
     except ValueError as e:
-      raise self.InvalidConfigError(str(e))
+      raise self.Error(str(e))
     self._lock = None
     self._watcher = instance_watcher or InstanceWatcher(
         self._scheduler,
@@ -109,7 +107,6 @@ class Updater(object):
     ]
 
     log.info('Starting job update.')
-    failed_instances = set()
     while remaining_instances and not failure_threshold.is_failed_update():
       batch_instances = remaining_instances[0 : self._update_config.batch_size]
       remaining_instances = list(set(remaining_instances) - set(batch_instances))
@@ -139,14 +136,14 @@ class Updater(object):
       ]
       remaining_instances.sort(key=lambda tup: tup.instance_id)
 
-    if failed_instances:
+    if failure_threshold.is_failed_update():
       untouched_instances = [s.instance_id for s in remaining_instances if not s.is_updated]
       instances_to_rollback = list(
           set(instance_configs.instances_to_process) - set(untouched_instances)
       )
       self._rollback(instances_to_rollback, instance_configs)
 
-    return failed_instances
+    return not failure_threshold.is_failed_update()
 
   def _rollback(self, instances_to_rollback, instance_configs):
     """Performs a rollback operation for the failed instances.
@@ -201,7 +198,7 @@ class Updater(object):
       elif not from_config and to_config:
         to_add.append(instance_id)
       else:
-        raise self.InvalidStateError('Instance %s is outside of supported range' % instance_id)
+        raise self.Error('Instance %s is outside of supported range' % instance_id)
 
     return to_kill, to_add
 
@@ -303,7 +300,7 @@ class Updater(object):
       instances_to_process = instances
       unrecognized = list(set(instances) - set(instance_superset))
       if unrecognized:
-        raise self.InvalidConfigError('Instances %s are outside of supported range' % unrecognized)
+        raise self.Error('Instances %s are outside of supported range' % unrecognized)
 
     # Populate local config map
     local_config_map = dict.fromkeys(job_config_instances, local_task_config)
@@ -350,6 +347,9 @@ class Updater(object):
         statuses=ACTIVE_STATES,
         instanceIds=instanceIds)
 
+  def _failed_response(self, message):
+    return Response(responseCode=ResponseCode.ERROR, message=message)
+
   def update(self, instances=None):
     """Performs the job update, blocking until it completes.
     A rollback will be performed if the update was considered a failure based on the
@@ -368,20 +368,17 @@ class Updater(object):
       # Handle cron jobs separately from other jobs.
       if self._replace_template_if_cron():
         log.info('Cron template updated, next run will reflect changes')
+        return self._finish()
       else:
-        failed_instances = self._update(instances)
-        if failed_instances:
-          log.error('Update reverted, failures detected on instances %s' % failed_instances)
+        if not self._update(instances):
+          log.warn('Update failures threshold reached')
+          self._finish()
+          return self._failed_response('Update reverted')
         else:
           log.info('Update successful')
-      resp = self._finish()
-    except (self.InvalidConfigError, self.InvalidStateError) as e:
-      log.error(e)
-      log.error('Aborting update without rollback!')
-    except self.Error:
-      pass  # Error is already logged in _check_and_log_response()
-
-    return resp
+          return self._finish()
+    except self.Error as e:
+      return self._failed_response('Aborting update without rollback! Fatal error: %s' % e)
 
   @classmethod
   def cancel_update(cls, scheduler, job_key):
@@ -397,9 +394,8 @@ class Updater(object):
         Lock(key=LockKey(job=job_key.to_thrift())),
         LockValidation.UNCHECKED)
 
-  @classmethod
-  def _check_and_log_response(cls, resp):
-    """Checks scheduler return status, logs and raises Error in case of unexpected response.
+  def _check_and_log_response(self, resp):
+    """Checks scheduler return status, raises Error in case of unexpected response.
 
     Arguments:
     resp -- scheduler response object.
@@ -410,7 +406,4 @@ class Updater(object):
     if resp.responseCode == ResponseCode.OK:
       log.debug('Response from scheduler: %s (message: %s)' % (name, message))
     else:
-      e = cls.Error('Unexpected response from scheduler: %s (message: %s)' % (name, message))
-      log.error(e)
-      log.error('Aborting update without rollback!!!')
-      raise e
+      raise self.Error(message)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/de69dc59/src/test/python/twitter/aurora/client/api/test_updater.py
----------------------------------------------------------------------
diff --git a/src/test/python/twitter/aurora/client/api/test_updater.py b/src/test/python/twitter/aurora/client/api/test_updater.py
index 3c148d3..0b60d9e 100644
--- a/src/test/python/twitter/aurora/client/api/test_updater.py
+++ b/src/test/python/twitter/aurora/client/api/test_updater.py
@@ -186,10 +186,6 @@ class UpdaterTest(TestCase):
         LockValidation.CHECKED,
         self._session_key).AndReturn(response)
 
-  def assert_response_code(self, expected_code, actual_resp):
-    assert expected_code == actual_resp.responseCode, (
-      'Expected response:%s Actual response:%s' % (expected_code, actual_resp.responseCode))
-
   def make_task_configs(self, count=1):
     return [TaskConfig(
         owner=Identity(role=self._job_key.role),
@@ -216,6 +212,14 @@ class UpdaterTest(TestCase):
         instanceCount=instance_count
     )
 
+  def update_and_expect_ok(self, instances=None):
+    self.update_and_expect_response(ResponseCode.OK, instances)
+
+  def update_and_expect_response(self, expected_code, instances=None):
+    resp = self._updater.update(instances)
+    assert expected_code == resp.responseCode, (
+      'Expected response:%s Actual response:%s' % (expected_code, resp.responseCode))
+
   def test_grow(self):
     """Adds instances to the existing job."""
     old_configs = self.make_task_configs(3)
@@ -232,7 +236,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_ok()
     self.verify_mocks()
 
   def test_shrink(self):
@@ -250,7 +254,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_ok()
     self.verify_mocks()
 
   def test_update_and_grow(self):
@@ -273,7 +277,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_ok()
     self.verify_mocks()
 
   def test_update_and_shrink(self):
@@ -295,7 +299,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_ok()
     self.verify_mocks()
 
   def test_update_instances(self):
@@ -317,7 +321,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_ok()
     self.verify_mocks()
 
   def test_grow_with_instance_option(self):
@@ -334,7 +338,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update([3, 4])
+    self.update_and_expect_ok(instances=[3, 4])
     self.verify_mocks()
 
   def test_shrink_with_instance_option(self):
@@ -351,7 +355,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update([4, 5, 6, 7, 8, 9])
+    self.update_and_expect_ok(instances=[4, 5, 6, 7, 8, 9])
     self.verify_mocks()
 
   def test_update_with_instance_option(self):
@@ -370,7 +374,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update([2, 3, 4])
+    self.update_and_expect_ok(instances=[2, 3, 4])
     self.verify_mocks()
 
 
@@ -388,7 +392,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update([2, 3])
+    self.update_and_expect_ok([2, 3])
     self.verify_mocks()
 
   def test_noop_update(self):
@@ -403,6 +407,9 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
+    self.update_and_expect_ok()
+    self.verify_mocks()
+
   def test_update_rollback(self):
     """Update process failures exceed total allowable count and update is rolled back."""
     update_config = self.UPDATE_CONFIG.copy()
@@ -428,7 +435,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_response(ResponseCode.ERROR)
     self.verify_mocks()
 
   def test_update_rollback_sorted(self):
@@ -468,7 +475,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_response(ResponseCode.ERROR)
     self.verify_mocks()
 
   def test_update_after_restart(self):
@@ -496,7 +503,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_ok()
     self.verify_mocks()
 
   def test_update_cron_job(self):
@@ -509,7 +516,7 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_ok()
     self.verify_mocks()
 
   def test_start_invalid_response(self):
@@ -517,8 +524,7 @@ class UpdaterTest(TestCase):
     self.expect_start(response_code=ResponseCode.INVALID_REQUEST)
     self.replay_mocks()
 
-    resp = self._updater.update()
-    self.assert_response_code(ResponseCode.INVALID_REQUEST, resp)
+    self.update_and_expect_response(ResponseCode.INVALID_REQUEST)
     self.verify_mocks()
 
   def test_finish_invalid_response(self):
@@ -531,29 +537,28 @@ class UpdaterTest(TestCase):
     self.expect_finish(response_code=ResponseCode.INVALID_REQUEST)
     self.replay_mocks()
 
-    resp = self._updater.update()
-    self.assert_response_code(ResponseCode.INVALID_REQUEST, resp)
+    self.update_and_expect_response(ResponseCode.INVALID_REQUEST)
     self.verify_mocks()
 
   def test_invalid_batch_size(self):
     """Test for out of range error for batch size."""
     update_config = self.UPDATE_CONFIG.copy()
     update_config.update(batch_size=0)
-    with raises(Updater.InvalidConfigError):
+    with raises(Updater.Error):
       self.init_updater(update_config)
 
   def test_invalid_restart_threshold(self):
     """Test for out of range error for restart threshold."""
     update_config = self.UPDATE_CONFIG.copy()
     update_config.update(restart_threshold=0)
-    with raises(Updater.InvalidConfigError):
+    with raises(Updater.Error):
       self.init_updater(update_config)
 
   def test_invalid_watch_secs(self):
     """Test for out of range error for watch secs."""
     update_config = self.UPDATE_CONFIG.copy()
     update_config.update(watch_secs=0)
-    with raises(Updater.InvalidConfigError):
+    with raises(Updater.Error):
       self.init_updater(update_config)
 
   def test_update_invalid_response(self):
@@ -566,7 +571,7 @@ class UpdaterTest(TestCase):
     self.expect_get_tasks(old_configs, response_code=ResponseCode.INVALID_REQUEST)
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_response(ResponseCode.ERROR)
     self.verify_mocks()
 
   def test_instances_outside_range(self):
@@ -580,13 +585,13 @@ class UpdaterTest(TestCase):
     self.expect_populate(job_config)
     self.replay_mocks()
 
-    self._updater.update([3, 4])
+    self.update_and_expect_response(ResponseCode.ERROR, instances=[3, 4])
     self.verify_mocks()
 
   def test_update_skips_unretryable(self):
     """Update process skips instances exceeding max_per_shard_failures"""
     update_config = self.UPDATE_CONFIG.copy()
-    update_config.update(max_total_failures=2, max_per_shard_failures=2)
+    update_config.update(max_total_failures=1, max_per_shard_failures=2)
     self.init_updater(update_config)
 
     old_configs = self.make_task_configs(10)
@@ -614,6 +619,35 @@ class UpdaterTest(TestCase):
     self.expect_finish()
     self.replay_mocks()
 
-    self._updater.update()
+    self.update_and_expect_ok()
+    self.verify_mocks()
+
+  def test_failed_unretryable_do_not_cause_rollback(self):
+    """Update process still succeeds if failed instances in last batch are within allowed limit."""
+    update_config = self.UPDATE_CONFIG.copy()
+    update_config.update(max_total_failures=1, max_per_shard_failures=2)
+    self.init_updater(update_config)
+
+    old_configs = self.make_task_configs(5)
+    new_config = deepcopy(old_configs[0])
+    new_config.priority = 5
+    job_config = self.make_job_config(new_config, 5)
+    self._config.job_config = job_config
+    self.expect_start()
+    self.expect_get_tasks(old_configs)
+    self.expect_populate(job_config)
+    self.expect_kill([0, 1, 2])
+    self.expect_add([0, 1, 2], new_config)
+    self.expect_watch_instances([0, 1, 2], failed_instances=[0])
+    self.expect_restart([0])
+    self.expect_kill([3, 4])
+    self.expect_add([3, 4], new_config)
+    self.expect_watch_instances([0, 3, 4], failed_instances=[0])
+    self.expect_restart([0])
+    self.expect_watch_instances([0], failed_instances=[0])
+    self.expect_finish()
+    self.replay_mocks()
+
+    self.update_and_expect_ok()
     self.verify_mocks()