Posted to commits@aurora.apache.org by jc...@apache.org on 2016/09/30 19:54:35 UTC

aurora git commit: Modify executor state transition logic to rely on health checks (if enabled).

Repository: aurora
Updated Branches:
  refs/heads/master 59b4d319b -> ca683cb9e


Modify executor state transition logic to rely on health checks (if enabled).

[Summary]
The executor needs to start executing user content in the STARTING state and transition to
RUNNING once the required number of consecutive successful health checks is reached.

[Background]
Please see this epic: https://issues.apache.org/jira/browse/AURORA-1225
and the design doc: https://docs.google.com/document/d/1ZdgW8S4xMhvKW7iQUX99xZm10NXSxEWR0a-21FP5d94/edit#
for more details and background.

[Description]
If health checking is enabled on the vCurrent executor, the health checker sends a "TASK_RUNNING"
message once the required number of consecutive successful health checks is reached within
initial_interval_secs. Conversely, a "TASK_FAILED" message is sent if the health checker fails to
reach the required number of successes within that period, or if the limit on consecutive failed
health checks is exceeded after initial_interval_secs.

If health checking is disabled on the vCurrent executor, it sends a "TASK_RUNNING" message to the
scheduler after the Thermos runner has started. In this scenario, the vCurrent executor behaves
the same as the vPrev executor.
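
For illustration only, here is a minimal, self-contained sketch of the transition policy described
above. It is not the committed code (that lives in health_checker.py in the diff below); the
health_check_policy helper and the plain-string state constants are stand-ins for the real
mesos_pb2 task states.

TASK_RUNNING, TASK_FAILED = 'TASK_RUNNING', 'TASK_FAILED'


def health_check_policy(checks, interval_secs, initial_interval_secs,
                        min_consecutive_successes, max_consecutive_failures):
  """Replay a sequence of boolean health check results (one check every
  interval_secs) and yield the status transitions the executor would report."""
  successes, failures, passed = 0, 0, False
  for i, is_healthy in enumerate(checks):
    elapsed = i * interval_secs
    if is_healthy:
      successes, failures = successes + 1, 0
    else:
      successes, failures = 0, failures + 1
    if elapsed <= initial_interval_secs:
      # Inside the initial interval: transition to RUNNING once the minimum
      # number of consecutive successes is reached.
      if not passed and successes >= min_consecutive_successes:
        passed = True
        yield TASK_RUNNING
    elif not passed or failures > max_consecutive_failures:
      # Initial interval expired without passing, or the consecutive-failure
      # limit was exceeded afterwards: the task is marked failed.
      yield TASK_FAILED
      return


if __name__ == '__main__':
  # Two failures, then two successes inside the initial interval -> TASK_RUNNING;
  # a later run of failures past the limit -> TASK_FAILED.
  print(list(health_check_policy(
      [False, False, True, True, False, False, False],
      interval_secs=5, initial_interval_secs=15,
      min_consecutive_successes=2, max_consecutive_failures=1)))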

[Change List]
The current change set includes:
1. Removed the status memoization in ChainedStatusChecker.
2. Modified the StatusManager to be edge-triggered (see the sketch after this list).
3. Changed the AuroraExecutor callback function.
4. Modified the HealthChecker and redefined the meaning of initial_interval_secs.
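
As a rough illustration of the edge-triggered StatusManager behavior (item 2), the sketch below
shows the shape of the new poll loop. It is a simplified stand-in rather than the committed
StatusManager, and it uses plain strings in place of the real mesos_pb2 task states.

import time

TASK_RUNNING, TASK_KILLED = 'TASK_RUNNING', 'TASK_KILLED'


class EdgeTriggeredStatusManager(object):
  """Fire the `running` callback exactly once on the first RUNNING status,
  keep polling while the task stays RUNNING, and fire `shutdown` on any
  other (terminal) status."""

  POLL_WAIT_SECS = 0.5

  def __init__(self, status_checker, running, shutdown, clock=time):
    self._status_checker = status_checker  # callable returning (reason, state) or None
    self._running = running
    self._shutdown = shutdown
    self._clock = clock
    self._reported_running = False

  def run(self):
    while True:
      status_result = self._status_checker()
      if status_result is not None and status_result[1] != TASK_RUNNING:
        self._shutdown(status_result)  # terminal status ends the loop
        break
      if status_result is not None and not self._reported_running:
        self._running(status_result)  # edge: report RUNNING exactly once
        self._reported_running = True
      self._clock.sleep(self.POLL_WAIT_SECS)


def report_running(result):
  print('running callback: %s' % (result,))


def report_shutdown(result):
  print('shutdown callback: %s' % (result,))


if __name__ == '__main__':
  statuses = iter([None, ('Task is healthy.', TASK_RUNNING),
                   ('Task is healthy.', TASK_RUNNING),
                   ('Killed by user.', TASK_KILLED)])
  manager = EdgeTriggeredStatusManager(
      lambda: next(statuses),
      running=report_running,
      shutdown=report_shutdown)
  manager.run()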

Bugs closed: AURORA-1225

Reviewed at https://reviews.apache.org/r/51876/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ca683cb9
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ca683cb9
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ca683cb9

Branch: refs/heads/master
Commit: ca683cb9e27bae76424a687bc6c3af5a73c501b9
Parents: 59b4d31
Author: Kai Huang <te...@hotmail.com>
Authored: Fri Sep 30 14:54:15 2016 -0500
Committer: Joshua Cohen <jc...@apache.org>
Committed: Fri Sep 30 14:54:15 2016 -0500

----------------------------------------------------------------------
 .../apache/aurora/executor/aurora_executor.py   | 46 ++++++++++----
 .../aurora/executor/common/health_checker.py    | 59 ++++++++++++-----
 .../aurora/executor/common/status_checker.py    | 29 +++++----
 .../apache/aurora/executor/status_manager.py    | 33 ++++++++--
 .../executor/common/test_health_checker.py      | 67 +++++++++++++-------
 .../executor/common/test_status_checker.py      | 37 ++++++++++-
 .../aurora/executor/test_status_manager.py      | 28 ++++----
 .../aurora/executor/test_thermos_executor.py    | 44 ++++++++++---
 8 files changed, 250 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/main/python/apache/aurora/executor/aurora_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py
index ce5ef68..d30f2d3 100644
--- a/src/main/python/apache/aurora/executor/aurora_executor.py
+++ b/src/main/python/apache/aurora/executor/aurora_executor.py
@@ -22,6 +22,8 @@ from twitter.common.concurrent import Timeout, deadline, defer
 from twitter.common.metrics import Observable
 from twitter.common.quantity import Amount, Time
 
+from apache.aurora.executor.common.health_checker import HealthChecker
+
 from .common.kill_manager import KillManager
 from .common.sandbox import DefaultSandboxProvider
 from .common.status_checker import ChainedStatusChecker
@@ -94,7 +96,7 @@ class AuroraExecutor(ExecutorBase, Observable):
         - Set up necessary HealthCheckers
         - Set up StatusManager, and attach HealthCheckers
     """
-    self.send_update(driver, self._task_id, mesos_pb2.TASK_STARTING, 'Initializing sandbox.')
+    self.send_update(driver, self._task_id, mesos_pb2.TASK_STARTING, 'Starting task execution.')
 
     if not self._initialize_sandbox(driver, assigned_task, mounted_volume_paths):
       return
@@ -115,10 +117,27 @@ class AuroraExecutor(ExecutorBase, Observable):
     if not self._start_runner(driver, assigned_task):
       return
 
-    self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING)
+    is_health_check_enabled = False
+    status_checkers = []
+    try:
+      for status_provider in self._status_providers:
+        status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox)
+        if status_checker is None:
+          continue
+        else:
+          if isinstance(status_checker, HealthChecker):
+            is_health_check_enabled = True
+          status_checkers.append(status_checker)
+    except Exception as e:
+      log.error(traceback.format_exc())
+      self._die(driver, mesos_pb2.TASK_FAILED, "Failed to set up status checker: %s" % e)
+      return
+
+    if not is_health_check_enabled:
+      self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING)
 
     try:
-      self._start_status_manager(driver, assigned_task)
+      self._start_status_manager(status_checkers)
     except Exception:
       log.error(traceback.format_exc())
       self._die(driver, mesos_pb2.TASK_FAILED, "Internal error")
@@ -162,15 +181,9 @@ class AuroraExecutor(ExecutorBase, Observable):
 
     return True
 
-  def _start_status_manager(self, driver, assigned_task):
-    status_checkers = [self._kill_manager]
-    self.metrics.register_observable(self._kill_manager.name(), self._kill_manager)
-
-    for status_provider in self._status_providers:
-      status_checker = status_provider.from_assigned_task(assigned_task, self._sandbox)
-      if status_checker is None:
-        continue
-      status_checkers.append(status_checker)
+  def _start_status_manager(self, status_checkers):
+    status_checkers = [self._kill_manager] + status_checkers
+    for status_checker in status_checkers:
       self.metrics.register_observable(status_checker.name(), status_checker)
 
     self._chained_checker = ChainedStatusChecker(status_checkers)
@@ -178,8 +191,12 @@ class AuroraExecutor(ExecutorBase, Observable):
 
     # chain the runner to the other checkers, but do not chain .start()/.stop()
     complete_checker = ChainedStatusChecker([self._runner, self._chained_checker])
+
     self._status_manager = self._status_manager_class(
-        complete_checker, self._shutdown, clock=self._clock)
+        complete_checker,
+        running=self._running,
+        shutdown=self._shutdown,
+        clock=self._clock)
     self._status_manager.start()
     self.status_manager_started.set()
 
@@ -197,6 +214,9 @@ class AuroraExecutor(ExecutorBase, Observable):
     self.log('Activating kill manager.')
     self._kill_manager.kill(reason)
 
+  def _running(self, status_result):
+    self.send_update(self._driver, self._task_id, status_result.status, status_result.reason)
+
   def _shutdown(self, status_result):
     runner_status = self._runner.status
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/main/python/apache/aurora/executor/common/health_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py
index 3c7c09d..03fbffd 100644
--- a/src/main/python/apache/aurora/executor/common/health_checker.py
+++ b/src/main/python/apache/aurora/executor/common/health_checker.py
@@ -18,7 +18,7 @@ import threading
 import time
 import traceback
 
-from mesos.interface.mesos_pb2 import TaskState
+from mesos.interface import mesos_pb2
 from pystachio import Environment, String
 from twitter.common import log
 from twitter.common.exceptions import ExceptionalThread
@@ -51,6 +51,7 @@ class ThreadedHealthChecker(ExceptionalThread):
       interval_secs,
       initial_interval_secs,
       max_consecutive_failures,
+      min_consecutive_successes,
       clock):
     """
     :param health_checker: health checker to confirm service health
@@ -59,10 +60,12 @@ class ThreadedHealthChecker(ExceptionalThread):
     :type sandbox: DirectorySandbox
     :param interval_secs: delay between checks
     :type interval_secs: int
-    :param initial_interval_secs: seconds to wait before starting checks
+    :param initial_interval_secs: seconds to wait before marking health check passed
     :type initial_interval_secs: int
     :param max_consecutive_failures: number of failures to allow before marking dead
     :type max_consecutive_failures: int
+    :param min_consecutive_successes: number of successes before marking health check passed
+    :type min_consecutive_successes: int
     :param clock: time module available to be mocked for testing
     :type clock: time module
     """
@@ -70,11 +73,16 @@ class ThreadedHealthChecker(ExceptionalThread):
     self.sandbox = sandbox
     self.clock = clock
     self.current_consecutive_failures = 0
+    self.current_consecutive_successes = 0
     self.dead = threading.Event()
     self.interval = interval_secs
     self.max_consecutive_failures = max_consecutive_failures
+    self.min_consecutive_successes = min_consecutive_successes
     self.snooze_file = None
     self.snoozed = False
+    self._expired = False
+    self.start_time = 0
+    self.health_check_passed = False
 
     if self.sandbox and self.sandbox.exists():
       self.snooze_file = os.path.join(self.sandbox.root, '.healthchecksnooze')
@@ -84,10 +92,8 @@ class ThreadedHealthChecker(ExceptionalThread):
     else:
       self.initial_interval = interval_secs * 2
 
-    if self.initial_interval > 0:
-      self.healthy, self.reason = True, None
-    else:
-      self.healthy, self.reason = self._perform_check_if_not_disabled()
+    self.healthy, self.reason = True, None
+
     super(ThreadedHealthChecker, self).__init__()
     self.daemon = True
 
@@ -110,21 +116,35 @@ class ThreadedHealthChecker(ExceptionalThread):
   def _maybe_update_failure_count(self, is_healthy, reason):
     if not is_healthy:
       log.warning('Health check failure: %s' % reason)
+      self.reason = reason
+      if self.current_consecutive_successes > 0:
+        log.debug('Reset consecutive successes counter.')
+      self.current_consecutive_successes = 0
       self.current_consecutive_failures += 1
-      if self.current_consecutive_failures > self.max_consecutive_failures:
-        log.warning('Reached consecutive failure limit.')
-        self.healthy = False
-        self.reason = reason
     else:
       if self.current_consecutive_failures > 0:
         log.debug('Reset consecutive failures counter.')
       self.current_consecutive_failures = 0
+      self.current_consecutive_successes += 1
+
+    if not self._expired:
+      if self.clock.time() - self.start_time > self.initial_interval:
+        log.debug('Initial interval expired.')
+        self._expired = True
+        if not self.health_check_passed:
+          log.warning('Failed to reach minimum consecutive successes.')
+          self.healthy = False
+      else:
+        if self.current_consecutive_successes >= self.min_consecutive_successes:
+          log.info('Reached minimum consecutive successes.')
+          self.health_check_passed = True
+
+    if self._expired and self.healthy:
+      self.healthy = self.current_consecutive_failures <= self.max_consecutive_failures
 
   def run(self):
     log.debug('Health checker thread started.')
-    if self.initial_interval > 0:
-      self.clock.sleep(self.initial_interval)
-    log.debug('Initial interval expired.')
+    self.start_time = self.clock.time()
     while not self.dead.is_set():
       is_healthy, reason = self._perform_check_if_not_disabled()
       self._maybe_update_failure_count(is_healthy, reason)
@@ -148,6 +168,8 @@ class HealthChecker(StatusChecker):
     Exported metrics:
       health_checker.consecutive_failures: Number of consecutive failures observed.  Resets
         to zero on successful health check.
+      health_checker.consecutive_successes: Number of consecutive successes observed.  Resets
+        to zero on failed health check.
       health_checker.snoozed: Returns 1 if the health checker is snoozed, 0 if not.
       health_checker.total_latency_secs: Total time waiting for the health checker to respond in
         seconds. To get average latency, use health_checker.total_latency / health_checker.checks.
@@ -160,6 +182,7 @@ class HealthChecker(StatusChecker):
                interval_secs=10,
                initial_interval_secs=None,
                max_consecutive_failures=0,
+               min_consecutive_successes=1,
                clock=time):
     self._health_checks = 0
     self._total_latency = 0
@@ -171,9 +194,12 @@ class HealthChecker(StatusChecker):
         interval_secs,
         initial_interval_secs,
         max_consecutive_failures,
+        min_consecutive_successes,
         clock)
     self.metrics.register(LambdaGauge('consecutive_failures',
         lambda: self.threaded_health_checker.current_consecutive_failures))
+    self.metrics.register(LambdaGauge('consecutive_successes',
+        lambda: self.threaded_health_checker.current_consecutive_successes))
     self.metrics.register(LambdaGauge('snoozed', lambda: int(self.threaded_health_checker.snoozed)))
     self.metrics.register(LambdaGauge('total_latency_secs', lambda: self._total_latency))
     self.metrics.register(LambdaGauge('checks', lambda: self._health_checks))
@@ -192,9 +218,12 @@ class HealthChecker(StatusChecker):
 
   @property
   def status(self):
-    if not self.threaded_health_checker.healthy:
+    if self.threaded_health_checker.healthy:
+      if self.threaded_health_checker.health_check_passed:
+        return StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING)
+    else:
       return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason,
-          TaskState.Value('TASK_FAILED'))
+          mesos_pb2.TASK_FAILED)
 
   def name(self):
     return 'health_checker'

http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/main/python/apache/aurora/executor/common/status_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/status_checker.py b/src/main/python/apache/aurora/executor/common/status_checker.py
index 795dae2..fccb69a 100644
--- a/src/main/python/apache/aurora/executor/common/status_checker.py
+++ b/src/main/python/apache/aurora/executor/common/status_checker.py
@@ -14,7 +14,7 @@
 
 from abc import abstractmethod, abstractproperty
 
-from mesos.interface.mesos_pb2 import TaskState
+from mesos.interface import mesos_pb2
 from twitter.common import log
 from twitter.common.lang import Interface
 from twitter.common.metrics import Observable
@@ -31,7 +31,7 @@ class StatusResult(object):
 
   def __init__(self, reason, status):
     self._reason = reason
-    if status not in TaskState.values():
+    if status not in mesos_pb2.TaskState.values():
       raise ValueError('Unknown task state: %r' % status)
     self._status = status
 
@@ -47,7 +47,7 @@ class StatusResult(object):
     return '%s(%r, status=%r)' % (
         self.__class__.__name__,
         self._reason,
-        TaskState.Name(self._status))
+        mesos_pb2.TaskState.Name(self._status))
 
 
 class StatusChecker(Observable, Interface):
@@ -85,24 +85,25 @@ class Healthy(StatusChecker):
 class ChainedStatusChecker(StatusChecker):
   def __init__(self, status_checkers):
     self._status_checkers = status_checkers
-    self._status = None
     if not all(isinstance(h_i, StatusChecker) for h_i in status_checkers):
       raise TypeError('ChainedStatusChecker must take an iterable of StatusCheckers.')
     super(ChainedStatusChecker, self).__init__()
 
   @property
   def status(self):
-    if self._status is None:
-      for status_checker in self._status_checkers:
-        status_checker_status = status_checker.status
-        if status_checker_status is not None:
-          log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status))
-          if not isinstance(status_checker_status, StatusResult):
-            raise TypeError('StatusChecker returned something other than a StatusResult: got %s' %
-                type(status_checker_status))
-          self._status = status_checker_status
+    # Do not memoize the status so that failures occurring in the RUNNING state won't be masked.
+    status = None
+    for status_checker in self._status_checkers:
+      status_checker_status = status_checker.status
+      if status_checker_status is not None:
+        log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status))
+        if not isinstance(status_checker_status, StatusResult):
+          raise TypeError('StatusChecker returned something other than a StatusResult: got %s' %
+              type(status_checker_status))
+        status = status_checker_status
+        if status_checker_status.status is not mesos_pb2.TASK_RUNNING:
           break
-    return self._status
+    return status
 
   def start(self):
     for status_checker in self._status_checkers:

http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/main/python/apache/aurora/executor/status_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/status_manager.py b/src/main/python/apache/aurora/executor/status_manager.py
index 228a99a..26aea23 100644
--- a/src/main/python/apache/aurora/executor/status_manager.py
+++ b/src/main/python/apache/aurora/executor/status_manager.py
@@ -14,6 +14,7 @@
 
 import time
 
+from mesos.interface import mesos_pb2
 from twitter.common import log
 from twitter.common.exceptions import ExceptionalThread
 from twitter.common.quantity import Amount, Time
@@ -31,23 +32,43 @@ class StatusManager(ExceptionalThread):
   """
   POLL_WAIT = Amount(500, Time.MILLISECONDS)
 
-  def __init__(self, status_checker, callback, clock=time):
+  def __init__(self, status_checker, running, shutdown, clock=time):
+    """
+    :param status_checker: status checker to check the task status
+    :type status_checker: ChainedStatusChecker
+    :param running: callback function that handles the RUNNING task status
+    :type running: callable
+    :param shutdown: callback function that handles the terminal task status
+    :type shutdown: callable
+    :param clock: time module available to be mocked for testing
+    :type clock: time module
+    """
     if not isinstance(status_checker, StatusChecker):
       raise TypeError('status_checker must be a StatusChecker, got %s' % type(status_checker))
-    if not callable(callback):
-      raise TypeError('callback needs to be callable!')
+    if not callable(running):
+      raise TypeError('running callback needs to be callable!')
+    if not callable(shutdown):
+      raise TypeError('shutdown callback needs to be callable!')
     self._status_checker = status_checker
-    self._callback = callback
+    self._running = running
+    self._shutdown = shutdown
     self._clock = clock
     super(StatusManager, self).__init__()
     self.daemon = True
+    self.is_task_running = False
 
   def run(self):
     while True:
       status_result = self._status_checker.status
       if status_result is not None:
         log.info('Status manager got %s' % status_result)
-        self._callback(status_result)
-        break
+        if status_result.status is mesos_pb2.TASK_RUNNING:
+          if not self.is_task_running:
+            self._running(status_result)
+            self.is_task_running = True
+          self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS))
+        else:
+          self._shutdown(status_result)
+          break
       else:
         self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS))

http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/test/python/apache/aurora/executor/common/test_health_checker.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py
index da0c56c..28769dc 100644
--- a/src/test/python/apache/aurora/executor/common/test_health_checker.py
+++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py
@@ -20,7 +20,7 @@ import unittest
 
 import mock
 import pytest
-from mesos.interface.mesos_pb2 import TaskState
+from mesos.interface import mesos_pb2
 from twitter.common.exceptions import ExceptionalThread
 from twitter.common.testing.clock import ThreadedClock
 
@@ -59,23 +59,28 @@ class TestHealthChecker(unittest.TestCase):
       self.fake_health_checks.append((status, 'reason'))
 
   def test_initial_interval_2x(self):
-    self.append_health_checks(False)
+    self.append_health_checks(False, 2)
+    self.append_health_checks(True, 1)
+    self.append_health_checks(False, 1)
     hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock)
     hct.start()
     assert self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, 10)
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
     assert hct.status is None
-    self._clock.tick(6)
+    self._clock.tick(5)
     assert self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
     assert hct.status is None
-    self._clock.tick(3)
+    self._clock.tick(5)
     assert self._clock.converge(threads=[hct.threaded_health_checker])
-    assert hct.status is None
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
+    assert hct.status.status == mesos_pb2.TASK_RUNNING
     self._clock.tick(5)
     assert self._clock.converge(threads=[hct.threaded_health_checker])
-    assert hct.status.status == TaskState.Value('TASK_FAILED')
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
+    assert hct.status.status == mesos_pb2.TASK_FAILED
     hct.stop()
-    assert self._checker.health.call_count == 1
+    assert self._checker.health.call_count == 4
 
   def test_initial_interval_whatev(self):
     self.append_health_checks(False, 2)
@@ -87,7 +92,11 @@ class TestHealthChecker(unittest.TestCase):
     hct.start()
     self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
-    assert hct.status.status == TaskState.Value('TASK_FAILED')
+    assert hct.status is None
+    self._clock.tick(5)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
+    assert hct.status.status == mesos_pb2.TASK_FAILED
     hct.stop()
     # this is an implementation detail -- we healthcheck in the initializer and
     # healthcheck in the run loop.  if we ever change the implementation, expect
@@ -111,38 +120,36 @@ class TestHealthChecker(unittest.TestCase):
     self._clock.converge(threads=[hct.threaded_health_checker])
 
     # 2 consecutive health check failures followed by a successful health check.
-    epsilon = 0.001
-    self._clock.tick(initial_interval_secs + epsilon)
     self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
     assert hct.status is None
     assert hct.metrics.sample()['consecutive_failures'] == 1
-    self._clock.tick(interval_secs + epsilon)
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
     assert hct.status is None
     assert hct.metrics.sample()['consecutive_failures'] == 2
-    self._clock.tick(interval_secs + epsilon)
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status is None
+    assert hct.status.status == mesos_pb2.TASK_RUNNING
     assert hct.metrics.sample()['consecutive_failures'] == 0
 
     # 3 consecutive health check failures.
-    self._clock.tick(interval_secs + epsilon)
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status is None
+    assert hct.status.status == mesos_pb2.TASK_RUNNING
     assert hct.metrics.sample()['consecutive_failures'] == 1
-    self._clock.tick(interval_secs + epsilon)
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status is None
+    assert hct.status.status == mesos_pb2.TASK_RUNNING
     assert hct.metrics.sample()['consecutive_failures'] == 2
-    self._clock.tick(interval_secs + epsilon)
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status.status == TaskState.Value('TASK_FAILED')
+    assert hct.status.status == mesos_pb2.TASK_FAILED
     assert hct.metrics.sample()['consecutive_failures'] == 3
     hct.stop()
     assert self._checker.health.call_count == 6
@@ -439,9 +446,10 @@ class TestThreadedHealthChecker(unittest.TestCase):
     self.sandbox.exists.return_value = True
     self.sandbox.root = '/root'
 
-    self.initial_interval_secs = 1
-    self.interval_secs = 5
+    self.initial_interval_secs = 2
+    self.interval_secs = 1
     self.max_consecutive_failures = 2
+    self.min_consecutive_successes = 1
     self.clock = mock.Mock(spec=time)
     self.clock.time.return_value = 1.0
     self.health_checker = HealthChecker(
@@ -450,6 +458,7 @@ class TestThreadedHealthChecker(unittest.TestCase):
         self.interval_secs,
         self.initial_interval_secs,
         self.max_consecutive_failures,
+        self.min_consecutive_successes,
         self.clock)
     self.health_checker_sandbox_exists = HealthChecker(
         self.health,
@@ -457,6 +466,7 @@ class TestThreadedHealthChecker(unittest.TestCase):
         self.interval_secs,
         self.initial_interval_secs,
         self.max_consecutive_failures,
+        self.min_consecutive_successes,
         self.clock)
 
   def test_perform_check_if_not_disabled_snooze_file_is_none(self):
@@ -491,17 +501,28 @@ class TestThreadedHealthChecker(unittest.TestCase):
     hc = self.health_checker.threaded_health_checker
 
     assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+    assert hc.healthy is True
+
+    hc._maybe_update_failure_count(False, 'reason')
+    assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
     assert hc.healthy is True
 
     hc._maybe_update_failure_count(True, 'reason')
     assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 1
+    assert hc.healthy is True
 
+    hc._expired = True
     hc._maybe_update_failure_count(False, 'reason')
     assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
     assert hc.healthy is True
 
     hc._maybe_update_failure_count(False, 'reason')
     assert hc.current_consecutive_failures == 2
+    assert hc.current_consecutive_successes == 0
     assert hc.healthy is True
 
     hc._maybe_update_failure_count(False, 'reason')
@@ -517,7 +538,7 @@ class TestThreadedHealthChecker(unittest.TestCase):
     liveness = [False, False, True]
     self.health_checker.threaded_health_checker.dead.is_set.side_effect = lambda: liveness.pop(0)
     self.health_checker.threaded_health_checker.run()
-    assert self.clock.sleep.call_count == 3
+    assert self.clock.sleep.call_count == 2
     assert mock_maybe_update_failure_count.call_count == 2
 
   @mock.patch('apache.aurora.executor.common.health_checker.ExceptionalThread.start',

http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/test/python/apache/aurora/executor/common/test_status_checker.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_status_checker.py b/src/test/python/apache/aurora/executor/common/test_status_checker.py
index 5be1981..8942639 100644
--- a/src/test/python/apache/aurora/executor/common/test_status_checker.py
+++ b/src/test/python/apache/aurora/executor/common/test_status_checker.py
@@ -14,7 +14,7 @@
 
 import threading
 
-from mesos.interface.mesos_pb2 import TaskState
+from mesos.interface import mesos_pb2
 
 from apache.aurora.executor.common.status_checker import (
     ChainedStatusChecker,
@@ -62,11 +62,42 @@ def test_chained_health_interface():
     assert si.started.is_set()
 
   assert chained_si.status is None
-  reason = StatusResult('derp', TaskState.Value('TASK_FAILED'))
+  reason = StatusResult('derp', mesos_pb2.TASK_FAILED)
   si2.set_status(reason)
   assert chained_si.status == reason
   assert chained_si.status.reason == 'derp'
-  assert TaskState.Name(chained_si.status.status) == 'TASK_FAILED'
+  assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_FAILED'
+
+  for si in (si1, si2):
+    assert not si.stopped.is_set()
+  chained_si.stop()
+  for si in (si1, si2):
+    assert si.stopped.is_set()
+
+  # A task may fail after transition into RUNNING state. We need to test
+  # the status is not memoized in ChainedStatusChecker.
+  si1 = EventHealth()
+  si2 = EventHealth()
+  chained_si = ChainedStatusChecker([si1, si2])
+
+  for si in (si1, si2):
+    assert not si.started.is_set()
+  chained_si.start()
+  for si in (si1, si2):
+    assert si.started.is_set()
+
+  assert chained_si.status is None
+  reason2 = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING)
+  si2.set_status(reason2)
+  assert chained_si.status == reason2
+  assert chained_si.status.reason == 'Task is healthy.'
+  assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_RUNNING'
+
+  reason1 = StatusResult('derp', mesos_pb2.TASK_FAILED)
+  si1.set_status(reason1)
+  assert chained_si.status == reason1
+  assert chained_si.status.reason == 'derp'
+  assert mesos_pb2.TaskState.Name(chained_si.status.status) == 'TASK_FAILED'
 
   for si in (si1, si2):
     assert not si.stopped.is_set()

http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/test/python/apache/aurora/executor/test_status_manager.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/test_status_manager.py b/src/test/python/apache/aurora/executor/test_status_manager.py
index ce4679b..7c0efe8 100644
--- a/src/test/python/apache/aurora/executor/test_status_manager.py
+++ b/src/test/python/apache/aurora/executor/test_status_manager.py
@@ -16,9 +16,9 @@ import time
 from unittest import TestCase
 
 import mock
-from mesos.interface.mesos_pb2 import TaskState
+from mesos.interface import mesos_pb2
 
-from apache.aurora.executor.common.status_checker import StatusChecker
+from apache.aurora.executor.common.status_checker import StatusChecker, StatusResult
 from apache.aurora.executor.status_manager import StatusManager
 
 
@@ -28,23 +28,29 @@ class FakeStatusChecker(StatusChecker):
 
   @property
   def status(self):
+    status_result = None
+    if self.call_count == 1:
+      status_result = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING)
     if self.call_count == 2:
-      return TaskState.Value('TASK_KILLED')
+      status_result = StatusResult('Task is healthy.', mesos_pb2.TASK_RUNNING)
+    if self.call_count == 3:
+      status_result = StatusResult('Reason why a task failed.', mesos_pb2.TASK_KILLED)
     self.call_count += 1
-    return None
+    return status_result
 
 
 class TestStatusManager(TestCase):
   def setUp(self):
-    self.callback_called = False
+    self.callback_call_count = 0
 
   def test_run(self):
     checker = FakeStatusChecker()
-    def callback(result):
-      assert result == TaskState.Value('TASK_KILLED')
-      self.callback_called = True
+    def running(result):
+      self.callback_call_count += 1
+    def shutdown(result):
+      self.callback_call_count += 1
     mock_time = mock.create_autospec(spec=time, instance=True)
-    status_manager = StatusManager(checker, callback, mock_time)
+    status_manager = StatusManager(checker, running, shutdown, mock_time)
     status_manager.run()
-    assert mock_time.sleep.call_count == 2
-    assert self.callback_called is True
+    assert mock_time.sleep.call_count == 3
+    assert self.callback_call_count == 2

http://git-wip-us.apache.org/repos/asf/aurora/blob/ca683cb9/src/test/python/apache/aurora/executor/test_thermos_executor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py
index 0bfe9e9..ac914db 100644
--- a/src/test/python/apache/aurora/executor/test_thermos_executor.py
+++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py
@@ -44,9 +44,9 @@ from apache.aurora.config.schema.base import (
 )
 from apache.aurora.executor.aurora_executor import AuroraExecutor
 from apache.aurora.executor.common.executor_timeout import ExecutorTimeout
-from apache.aurora.executor.common.health_checker import HealthCheckerProvider
+from apache.aurora.executor.common.health_checker import HealthChecker, HealthCheckerProvider
 from apache.aurora.executor.common.sandbox import DirectorySandbox, SandboxProvider
-from apache.aurora.executor.common.status_checker import ChainedStatusChecker
+from apache.aurora.executor.common.status_checker import ChainedStatusChecker, StatusCheckerProvider
 from apache.aurora.executor.common.task_runner import TaskError
 from apache.aurora.executor.status_manager import StatusManager
 from apache.aurora.executor.thermos_task_runner import (
@@ -100,6 +100,14 @@ class FailingSandboxProvider(SandboxProvider):
     return FailingSandbox(safe_mkdtemp(), exception_type=self._exception_type, **kwargs)
 
 
+class FailingStatusCheckerProvider(StatusCheckerProvider):
+  def __init__(self, exception_type):
+    self._exception_type = exception_type
+
+  def from_assigned_task(self, assigned_task, sandbox):
+    raise self._exception_type('Could not create status checker!')
+
+
 class SlowSandbox(DirectorySandbox):
   def __init__(self, *args, **kwargs):
     super(SlowSandbox, self).__init__(*args, **kwargs)
@@ -217,18 +225,25 @@ def make_executor(
   ExecutorTimeout(te.launched, proxy_driver, timeout=Amount(100, Time.MILLISECONDS)).start()
   task_description = make_task(task, assigned_ports=ports, instanceId=0)
   te.launchTask(proxy_driver, task_description)
-
   te.status_manager_started.wait()
 
-  while len(proxy_driver.method_calls['sendStatusUpdate']) < 2:
+  is_health_check_enabled = False
+  for status_checker in te._chained_checker._status_checkers:
+    if isinstance(status_checker, HealthChecker):
+      is_health_check_enabled = True
+      break
+
+  status_update_nums = 1 if is_health_check_enabled else 2
+  while len(proxy_driver.method_calls['sendStatusUpdate']) < status_update_nums:
     time.sleep(0.1)
 
   # make sure startup was kosher
   updates = proxy_driver.method_calls['sendStatusUpdate']
-  assert len(updates) == 2
+  assert len(updates) == status_update_nums
   status_updates = [arg_tuple[0][0] for arg_tuple in updates]
   assert status_updates[0].state == mesos_pb2.TASK_STARTING
-  assert status_updates[1].state == mesos_pb2.TASK_RUNNING
+  if not is_health_check_enabled:
+    assert status_updates[1].state == mesos_pb2.TASK_RUNNING
 
   # wait for the runner to bind to a task
   while True:
@@ -236,7 +251,6 @@ def make_executor(
     if runner:
       break
     time.sleep(0.1)
-
   assert te.launched.is_set()
   return runner, te
 
@@ -438,7 +452,7 @@ class TestThermosExecutor(object):
         executor.terminated.wait()
 
     updates = proxy_driver.method_calls['sendStatusUpdate']
-    assert len(updates) == 3
+    assert len(updates) == 2
     assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED
 
   def test_task_health_ok(self):
@@ -572,6 +586,20 @@ class TestThermosExecutor(object):
     assert updates[0][0][0].state == mesos_pb2.TASK_STARTING
     assert updates[1][0][0].state == mesos_pb2.TASK_FAILED
 
+  def test_failing_status_provider_initialize_unknown_exception(self):
+    proxy_driver = ProxyDriver()
+
+    with temporary_dir() as td:
+      te = FastThermosExecutor(
+              runner_provider=make_provider(td),
+              sandbox_provider=DefaultTestSandboxProvider(),
+              status_providers=(FailingStatusCheckerProvider(exception_type=Exception),))
+      te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
+      proxy_driver.wait_stopped()
+
+      updates = proxy_driver.method_calls['sendStatusUpdate']
+      assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED
+
 
 def test_waiting_executor():
   proxy_driver = ProxyDriver()