You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wi...@apache.org on 2014/08/05 22:03:17 UTC
git commit: AURORA-587: Example ServerSet Announcer implementation
Repository: incubator-aurora
Updated Branches:
refs/heads/master 6adb35f09 -> fec17b6d7
AURORA-587: Example ServerSet Announcer implementation
This is an exemplar ServerSet Announcer implementation in
apache.aurora.executor.common, currently enabled via additional command line
flags to the aurora executor.
Testing Done:
./pants src/test/python/apache/aurora/executor/common:announcer -v
Bugs closed: AURORA-587
Reviewed at https://reviews.apache.org/r/23863/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/fec17b6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/fec17b6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/fec17b6d
Branch: refs/heads/master
Commit: fec17b6d72b2078666a6676d2840335bf8379204
Parents: 6adb35f
Author: Brian Wickman <wi...@apache.org>
Authored: Tue Aug 5 13:03:12 2014 -0700
Committer: Brian Wickman <wi...@apache.org>
Committed: Tue Aug 5 13:03:12 2014 -0700
----------------------------------------------------------------------
docs/configuration-reference.md | 37 +++
docs/user-guide.md | 18 +-
examples/vagrant/aurorabuild.sh | 6 +
examples/vagrant/upstart/aurora-scheduler.conf | 2 +-
.../python/apache/aurora/executor/bin/BUILD | 3 +-
.../executor/bin/thermos_executor_main.py | 40 ++-
.../python/apache/aurora/executor/common/BUILD | 15 ++
.../apache/aurora/executor/common/announcer.py | 224 ++++++++++++++++
.../aurora/client/api/test_scheduler_client.py | 4 +-
.../python/apache/aurora/executor/common/BUILD | 12 +
.../aurora/executor/common/test_announcer.py | 257 +++++++++++++++++++
.../aurora/e2e/http/http_example_updated.aurora | 1 +
.../sh/org/apache/aurora/e2e/test_common.sh | 23 ++
.../sh/org/apache/aurora/e2e/test_end_to_end.sh | 3 +
.../org/apache/aurora/e2e/test_end_to_end_v2.sh | 3 +
.../org/apache/aurora/e2e/validate_serverset.py | 46 ++++
16 files changed, 687 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/docs/configuration-reference.md
----------------------------------------------------------------------
diff --git a/docs/configuration-reference.md b/docs/configuration-reference.md
index 34f2359..5166d45 100644
--- a/docs/configuration-reference.md
+++ b/docs/configuration-reference.md
@@ -28,6 +28,7 @@ Aurora + Thermos Configuration Reference
- [Services](#services)
- [UpdateConfig Objects](#updateconfig-objects)
- [HealthCheckConfig Objects](#healthcheckconfig-objects)
+ - [Announcer Objects](#announcer-objects)
- [Specifying Scheduling Constraints](#specifying-scheduling-constraints)
- [Template Namespaces](#template-namespaces)
- [mesos Namespace](#mesos-namespace)
@@ -356,6 +357,42 @@ Parameters for controlling a task's health checks via HTTP.
| ```timeout_secs``` | Integer | HTTP request timeout. (Default: 1)
| ```max_consecutive_failures``` | Integer | Maximum number of consecutive failures that tolerated before considering a task unhealthy (Default: 0)
+### Announcer Objects
+
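+If the `announce` field in the Job configuration is set, each task will be
+registered in the ServerSet `/aurora/role/environment/jobname` in the
+ZooKeeper ensemble configured by the executor. If no Announcer object is specified,
+no announcement will take place. Announcing also requires the executor to be started
+with `--announcer-enable` and `--announcer-ensemble`; the `/aurora` root is the
+executor's default and can be changed with `--announcer-serverset-path`.
+For more information about ServerSets, see the [User Guide](user-guide.md).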
+
+| object | type | description
+| ------- | :-------: | --------
+| ```primary_port``` | String | Which named port to register as the primary endpoint in the ServerSet (Default: `http`)
+| ```portmap``` | dict | A mapping of additional endpoints to be announced in the ServerSet (Default: `{ 'aurora': '{{primary_port}}' }`)
+
+### Port aliasing with the Announcer `portmap`
+
+The primary endpoint registered in the ServerSet is the one allocated to the port
+specified by the `primary_port` in the `Announcer` object, by default
+the `http` port. This port can be referenced from anywhere within a configuration
+as `{{thermos.ports[http]}}`.
+
+Without the portmap, each named port would be allocated a unique port number.
+The `portmap` allows two different named ports to be aliased together. The default
+`portmap` aliases the `aurora` port (i.e. `{{thermos.ports[aurora]}}`) to
+the `http` port. Even though the two ports can be referenced independently,
+only one port is allocated by Mesos. Any port referenced in a `Process` object
+but which is not in the portmap will be allocated dynamically by Mesos and announced as well.
+
+It is possible to use the portmap to alias names to static port numbers, e.g.
+`{'http': 80, 'https': 443, 'aurora': 'http'}`. In this case, referencing
+`{{thermos.ports[aurora]}}` would look up `{{thermos.ports[http]}}` then
+find a static port 80. No port would be requested of or allocated by Mesos.
+
+Static ports should be used cautiously as Aurora does nothing to prevent two
+tasks with the same static port allocations from being co-scheduled.
+External constraints such as slave attributes should be used to enforce such
+guarantees should they be needed.
+
Specifying Scheduling Constraints
=================================
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/docs/user-guide.md
----------------------------------------------------------------------
diff --git a/docs/user-guide.md b/docs/user-guide.md
index 6c703a3..e12ee89 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -11,6 +11,7 @@ Aurora User Guide
- [Giving Priority to Production Tasks: PREEMPTING](#user-content-giving-priority-to-production-tasks-preempting)
- [Natural Termination: FINISHED, FAILED](#user-content-natural-termination-finished-failed)
- [Forceful Termination: KILLING, RESTARTING](#user-content-forceful-termination-killing-restarting)
+- [Service Discovery](#user-content-service-discovery)
- [Configuration](#user-content-configuration)
- [Creating Jobs](#user-content-creating-jobs)
- [Interacting With Jobs](#user-content-interacting-with-jobs)
@@ -276,12 +277,27 @@ Configuration
You define and configure your Jobs (and their Tasks and Processes) in
Aurora configuration files. Their filenames end with the `.aurora`
-suffix, and you write them in Python making use of the Pystashio
+suffix, and you write them in Python making use of the Pystachio
templating language, along
with specific Aurora, Mesos, and Thermos commands and methods. See the
[Configuration Guide and Reference](configuration-reference.md) and
[Configuration Tutorial](configuration-tutorial.md).
+Service Discovery
+-----------------
+
+It is possible for the Aurora executor to announce tasks into ServerSets for
+the purpose of service discovery. ServerSets use the ZooKeeper [group membership pattern](http://zookeeper.apache.org/doc/trunk/recipes.html#sc_outOfTheBox),
+of which there are several reference implementations:
+
+ - [C++](https://github.com/apache/mesos/blob/master/src/zookeeper/group.cpp)
+ - [Java](https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/zookeeper/ServerSetImpl.java#L221)
+ - [Python](https://github.com/twitter/commons/blob/master/src/python/twitter/common/zookeeper/serverset/serverset.py#L51)
+
+These can also be used natively in Finagle using the [ZookeeperServerSetCluster](https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala).
+
+For more information about how to configure announcing, see the [Configuration Reference](configuration-reference.md).
+
Creating Jobs
-------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/examples/vagrant/aurorabuild.sh
----------------------------------------------------------------------
diff --git a/examples/vagrant/aurorabuild.sh b/examples/vagrant/aurorabuild.sh
index 9f604d3..8b39a16 100755
--- a/examples/vagrant/aurorabuild.sh
+++ b/examples/vagrant/aurorabuild.sh
@@ -75,6 +75,12 @@ with contextlib.closing(zipfile.ZipFile('dist/thermos_executor.pex', 'a')) as zf
zf.writestr('apache/aurora/executor/resources/__init__.py', '')
zf.write('dist/thermos_runner.pex', 'apache/aurora/executor/resources/thermos_runner.pex')
EOF
+
+ cat <<EOF > $DIST_DIR/thermos_executor.sh
+#!/usr/bin/env bash
+exec /home/vagrant/aurora/dist/thermos_executor.pex --announcer-enable --announcer-ensemble localhost:2181
+EOF
+ chmod +x $DIST_DIR/thermos_executor.sh
}
function build_observer {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/examples/vagrant/upstart/aurora-scheduler.conf
----------------------------------------------------------------------
diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf
index 05b2f26..7e2b3f0 100644
--- a/examples/vagrant/upstart/aurora-scheduler.conf
+++ b/examples/vagrant/upstart/aurora-scheduler.conf
@@ -35,7 +35,7 @@ exec $DIST_DIR/install/aurora-scheduler/bin/aurora-scheduler \
-native_log_zk_group_path=/aurora/replicated-log \
-native_log_file_path=$AURORA_HOME/scheduler/db \
-backup_dir=$AURORA_HOME/scheduler/backups \
- -thermos_executor_path=$DIST_DIR/thermos_executor.pex \
+ -thermos_executor_path=$DIST_DIR/thermos_executor.sh \
-gc_executor_path=$DIST_DIR/gc_executor.pex \
-vlog=INFO \
-logtostderr
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/main/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD
index 6b3f620..9351715 100644
--- a/src/main/python/apache/aurora/executor/bin/BUILD
+++ b/src/main/python/apache/aurora/executor/bin/BUILD
@@ -16,12 +16,12 @@ python_binary(
name = 'thermos_executor',
source = 'thermos_executor_main.py',
entry_point = 'apache.aurora.executor.bin.thermos_executor_main:proxy_main',
- ignore_errors = True,
always_write_cache = True,
dependencies = [
pants('3rdparty/python:twitter.common.app'),
pants('3rdparty/python:twitter.common.log'),
pants('3rdparty/python:twitter.common.metrics'),
+ pants('src/main/python/apache/aurora/executor/common:announcer'),
pants('src/main/python/apache/aurora/executor/common:executor_timeout'),
pants('src/main/python/apache/aurora/executor/common:health_checker'),
pants('src/main/python/apache/aurora/executor/common:sandbox'),
@@ -36,7 +36,6 @@ python_binary(
name = 'gc_executor',
source = 'gc_executor_main.py',
entry_point = 'apache.aurora.executor.bin.gc_executor_main:proxy_main',
- ignore_errors = True,
always_write_cache = True,
dependencies = [
pants('3rdparty/python:twitter.common.app'),
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 198f2f6..aacc19a 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -26,6 +26,7 @@ from twitter.common import app, log
from twitter.common.log.options import LogOptions
from apache.aurora.executor.aurora_executor import AuroraExecutor
+from apache.aurora.executor.common.announcer import DefaultAnnouncerCheckerProvider
from apache.aurora.executor.common.executor_timeout import ExecutorTimeout
from apache.aurora.executor.common.health_checker import HealthCheckerProvider
from apache.aurora.executor.thermos_task_runner import DefaultThermosTaskRunnerProvider
@@ -36,6 +37,32 @@ LogOptions.set_disk_log_level('DEBUG')
LogOptions.set_log_dir('.')
+app.add_option(
+ '--announcer-enable',
+ dest='announcer_enable',
+ action='store_true',
+ default=False,
+ help='Enable the ServerSet announcer for this executor. Jobs must still activate using '
+ 'the Announcer configuration.')
+
+
+app.add_option(
+ '--announcer-ensemble',
+ dest='announcer_ensemble',
+ type=str,
+ default=None,
+ help='The ensemble to which the Announcer should register ServerSets.')
+
+
+app.add_option(
+ '--announcer-serverset-path',
+ dest='announcer_serverset_path',
+ type=str,
+ default='/aurora',
+ help='The root of the tree into which ServerSets should be announced. The paths will '
+ 'be of the form $ROOT/$ROLE/$ENVIRONMENT/$JOBNAME.')
+
+
# TODO(wickman) Consider just having the OSS version require pip installed
# thermos_runner binaries on every machine and instead of embedding the pex
# as a resource, shell out to one on the PATH.
@@ -52,16 +79,25 @@ def dump_runner_pex():
def proxy_main():
- def main():
+ def main(args, options):
thermos_runner_provider = DefaultThermosTaskRunnerProvider(
dump_runner_pex(),
artifact_dir=os.path.realpath('.'),
)
+ # status providers:
+ status_providers = [HealthCheckerProvider()]
+
+ if options.announcer_enable:
+ if options.announcer_ensemble is None:
+ app.error('Must specify --announcer-ensemble if the announcer is enabled.')
+ status_providers.append(DefaultAnnouncerCheckerProvider(
+ options.announcer_ensemble, options.announcer_serverset_path))
+
# Create executor stub
thermos_executor = AuroraExecutor(
runner_provider=thermos_runner_provider,
- status_providers=(HealthCheckerProvider(),),
+ status_providers=status_providers,
)
# Create driver stub
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/main/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/BUILD b/src/main/python/apache/aurora/executor/common/BUILD
index d0ff6fb..0a4d35a 100644
--- a/src/main/python/apache/aurora/executor/common/BUILD
+++ b/src/main/python/apache/aurora/executor/common/BUILD
@@ -44,6 +44,21 @@ python_library(
)
python_library(
+ name = 'announcer',
+ sources = ['announcer.py'],
+ dependencies = [
+ pants('3rdparty/python:twitter.common.concurrent'),
+ pants('3rdparty/python:twitter.common.exceptions'),
+ pants('3rdparty/python:twitter.common.log'),
+ pants('3rdparty/python:twitter.common.metrics'),
+ pants('3rdparty/python:twitter.common.quantity'),
+ pants('3rdparty/python:twitter.common.zookeeper'),
+ pants(':status_checker'),
+ pants(':task_info'),
+ ]
+)
+
+python_library(
name = 'executor_timeout',
sources = ['executor_timeout.py'],
dependencies = [
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/main/python/apache/aurora/executor/common/announcer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/announcer.py b/src/main/python/apache/aurora/executor/common/announcer.py
new file mode 100644
index 0000000..c466da8
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/announcer.py
@@ -0,0 +1,224 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import posixpath
+import socket
+import threading
+import time
+from abc import abstractmethod
+
+from kazoo.client import KazooClient
+from kazoo.retry import KazooRetry
+from twitter.common import log
+from twitter.common.concurrent.deferred import defer
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.metrics import LambdaGauge, Observable
+from twitter.common.quantity import Amount, Time
+from twitter.common.zookeeper.serverset import Endpoint, ServerSet
+
+from apache.aurora.executor.common.status_checker import StatusChecker, StatusCheckerProvider
+from apache.aurora.executor.common.task_info import (
+ mesos_task_instance_from_assigned_task,
+ resolve_ports
+)
+
+
+def make_endpoints(hostname, portmap, primary_port):
+ """
+ Generate primary, additional endpoints from a portmap and primary_port.
+ primary_port must be a name in the portmap dictionary.
+ """
+ # Do int check as stop-gap measure against incompatible downstream clients.
+ additional_endpoints = dict(
+ (name, Endpoint(hostname, port)) for (name, port) in portmap.items()
+ if isinstance(port, int))
+
+ # It's possible for the primary port to not have been allocated if this task
+ # is using autoregistration, so register with a port of 0.
+ return Endpoint(hostname, portmap.get(primary_port, 0)), additional_endpoints
+
+
+class AnnouncerCheckerProvider(StatusCheckerProvider):
+ def __init__(self, name=None):
+ self.name = name
+ super(AnnouncerCheckerProvider, self).__init__()
+
+ @abstractmethod
+ def make_serverset(self, assigned_task):
+ """Given an assigned task, return the serverset into which we should announce the task."""
+
+ def from_assigned_task(self, assigned_task, _):
+ mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
+
+ if not mesos_task.has_announce():
+ return None
+
+ portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)
+
+ endpoint, additional = make_endpoints(
+ socket.gethostname(),
+ portmap,
+ mesos_task.announce().primary_port().get())
+
+ serverset = self.make_serverset(assigned_task)
+
+ return AnnouncerChecker(
+ serverset, endpoint, additional=additional, shard=assigned_task.instanceId, name=self.name)
+
+
+class DefaultAnnouncerCheckerProvider(AnnouncerCheckerProvider):
+ DEFAULT_RETRY_MAX_DELAY = Amount(5, Time.MINUTES)
+ DEFAULT_RETRY_POLICY = KazooRetry(
+ max_tries=None,
+ ignore_expire=True,
+ max_delay=DEFAULT_RETRY_MAX_DELAY.as_(Time.SECONDS),
+ )
+
+ def __init__(self, ensemble, root='/aurora'):
+ self.__ensemble = ensemble
+ self.__root = root
+ super(DefaultAnnouncerCheckerProvider, self).__init__()
+
+ def make_serverset(self, assigned_task):
+ role, environment, name = (
+ assigned_task.task.owner.role,
+ assigned_task.task.environment,
+ assigned_task.task.jobName)
+ path = posixpath.join(self.__root, role, environment, name)
+ client = KazooClient(self.__ensemble, connection_retry=self.DEFAULT_RETRY_POLICY)
+ client.start()
+ return ServerSet(client, path)
+
+
+class ServerSetJoinThread(ExceptionalThread):
+ """Background thread to reconnect to Serverset on session expiration."""
+
+ LOOP_WAIT = Amount(1, Time.SECONDS)
+
+ def __init__(self, event, joiner, loop_wait=LOOP_WAIT):
+ self._event = event
+ self._joiner = joiner
+ self._stopped = threading.Event()
+ self._loop_wait = loop_wait
+ super(ServerSetJoinThread, self).__init__()
+ self.daemon = True
+
+ def run(self):
+ while True:
+ if self._stopped.is_set():
+ break
+ self._event.wait(timeout=self._loop_wait.as_(Time.SECONDS))
+ if not self._event.is_set():
+ continue
+ log.debug('Join event triggered, joining serverset.')
+ self._event.clear()
+ self._joiner()
+
+ def stop(self):
+ self._stopped.set()
+
+
+class Announcer(Observable):
+ class Error(Exception): pass
+
+ EXCEPTION_WAIT = Amount(15, Time.SECONDS)
+
+ def __init__(self,
+ serverset,
+ endpoint,
+ additional=None,
+ shard=None,
+ clock=time,
+ exception_wait=None):
+ self._membership = None
+ self._membership_termination = clock.time()
+ self._endpoint = endpoint
+ self._additional = additional or {}
+ self._shard = shard
+ self._serverset = serverset
+ self._rejoin_event = threading.Event()
+ self._clock = clock
+ self._thread = None
+ self._exception_wait = exception_wait or self.EXCEPTION_WAIT
+
+ def disconnected_time(self):
+ # Lockless membership length check
+ membership_termination = self._membership_termination
+ if membership_termination is None:
+ return 0
+ return self._clock.time() - membership_termination
+
+ def _join_inner(self):
+ return self._serverset.join(
+ endpoint=self._endpoint,
+ additional=self._additional,
+ shard=self._shard,
+ expire_callback=self.on_expiration)
+
+ def _join(self):
+ if self._membership is not None:
+ raise self.Error("join called, but already have membership!")
+ while True:
+ try:
+ self._membership = self._join_inner()
+ self._membership_termination = None
+ except Exception as e:
+ log.error('Failed to join ServerSet: %s' % e)
+ self._clock.sleep(self._exception_wait.as_(Time.SECONDS))
+ else:
+ break
+
+ def start(self):
+ self._thread = ServerSetJoinThread(self._rejoin_event, self._join)
+ self._thread.start()
+ self.rejoin()
+
+ def rejoin(self):
+ self._rejoin_event.set()
+
+ def stop(self):
+ thread, self._thread = self._thread, None
+ thread.stop()
+ if self._membership:
+ self._serverset.cancel(self._membership)
+
+ def on_expiration(self):
+ self._membership = None
+ if not self._thread:
+ return
+ self._membership_termination = self._clock.time()
+ log.info('Zookeeper session expired.')
+ self.rejoin()
+
+
+class AnnouncerChecker(StatusChecker):
+ DEFAULT_NAME = 'announcer'
+
+ def __init__(self, serverset, endpoint, additional=None, shard=None, name=None):
+ self.__announcer = Announcer(serverset, endpoint, additional=additional, shard=shard)
+ self.__name = name or self.DEFAULT_NAME
+ self.metrics.register(LambdaGauge('disconnected_time', self.__announcer.disconnected_time))
+
+ @property
+ def status(self):
+ return None # always return healthy
+
+ def name(self):
+ return self.__name
+
+ def start(self):
+ self.__announcer.start()
+
+ def stop(self):
+ defer(self.__announcer.stop)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index ab74db3..8f5fe56 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -46,7 +46,9 @@ from gen.apache.aurora.api.ttypes import (
RewriteConfigsRequest,
ScheduleStatus,
SessionKey,
- TaskQuery
+ TaskQuery,
+ UpdateQuery,
+ UpdateRequest
)
ROLE = 'foorole'
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/BUILD b/src/test/python/apache/aurora/executor/common/BUILD
index 8b54e91..6316c7f 100644
--- a/src/test/python/apache/aurora/executor/common/BUILD
+++ b/src/test/python/apache/aurora/executor/common/BUILD
@@ -15,6 +15,7 @@
python_test_suite(
name = 'all',
dependencies = [
+ pants(':announcer'),
pants(':directory_sandbox'),
pants(':health_checker'),
pants(':status_checker'),
@@ -23,6 +24,17 @@ python_test_suite(
)
python_tests(
+ name = 'announcer',
+ sources = ['test_announcer.py'],
+ dependencies = [
+ pants('3rdparty/python:mock'),
+ pants('3rdparty/python:twitter.common.quantity'),
+ pants('3rdparty/python:twitter.common.testing'),
+ pants('src/main/python/apache/aurora/executor/common:announcer'),
+ ],
+)
+
+python_tests(
name = 'directory_sandbox',
sources = ['test_directory_sandbox.py'],
dependencies = [
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/python/apache/aurora/executor/common/test_announcer.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_announcer.py b/src/test/python/apache/aurora/executor/common/test_announcer.py
new file mode 100644
index 0000000..e5c4ce4
--- /dev/null
+++ b/src/test/python/apache/aurora/executor/common/test_announcer.py
@@ -0,0 +1,257 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+
+import mock
+from kazoo.client import KazooClient
+from kazoo.exceptions import KazooException
+from twitter.common.quantity import Amount, Time
+from twitter.common.testing.clock import ThreadedClock
+from twitter.common.zookeeper.serverset import Endpoint, ServerSet
+
+from apache.aurora.executor.common.announcer import (
+ Announcer,
+ DefaultAnnouncerCheckerProvider,
+ make_endpoints,
+ ServerSetJoinThread
+)
+
+
+def test_serverset_join_thread():
+  """ServerSetJoinThread exits on stop() and calls the joiner each time the event is set."""
+  join = threading.Event()
+  joined = threading.Event()
+
+  def join_function():
+    joined.set()
+
+  # A thread stopped before the event is ever set must exit without calling the joiner.
+  ssjt = ServerSetJoinThread(join, join_function, loop_wait=Amount(1, Time.MILLISECONDS))
+  ssjt.start()
+  ssjt.stop()
+  ssjt.join(timeout=1.0)
+  assert not ssjt.is_alive()
+  assert not joined.is_set()
+
+  ssjt = ServerSetJoinThread(join, join_function, loop_wait=Amount(1, Time.MILLISECONDS))
+  ssjt.start()
+
+  def test_loop():
+    # Setting the event should make the thread clear it and then call join_function.
+    join.set()
+    joined.wait(timeout=1.0)
+    assert not join.is_set()  # join is cleared
+    assert joined.is_set()  # joined has been called
+    joined.clear()
+
+  # validate that the loop is working
+  test_loop()
+  test_loop()
+
+  ssjt.stop()
+  ssjt.join(timeout=1.0)
+  assert not ssjt.is_alive()
+  assert not joined.is_set()
+
+
+def test_announcer_under_normal_circumstances():
+  """disconnected_time advances before joining, stays 0 once joined, and stop() cancels."""
+  joined = threading.Event()
+
+  def joined_side_effect(*args, **kw):
+    joined.set()
+    return 'membership foo'
+
+  mock_serverset = mock.MagicMock(spec=ServerSet)
+  mock_serverset.join = mock.MagicMock()
+  mock_serverset.join.side_effect = joined_side_effect
+  mock_serverset.cancel = mock.MagicMock()
+
+  endpoint = Endpoint('localhost', 12345)
+  clock = ThreadedClock(31337.0)
+
+  announcer = Announcer(mock_serverset, endpoint, clock=clock)
+  assert announcer.disconnected_time() == 0.0
+  clock.tick(1.0)
+  assert announcer.disconnected_time() == 1.0, (
+      'Announcer should advance disconnection time when not yet initially connected.')
+
+  announcer.start()
+
+  try:
+    joined.wait(timeout=1.0)
+    assert joined.is_set()
+
+    # NOTE(review): `joined` is set inside the mock, before Announcer._join() stores the
+    # membership and clears the termination time, so the next assertions can race with
+    # the join thread.
+    assert announcer.disconnected_time() == 0.0
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 0.0, (
+        'Announcer should not advance disconnection time when connected.')
+    assert announcer._membership == 'membership foo'
+
+  finally:
+    announcer.stop()
+
+  mock_serverset.cancel.assert_called_with('membership foo')
+
+  assert announcer.disconnected_time() == 0.0
+  clock.tick(1.0)
+  assert announcer.disconnected_time() == 0.0, (
+      'Announcer should not advance disconnection time when stopped.')
+
+
+def test_announcer_on_expiration():
+  """After on_expiration() the announcer reports disconnection, then recovers on retry."""
+  joined = threading.Event()
+  operations = []
+
+  def joined_side_effect(*args, **kw):
+    # Python 2 has no 'nonlocal' (PEP 3104), so this nested function cannot rebind a
+    # counter in the enclosing scope; append to a list and use its length instead.
+    # Join calls 1 and 3 succeed; call 2 raises.
+    operations.append(1)
+    if len(operations) == 1 or len(operations) == 3:
+      joined.set()
+      return 'membership %d' % len(operations)
+    else:
+      raise KazooException('Failed to reconnect')
+
+  mock_serverset = mock.MagicMock(spec=ServerSet)
+  mock_serverset.join = mock.MagicMock()
+  mock_serverset.join.side_effect = joined_side_effect
+  mock_serverset.cancel = mock.MagicMock()
+
+  endpoint = Endpoint('localhost', 12345)
+  clock = ThreadedClock(31337.0)
+
+  announcer = Announcer(
+      mock_serverset, endpoint, clock=clock, exception_wait=Amount(2, Time.SECONDS))
+  announcer.start()
+
+  try:
+    joined.wait(timeout=1.0)
+    assert joined.is_set()
+    assert announcer._membership == 'membership 1'
+    assert announcer.disconnected_time() == 0.0
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 0.0
+    announcer.on_expiration()  # the triggered rejoin hits join call 2, which raises
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 1.0, (
+        'Announcer should be disconnected on expiration.')
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 0.0, (
+        'Announcer should not advance disconnection time when connected.')
+    assert announcer._membership == 'membership 3'
+
+  finally:
+    announcer.stop()
+
+
+def test_announcer_under_abnormal_circumstances():
+  """A failing initial join is retried after exception_wait and then succeeds."""
+  mock_serverset = mock.MagicMock(spec=ServerSet)
+  mock_serverset.join = mock.MagicMock()
+  # First join attempt raises; the second returns a membership.
+  mock_serverset.join.side_effect = [
+      KazooException('Whoops the ensemble is down!'),
+      'member0001',
+  ]
+  mock_serverset.cancel = mock.MagicMock()
+
+  endpoint = Endpoint('localhost', 12345)
+  clock = ThreadedClock(31337.0)
+
+  announcer = Announcer(
+      mock_serverset, endpoint, clock=clock, exception_wait=Amount(2, Time.SECONDS))
+  announcer.start()
+
+  try:
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 1.0
+    # Advancing the clock past the 2 second exception_wait lets the retry run.
+    clock.tick(2.0)
+    assert announcer.disconnected_time() == 0.0, (
+        'Announcer should recover after an exception thrown internally.')
+    assert announcer._membership == 'member0001'
+  finally:
+    announcer.stop()
+
+
+def make_assigned_task(thermos_config, assigned_ports=None):
+ from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME
+ from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, Identity, TaskConfig
+
+ assigned_ports = assigned_ports or {}
+ executor_config = ExecutorConfig(name=AURORA_EXECUTOR_NAME, data=thermos_config.json_dumps())
+ task_config = TaskConfig(
+ owner=Identity(role=thermos_config.role().get(), user=thermos_config.role().get()),
+ environment=thermos_config.environment().get(),
+ jobName=thermos_config.name().get(),
+ executorConfig=executor_config)
+
+ return AssignedTask(instanceId=12345, task=task_config, assignedPorts=assigned_ports)
+
+
+def make_job(role, environment, name, primary_port, portmap):
+ from apache.aurora.config.schema.base import (
+ Announcer,
+ Job,
+ Process,
+ Resources,
+ Task,
+ )
+ task = Task(
+ name='ignore2',
+ processes=[Process(name='ignore3', cmdline='ignore4')],
+ resources=Resources(cpu=1, ram=1, disk=1))
+ job = Job(
+ role=role,
+ environment=environment,
+ name=name,
+ cluster='ignore1',
+ task=task,
+ announce=Announcer(primary_port=primary_port, portmap=portmap))
+ return job
+
+
+def test_make_empty_endpoints():
+ hostname = 'aurora.example.com'
+ portmap = {}
+ primary_port = 'http'
+
+ # test no bound 'http' port
+ primary, additional = make_endpoints(hostname, portmap, primary_port)
+ assert primary == Endpoint(hostname, 0)
+ assert additional == {}
+
+
+@mock.patch('apache.aurora.executor.common.announcer.ServerSet')
+@mock.patch('apache.aurora.executor.common.announcer.KazooClient')
+def test_default_announcer_provider(mock_client_provider, mock_serverset_provider):
+ mock_client = mock.MagicMock(spec=KazooClient)
+ mock_client_provider.return_value = mock_client
+ mock_serverset = mock.MagicMock(spec=ServerSet)
+ mock_serverset_provider.return_value = mock_serverset
+
+ dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', root='/aurora')
+ job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={'http': 80, 'admin': 'primary'})
+ assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345})
+ checker = dap.from_assigned_task(assigned_task, None)
+
+ mock_client.start.assert_called_once_with()
+ mock_serverset_provider.assert_called_once_with(mock_client, '/aurora/aurora/prod/proxy')
+ assert checker.name() == 'announcer'
+ assert checker.status is None
+
+
+def test_default_announcer_provider_without_announce():
+ from pystachio import Empty
+
+ job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={})
+ job = job(announce=Empty)
+ assigned_task = make_assigned_task(job)
+
+ assert DefaultAnnouncerCheckerProvider('foo.bar').from_assigned_task(assigned_task, None) is None
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
index 9dfc651..67d3dbb 100644
--- a/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
+++ b/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
@@ -46,6 +46,7 @@ job = Job(
constraints = {
'host': 'limit:4',
},
+ announce = Announcer(),
)
jobs = [ job ]
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/test_common.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_common.sh b/src/test/sh/org/apache/aurora/e2e/test_common.sh
index def823b..43d2516 100644
--- a/src/test/sh/org/apache/aurora/e2e/test_common.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_common.sh
@@ -41,3 +41,26 @@ collect_result() {
exit $RETCODE
) >&4 # Send to the stderr we had at startup.
}
+
+validate_serverset() {
+  # Exit status 0 is reported as an unknown failure; validate_serverset.py signals success with 1.
+  local retcode=0
+
+  # run the check inside the aurora client PEX (for kazoo); %q quotes $1 for the remote shell
+  vagrant ssh -c \
+      "env SERVERSET=$(printf '%q' "$1") PEX_INTERPRETER=1 aurora /vagrant/src/test/sh/org/apache/aurora/e2e/validate_serverset.py" \
+      || retcode=$?
+
+  if [[ $retcode = 1 ]]; then
+    echo "Validated announced job."
+    return 0
+  elif [[ $retcode = 2 ]]; then
+    echo "Job failed to announce in serverset."
+  elif [[ $retcode = 3 ]]; then
+    echo "Job failed to re-announce when expired."
+  else
+    echo "Unknown failure in test script."
+  fi
+
+  return 1
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index 3529c18..fa5568e 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -40,6 +40,9 @@ test_http_example() {
echo '== Updating test job'
vagrant ssh -c "aurora update $jobkey $_updated_config"
+ echo '== Validating announce'
+ validate_serverset "/aurora/$_role/$_env/$_job"
+
#echo "== Probing job via 'aurora run'"
# TODO(mchucarroll): Get "run" working: the vagrant configuration currently doesn't set up ssh
# to allow automatic logins to the slaves. "aurora run" therefore tries to prompt the user for
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh
index a3f530f..ea5ae8d 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh
@@ -45,6 +45,9 @@ test_http_example() {
echo '== Updating test job'
vagrant ssh -c "aurora2 job update $jobkey $_updated_config"
+ echo '== Validating announce'
+ validate_serverset "/aurora/$_role/$_env/$_job"
+
# TODO(mchucarroll): Get "run" working: the vagrant configuration currently doesn't set up ssh
# to allow automatic logins to the slaves. "aurora run" therefore tries to prompt the user for
# a password, finds that it's not running in a TTY, and aborts.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/validate_serverset.py
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/validate_serverset.py b/src/test/sh/org/apache/aurora/e2e/validate_serverset.py
new file mode 100644
index 0000000..66fa965
--- /dev/null
+++ b/src/test/sh/org/apache/aurora/e2e/validate_serverset.py
@@ -0,0 +1,88 @@
+"""Check that an Aurora job announces itself in a ZooKeeper serverset and re-announces.
+
+Expects the serverset path in the SERVERSET environment variable and reports the outcome
+through the process exit status (codes below) to validate_serverset() in test_common.sh.
+"""
+
+import os
+import posixpath
+import sys
+import time
+import traceback
+
+from kazoo.client import KazooClient
+from kazoo.exceptions import NoNodeError
+
+# Exit statuses interpreted by validate_serverset() in test_common.sh. Success is 1, not 0:
+# that caller reports an exit status of 0 as an unknown failure.
+OK = 1
+DID_NOT_REGISTER = 2
+DID_NOT_RECOVER_FROM_EXPIRY = 3
+# Any other status is reported by the caller as an unknown failure. Unhandled exceptions are
+# mapped here because Python exits with status 1 on an uncaught exception, which the caller
+# would otherwise misread as OK.
+UNEXPECTED_ERROR = 4
+
+ZK_ENSEMBLE = 'localhost:2181'
+# The test job is created with 4 znodes.
+EXPECTED_ZNODES = 4
+WAIT_TIMEOUT_SECS = 10
+
+
+def wait_until_znodes(client, serverset, count, timeout=30):
+  """Poll `serverset` about once a second until it has exactly `count` children.
+
+  Returns the children's full paths, or [] if `timeout` seconds pass without a match.
+  A missing serverset node counts as having no children.
+  """
+  deadline = time.time() + timeout
+  while True:
+    try:
+      children = client.get_children(serverset)
+    except NoNodeError:
+      children = []
+    print('Announced members: %s' % children)
+    if len(children) == count:
+      return [posixpath.join(serverset, child) for child in children]
+    # Compare against the wall clock so time spent in ZooKeeper calls counts toward the timeout.
+    if time.time() >= deadline:
+      return []
+    time.sleep(1)
+
+
+def main():
+  """Run the register / expire / re-register check and return an exit status."""
+  serverset = os.getenv('SERVERSET')
+  if not serverset:
+    print('SERVERSET environment variable is not set.')
+    return UNEXPECTED_ERROR
+
+  client = KazooClient(ZK_ENSEMBLE)
+  try:
+    client.start()
+
+    znodes = wait_until_znodes(client, serverset, EXPECTED_ZNODES, timeout=WAIT_TIMEOUT_SECS)
+    if not znodes:
+      return DID_NOT_REGISTER
+
+    # Removing one member's znode stands in for its registration expiring; the announcer is
+    # expected to register it again.
+    client.delete(znodes[0])
+
+    if not wait_until_znodes(client, serverset, EXPECTED_ZNODES, timeout=WAIT_TIMEOUT_SECS):
+      return DID_NOT_RECOVER_FROM_EXPIRY
+
+    return OK
+  finally:
+    client.stop()
+    client.close()
+
+
+# NOTE(review): runs unconditionally, as before; confirm PEX_INTERPRETER sets __name__ to
+# '__main__' before adding an `if __name__ == '__main__'` guard.
+try:
+  exit_status = main()
+except Exception:
+  traceback.print_exc()
+  exit_status = UNEXPECTED_ERROR
+sys.exit(exit_status)