You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/10/07 23:00:47 UTC
git commit: Prevent initial ZK timeouts from killing the executor.
Repository: incubator-aurora
Updated Branches:
refs/heads/master c1c394b51 -> 539793878
Prevent initial ZK timeouts from killing the executor.
In addition, prevent uncaught exceptions from killing the executor.
Testing Done:
./pants src/test/python/apache/aurora/executor:executor-small
./src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
Bugs closed: AURORA-728
Reviewed at https://reviews.apache.org/r/25974/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/53979387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/53979387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/53979387
Branch: refs/heads/master
Commit: 539793878f953c798ac1bd45df84a4574850891d
Parents: c1c394b
Author: Zameer Manji <zm...@twopensource.com>
Authored: Tue Oct 7 13:33:14 2014 -0700
Committer: Kevin Sweeney <ke...@apache.org>
Committed: Tue Oct 7 13:33:14 2014 -0700
----------------------------------------------------------------------
.../apache/aurora/executor/aurora_executor.py | 6 +-
.../apache/aurora/executor/common/announcer.py | 59 +++++++++++++++-----
.../aurora/executor/common/test_announcer.py | 31 +++++++++-
3 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53979387/src/main/python/apache/aurora/executor/aurora_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py
index 79a2485..2c6423d 100644
--- a/src/main/python/apache/aurora/executor/aurora_executor.py
+++ b/src/main/python/apache/aurora/executor/aurora_executor.py
@@ -118,7 +118,11 @@ class AuroraExecutor(ExecutorBase, Observable):
self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING)
- self._start_status_manager(driver, assigned_task)
+ try:
+ self._start_status_manager(driver, assigned_task)
+ except Exception:
+ log.error(traceback.format_exc())
+ self._die(driver, mesos_pb2.TASK_FAILED, "Internal error")
def _initialize_sandbox(self, driver, assigned_task):
self._sandbox = self._sandbox_provider.from_assigned_task(assigned_task)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53979387/src/main/python/apache/aurora/executor/common/announcer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/announcer.py b/src/main/python/apache/aurora/executor/common/announcer.py
index c466da8..74b2114 100644
--- a/src/main/python/apache/aurora/executor/common/announcer.py
+++ b/src/main/python/apache/aurora/executor/common/announcer.py
@@ -20,6 +20,7 @@ from abc import abstractmethod
from kazoo.client import KazooClient
from kazoo.retry import KazooRetry
+from mesos.interface import mesos_pb2
from twitter.common import log
from twitter.common.concurrent.deferred import defer
from twitter.common.exceptions import ExceptionalThread
@@ -27,7 +28,11 @@ from twitter.common.metrics import LambdaGauge, Observable
from twitter.common.quantity import Amount, Time
from twitter.common.zookeeper.serverset import Endpoint, ServerSet
-from apache.aurora.executor.common.status_checker import StatusChecker, StatusCheckerProvider
+from apache.aurora.executor.common.status_checker import (
+ StatusChecker,
+ StatusCheckerProvider,
+ StatusResult
+)
from apache.aurora.executor.common.task_info import (
mesos_task_instance_from_assigned_task,
resolve_ports
@@ -55,8 +60,12 @@ class AnnouncerCheckerProvider(StatusCheckerProvider):
super(AnnouncerCheckerProvider, self).__init__()
@abstractmethod
- def make_serverset(self, assigned_task):
- """Given an assigned task, return the serverset into which we should announce the task."""
+ def make_zk_client(self):
+ """Create a ZooKeeper client which can be asynchronously started"""
+
+ @abstractmethod
+ def make_zk_path(self, assigned_task):
+ """Given an assigned task, return the path into which we should announce the task."""
def from_assigned_task(self, assigned_task, _):
mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
@@ -71,10 +80,17 @@ class AnnouncerCheckerProvider(StatusCheckerProvider):
portmap,
mesos_task.announce().primary_port().get())
- serverset = self.make_serverset(assigned_task)
+ client = self.make_zk_client()
+ path = self.make_zk_path(assigned_task)
+
+ initial_interval = mesos_task.health_check_config().initial_interval_secs().get()
+ interval = mesos_task.health_check_config().interval_secs().get()
+ consecutive_failures = mesos_task.health_check_config().max_consecutive_failures().get()
+ timeout_secs = initial_interval + (consecutive_failures * interval)
return AnnouncerChecker(
- serverset, endpoint, additional=additional, shard=assigned_task.instanceId, name=self.name)
+ client, path, timeout_secs, endpoint, additional=additional, shard=assigned_task.instanceId,
+ name=self.name)
class DefaultAnnouncerCheckerProvider(AnnouncerCheckerProvider):
@@ -90,15 +106,15 @@ class DefaultAnnouncerCheckerProvider(AnnouncerCheckerProvider):
self.__root = root
super(DefaultAnnouncerCheckerProvider, self).__init__()
- def make_serverset(self, assigned_task):
+ def make_zk_client(self):
+ return KazooClient(self.__ensemble, connection_retry=self.DEFAULT_RETRY_POLICY)
+
+ def make_zk_path(self, assigned_task):
role, environment, name = (
assigned_task.task.owner.role,
assigned_task.task.environment,
assigned_task.task.jobName)
- path = posixpath.join(self.__root, role, environment, name)
- client = KazooClient(self.__ensemble, connection_retry=self.DEFAULT_RETRY_POLICY)
- client.start()
- return ServerSet(client, path)
+ return posixpath.join(self.__root, role, environment, name)
class ServerSetJoinThread(ExceptionalThread):
@@ -205,20 +221,35 @@ class Announcer(Observable):
class AnnouncerChecker(StatusChecker):
DEFAULT_NAME = 'announcer'
- def __init__(self, serverset, endpoint, additional=None, shard=None, name=None):
- self.__announcer = Announcer(serverset, endpoint, additional=additional, shard=shard)
+ def __init__(self, client, path, timeout_secs, endpoint, additional=None, shard=None, name=None):
+ self.__client = client
+ self.__connect_event = client.start_async()
+ self.__timeout_secs = timeout_secs
+ self.__announcer = Announcer(ServerSet(client, path), endpoint, additional=additional,
+ shard=shard)
self.__name = name or self.DEFAULT_NAME
+ self.__status = None
+ self.start_event = threading.Event()
self.metrics.register(LambdaGauge('disconnected_time', self.__announcer.disconnected_time))
@property
def status(self):
- return None # always return healthy
+ return self.__status
def name(self):
return self.__name
+ def __start(self):
+ self.__connect_event.wait(timeout=self.__timeout_secs)
+ if not self.__connect_event.is_set():
+ self.__status = StatusResult("Creating Announcer Serverset timed out.", mesos_pb2.TASK_FAILED)
+ else:
+ self.__announcer.start()
+
+ self.start_event.set()
+
def start(self):
- self.__announcer.start()
+ defer(self.__start)
def stop(self):
defer(self.__announcer.stop)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53979387/src/test/python/apache/aurora/executor/common/test_announcer.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_announcer.py b/src/test/python/apache/aurora/executor/common/test_announcer.py
index 4f6e200..5694335 100644
--- a/src/test/python/apache/aurora/executor/common/test_announcer.py
+++ b/src/test/python/apache/aurora/executor/common/test_announcer.py
@@ -22,6 +22,7 @@ from twitter.common.quantity import Amount, Time
from twitter.common.testing.clock import ThreadedClock
from twitter.common.zookeeper.serverset import Endpoint, ServerSet
+from apache.aurora.config.schema.base import HealthCheckConfig
from apache.aurora.executor.common.announcer import (
Announcer,
DefaultAnnouncerCheckerProvider,
@@ -235,6 +236,34 @@ def test_make_empty_endpoints():
@mock.patch('apache.aurora.executor.common.announcer.ServerSet')
@mock.patch('apache.aurora.executor.common.announcer.KazooClient')
+def test_announcer_provider_with_timeout(mock_client_provider, mock_serverset_provider):
+ mock_client = mock.MagicMock(spec=KazooClient)
+ mock_client_provider.return_value = mock_client
+ client_connect_event = threading.Event()
+ mock_client.start_async.return_value = client_connect_event
+
+ mock_serverset = mock.MagicMock(spec=ServerSet)
+ mock_serverset_provider.return_value = mock_serverset
+
+ dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', root='/aurora')
+ job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={'http': 80, 'admin': 'primary'})
+
+ health_check_config = HealthCheckConfig(initial_interval_secs=0.1, interval_secs=0.1)
+ job = job(health_check_config=health_check_config)
+ assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345})
+ checker = dap.from_assigned_task(assigned_task, None)
+
+ mock_client.start_async.assert_called_once_with()
+ mock_serverset_provider.assert_called_once_with(mock_client, '/aurora/aurora/prod/proxy')
+
+ checker.start()
+ checker.start_event.wait()
+
+ assert checker.status is not None
+
+
+@mock.patch('apache.aurora.executor.common.announcer.ServerSet')
+@mock.patch('apache.aurora.executor.common.announcer.KazooClient')
def test_default_announcer_provider(mock_client_provider, mock_serverset_provider):
mock_client = mock.MagicMock(spec=KazooClient)
mock_client_provider.return_value = mock_client
@@ -246,7 +275,7 @@ def test_default_announcer_provider(mock_client_provider, mock_serverset_provide
assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345})
checker = dap.from_assigned_task(assigned_task, None)
- mock_client.start.assert_called_once_with()
+ mock_client.start_async.assert_called_once_with()
mock_serverset_provider.assert_called_once_with(mock_client, '/aurora/aurora/prod/proxy')
assert checker.name() == 'announcer'
assert checker.status is None