You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/10/07 23:00:47 UTC

git commit: Prevent initial ZK timeouts from killing the executor.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master c1c394b51 -> 539793878


Prevent initial ZooKeeper (ZK) connection timeouts from killing the executor.

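The announcer now connects to ZooKeeper asynchronously. If the connection is
not established within the health-check window (initial_interval_secs +
max_consecutive_failures * interval_secs), the announcer reports TASK_FAILED
instead of crashing the executor.

In addition, exceptions raised while starting the status manager no longer
kill the executor: they are logged and the task is failed with "Internal error".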

Testing Done:
./pants src/test/python/apache/aurora/executor:executor-small
./src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh

Bugs closed: AURORA-728

Reviewed at https://reviews.apache.org/r/25974/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/53979387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/53979387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/53979387

Branch: refs/heads/master
Commit: 539793878f953c798ac1bd45df84a4574850891d
Parents: c1c394b
Author: Zameer Manji <zm...@twopensource.com>
Authored: Tue Oct 7 13:33:14 2014 -0700
Committer: Kevin Sweeney <ke...@apache.org>
Committed: Tue Oct 7 13:33:14 2014 -0700

----------------------------------------------------------------------
 .../apache/aurora/executor/aurora_executor.py   |  6 +-
 .../apache/aurora/executor/common/announcer.py  | 59 +++++++++++++++-----
 .../aurora/executor/common/test_announcer.py    | 31 +++++++++-
 3 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53979387/src/main/python/apache/aurora/executor/aurora_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py
index 79a2485..2c6423d 100644
--- a/src/main/python/apache/aurora/executor/aurora_executor.py
+++ b/src/main/python/apache/aurora/executor/aurora_executor.py
@@ -118,7 +118,11 @@ class AuroraExecutor(ExecutorBase, Observable):
 
     self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING)
 
-    self._start_status_manager(driver, assigned_task)
+    try:
+      self._start_status_manager(driver, assigned_task)
+    except Exception:
+      log.error(traceback.format_exc())
+      self._die(driver, mesos_pb2.TASK_FAILED, "Internal error")
 
   def _initialize_sandbox(self, driver, assigned_task):
     self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53979387/src/main/python/apache/aurora/executor/common/announcer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/announcer.py b/src/main/python/apache/aurora/executor/common/announcer.py
index c466da8..74b2114 100644
--- a/src/main/python/apache/aurora/executor/common/announcer.py
+++ b/src/main/python/apache/aurora/executor/common/announcer.py
@@ -20,6 +20,7 @@ from abc import abstractmethod
 
 from kazoo.client import KazooClient
 from kazoo.retry import KazooRetry
+from mesos.interface import mesos_pb2
 from twitter.common import log
 from twitter.common.concurrent.deferred import defer
 from twitter.common.exceptions import ExceptionalThread
@@ -27,7 +28,11 @@ from twitter.common.metrics import LambdaGauge, Observable
 from twitter.common.quantity import Amount, Time
 from twitter.common.zookeeper.serverset import Endpoint, ServerSet
 
-from apache.aurora.executor.common.status_checker import StatusChecker, StatusCheckerProvider
+from apache.aurora.executor.common.status_checker import (
+    StatusChecker,
+    StatusCheckerProvider,
+    StatusResult
+)
 from apache.aurora.executor.common.task_info import (
     mesos_task_instance_from_assigned_task,
     resolve_ports
@@ -55,8 +60,12 @@ class AnnouncerCheckerProvider(StatusCheckerProvider):
     super(AnnouncerCheckerProvider, self).__init__()
 
   @abstractmethod
-  def make_serverset(self, assigned_task):
-    """Given an assigned task, return the serverset into which we should announce the task."""
+  def make_zk_client(self):
+    """Create a ZooKeeper client which can be asyncronously started"""
+
+  @abstractmethod
+  def make_zk_path(self, assigned_task):
+    """Given an assigned task return the path into where we should announce the task."""
 
   def from_assigned_task(self, assigned_task, _):
     mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
@@ -71,10 +80,17 @@ class AnnouncerCheckerProvider(StatusCheckerProvider):
         portmap,
         mesos_task.announce().primary_port().get())
 
-    serverset = self.make_serverset(assigned_task)
+    client = self.make_zk_client()
+    path = self.make_zk_path(assigned_task)
+
+    initial_interval = mesos_task.health_check_config().initial_interval_secs().get()
+    interval = mesos_task.health_check_config().interval_secs().get()
+    consecutive_failures = mesos_task.health_check_config().max_consecutive_failures().get()
+    timeout_secs = initial_interval + (consecutive_failures * interval)
 
     return AnnouncerChecker(
-        serverset, endpoint, additional=additional, shard=assigned_task.instanceId, name=self.name)
+      client, path, timeout_secs, endpoint, additional=additional, shard=assigned_task.instanceId,
+      name=self.name)
 
 
 class DefaultAnnouncerCheckerProvider(AnnouncerCheckerProvider):
@@ -90,15 +106,15 @@ class DefaultAnnouncerCheckerProvider(AnnouncerCheckerProvider):
     self.__root = root
     super(DefaultAnnouncerCheckerProvider, self).__init__()
 
-  def make_serverset(self, assigned_task):
+  def make_zk_client(self):
+    return KazooClient(self.__ensemble, connection_retry=self.DEFAULT_RETRY_POLICY)
+
+  def make_zk_path(self, assigned_task):
     role, environment, name = (
         assigned_task.task.owner.role,
         assigned_task.task.environment,
         assigned_task.task.jobName)
-    path = posixpath.join(self.__root, role, environment, name)
-    client = KazooClient(self.__ensemble, connection_retry=self.DEFAULT_RETRY_POLICY)
-    client.start()
-    return ServerSet(client, path)
+    return posixpath.join(self.__root, role, environment, name)
 
 
 class ServerSetJoinThread(ExceptionalThread):
@@ -205,20 +221,35 @@ class Announcer(Observable):
 class AnnouncerChecker(StatusChecker):
   DEFAULT_NAME = 'announcer'
 
-  def __init__(self, serverset, endpoint, additional=None, shard=None, name=None):
-    self.__announcer = Announcer(serverset, endpoint, additional=additional, shard=shard)
+  def __init__(self, client, path, timeout_secs, endpoint, additional=None, shard=None, name=None):
+    self.__client = client
+    self.__connect_event = client.start_async()
+    self.__timeout_secs = timeout_secs
+    self.__announcer = Announcer(ServerSet(client, path), endpoint, additional=additional,
+        shard=shard)
     self.__name = name or self.DEFAULT_NAME
+    self.__status = None
+    self.start_event = threading.Event()
     self.metrics.register(LambdaGauge('disconnected_time', self.__announcer.disconnected_time))
 
   @property
   def status(self):
-    return None  # always return healthy
+    return self.__status
 
   def name(self):
     return self.__name
 
+  def __start(self):
+    self.__connect_event.wait(timeout=self.__timeout_secs)
+    if not self.__connect_event.is_set():
+      self.__status = StatusResult("Creating Announcer Serverset timed out.", mesos_pb2.TASK_FAILED)
+    else:
+      self.__announcer.start()
+
+    self.start_event.set()
+
   def start(self):
-    self.__announcer.start()
+    defer(self.__start)
 
   def stop(self):
     defer(self.__announcer.stop)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53979387/src/test/python/apache/aurora/executor/common/test_announcer.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_announcer.py b/src/test/python/apache/aurora/executor/common/test_announcer.py
index 4f6e200..5694335 100644
--- a/src/test/python/apache/aurora/executor/common/test_announcer.py
+++ b/src/test/python/apache/aurora/executor/common/test_announcer.py
@@ -22,6 +22,7 @@ from twitter.common.quantity import Amount, Time
 from twitter.common.testing.clock import ThreadedClock
 from twitter.common.zookeeper.serverset import Endpoint, ServerSet
 
+from apache.aurora.config.schema.base import HealthCheckConfig
 from apache.aurora.executor.common.announcer import (
     Announcer,
     DefaultAnnouncerCheckerProvider,
@@ -235,6 +236,34 @@ def test_make_empty_endpoints():
 
 @mock.patch('apache.aurora.executor.common.announcer.ServerSet')
 @mock.patch('apache.aurora.executor.common.announcer.KazooClient')
+def test_announcer_provider_with_timeout(mock_client_provider, mock_serverset_provider):
+  mock_client = mock.MagicMock(spec=KazooClient)
+  mock_client_provider.return_value = mock_client
+  client_connect_event = threading.Event()
+  mock_client.start_async.return_value = client_connect_event
+
+  mock_serverset = mock.MagicMock(spec=ServerSet)
+  mock_serverset_provider.return_value = mock_serverset
+
+  dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', root='/aurora')
+  job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={'http': 80, 'admin': 'primary'})
+
+  health_check_config = HealthCheckConfig(initial_interval_secs=0.1, interval_secs=0.1)
+  job = job(health_check_config=health_check_config)
+  assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345})
+  checker = dap.from_assigned_task(assigned_task, None)
+
+  mock_client.start_async.assert_called_once_with()
+  mock_serverset_provider.assert_called_once_with(mock_client, '/aurora/aurora/prod/proxy')
+
+  checker.start()
+  checker.start_event.wait()
+
+  assert checker.status is not None
+
+
+@mock.patch('apache.aurora.executor.common.announcer.ServerSet')
+@mock.patch('apache.aurora.executor.common.announcer.KazooClient')
 def test_default_announcer_provider(mock_client_provider, mock_serverset_provider):
   mock_client = mock.MagicMock(spec=KazooClient)
   mock_client_provider.return_value = mock_client
@@ -246,7 +275,7 @@ def test_default_announcer_provider(mock_client_provider, mock_serverset_provide
   assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345})
   checker = dap.from_assigned_task(assigned_task, None)
 
-  mock_client.start.assert_called_once_with()
+  mock_client.start_async.assert_called_once_with()
   mock_serverset_provider.assert_called_once_with(mock_client, '/aurora/aurora/prod/proxy')
   assert checker.name() == 'announcer'
   assert checker.status is None