Posted to commits@aurora.apache.org by wi...@apache.org on 2014/08/05 22:03:17 UTC

git commit: AURORA-587: Example ServerSet Announcer implementation

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 6adb35f09 -> fec17b6d7


AURORA-587: Example ServerSet Announcer implementation

This is an exemplar ServerSet Announcer implementation in
apache.aurora.executor.common, currently enabled via additional command-line
flags to the Aurora executor.

Testing Done:
./pants src/test/python/apache/aurora/executor/common:announcer -v

Bugs closed: AURORA-587

Reviewed at https://reviews.apache.org/r/23863/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/fec17b6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/fec17b6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/fec17b6d

Branch: refs/heads/master
Commit: fec17b6d72b2078666a6676d2840335bf8379204
Parents: 6adb35f
Author: Brian Wickman <wi...@apache.org>
Authored: Tue Aug 5 13:03:12 2014 -0700
Committer: Brian Wickman <wi...@apache.org>
Committed: Tue Aug 5 13:03:12 2014 -0700

----------------------------------------------------------------------
 docs/configuration-reference.md                 |  37 +++
 docs/user-guide.md                              |  18 +-
 examples/vagrant/aurorabuild.sh                 |   6 +
 examples/vagrant/upstart/aurora-scheduler.conf  |   2 +-
 .../python/apache/aurora/executor/bin/BUILD     |   3 +-
 .../executor/bin/thermos_executor_main.py       |  40 ++-
 .../python/apache/aurora/executor/common/BUILD  |  15 ++
 .../apache/aurora/executor/common/announcer.py  | 224 ++++++++++++++++
 .../aurora/client/api/test_scheduler_client.py  |   4 +-
 .../python/apache/aurora/executor/common/BUILD  |  12 +
 .../aurora/executor/common/test_announcer.py    | 257 +++++++++++++++++++
 .../aurora/e2e/http/http_example_updated.aurora |   1 +
 .../sh/org/apache/aurora/e2e/test_common.sh     |  23 ++
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh |   3 +
 .../org/apache/aurora/e2e/test_end_to_end_v2.sh |   3 +
 .../org/apache/aurora/e2e/validate_serverset.py |  46 ++++
 16 files changed, 687 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/docs/configuration-reference.md
----------------------------------------------------------------------
diff --git a/docs/configuration-reference.md b/docs/configuration-reference.md
index 34f2359..5166d45 100644
--- a/docs/configuration-reference.md
+++ b/docs/configuration-reference.md
@@ -28,6 +28,7 @@ Aurora + Thermos Configuration Reference
     - [Services](#services)
     - [UpdateConfig Objects](#updateconfig-objects)
     - [HealthCheckConfig Objects](#healthcheckconfig-objects)
+    - [Announcer Objects](#announcer-objects)
 - [Specifying Scheduling Constraints](#specifying-scheduling-constraints)
 - [Template Namespaces](#template-namespaces)
     - [mesos Namespace](#mesos-namespace)
@@ -356,6 +357,42 @@ Parameters for controlling a task's health checks via HTTP.
 | ```timeout_secs```             | Integer   | HTTP request timeout. (Default: 1)
 | ```max_consecutive_failures``` | Integer   | Maximum number of consecutive failures tolerated before considering a task unhealthy (Default: 0)
 
+### Announcer Objects
+
+If the `announce` field in the Job configuration is set, each task will be
+registered in the ServerSet `/aurora/role/environment/jobname` in the
+ZooKeeper ensemble configured by the executor.  If no Announcer object is specified,
+no announcement will take place.  For more information about ServerSets, see the [User Guide](user-guide.md).
+
+| object                         | type      | description
+| -------                        | :-------: | --------
+| ```primary_port```             | String    | Which named port to register as the primary endpoint in the ServerSet (Default: `http`)
+| ```portmap```                  | dict      | A mapping of additional endpoints to be announced in the ServerSet (Default: `{ 'aurora': '{{primary_port}}' }`)
+
+### Port aliasing with the Announcer `portmap`
+
+The primary endpoint registered in the ServerSet is the one allocated to the port
+specified by the `primary_port` in the `Announcer` object, by default
+the `http` port.  This port can be referenced from anywhere within a configuration
+as `{{thermos.ports[http]}}`.
+
+Without the port map, each named port would be allocated a unique port number.
+The `portmap` allows two different named ports to be aliased together.  The default
+`portmap` aliases the `aurora` port (i.e. `{{thermos.ports[aurora]}}`) to
+the `http` port.  Even though the two ports can be referenced independently,
+only one port is allocated by Mesos.  Any port referenced in a `Process` object
+that is not in the portmap will be allocated dynamically by Mesos and announced as well.
+
+It is possible to use the portmap to alias names to static port numbers, e.g.
+`{'http': 80, 'https': 443, 'aurora': 'http'}`.  In this case, referencing
+`{{thermos.ports[aurora]}}` would look up `{{thermos.ports[http]}}` and
+resolve to the static port 80.  No port would be requested of or allocated by Mesos.
+
+Static ports should be used cautiously as Aurora does nothing to prevent two
+tasks with the same static port allocations from being co-scheduled.
+External constraints such as slave attributes should be used to enforce such
+guarantees should they be needed.
+
 Specifying Scheduling Constraints
 =================================
 

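To make the Announcer and portmap semantics documented above concrete, here is a minimal job sketch built with the same schema the tests in this commit import; it is illustrative only, and the role, job name, command line, and resource sizes are placeholders:

from apache.aurora.config.schema.base import Announcer, Job, Process, Resources, Task

# The 'web' process binds {{thermos.ports[http]}}, so Mesos allocates a single
# port for 'http'; the default portmap would also alias the 'aurora' name to it.
web = Process(name='web', cmdline='./run_server.sh --port={{thermos.ports[http]}}')

job = Job(
    role='www-data',
    environment='prod',
    name='hello_world',
    cluster='example',
    task=Task(name='web', processes=[web], resources=Resources(cpu=1, ram=128*1048576, disk=64*1048576)),
    # Announce the Mesos-allocated 'http' port as the primary endpoint and alias
    # 'admin' to it; instances register under /aurora/www-data/prod/hello_world.
    announce=Announcer(primary_port='http', portmap={'admin': 'http'}))
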
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/docs/user-guide.md
----------------------------------------------------------------------
diff --git a/docs/user-guide.md b/docs/user-guide.md
index 6c703a3..e12ee89 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -11,6 +11,7 @@ Aurora User Guide
 	- [Giving Priority to Production Tasks: PREEMPTING](#user-content-giving-priority-to-production-tasks-preempting)
 	- [Natural Termination: FINISHED, FAILED](#user-content-natural-termination-finished-failed)
 	- [Forceful Termination: KILLING, RESTARTING](#user-content-forceful-termination-killing-restarting)
+- [Service Discovery](#user-content-service-discovery)
 - [Configuration](#user-content-configuration)
 - [Creating Jobs](#user-content-creating-jobs)
 - [Interacting With Jobs](#user-content-interacting-with-jobs)
@@ -276,12 +277,27 @@ Configuration
 
 You define and configure your Jobs (and their Tasks and Processes) in
 Aurora configuration files. Their filenames end with the `.aurora`
-suffix, and you write them in Python making use of the Pystashio
+suffix, and you write them in Python making use of the Pystachio
 templating language, along
 with specific Aurora, Mesos, and Thermos commands and methods. See the
 [Configuration Guide and Reference](configuration-reference.md) and
 [Configuration Tutorial](configuration-tutorial.md).
 
+Service Discovery
+-----------------
+
+It is possible for the Aurora executor to announce tasks into ServerSets for
+the purpose of service discovery.  ServerSets use the ZooKeeper [group membership pattern](http://zookeeper.apache.org/doc/trunk/recipes.html#sc_outOfTheBox),
+for which there are several reference implementations:
+
+  - [C++](https://github.com/apache/mesos/blob/master/src/zookeeper/group.cpp)
+  - [Java](https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/zookeeper/ServerSetImpl.java#L221)
+  - [Python](https://github.com/twitter/commons/blob/master/src/python/twitter/common/zookeeper/serverset/serverset.py#L51)
+
+These can also be used natively in Finagle using the [ZookeeperServerSetCluster](https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala).
+
+For more information about how to configure announcing, see the [Configuration Reference](configuration-reference.md).
+
 Creating Jobs
 -------------
 

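As a consumer-side companion to the Service Discovery section above, this small sketch (not part of this commit) lists whatever a job has announced, assuming the vagrant defaults of a ZooKeeper ensemble on localhost:2181 and the /aurora root; the role/environment/job names are placeholders:

import posixpath

from kazoo.client import KazooClient

SERVERSET = '/aurora/www-data/prod/hello_world'  # /aurora/<role>/<environment>/<jobname>

client = KazooClient('localhost:2181')
client.start()

# Each child znode is one announced task instance; its data is the member
# record written by the announcer (serialization depends on the ServerSet
# implementation in use).
for member in sorted(client.get_children(SERVERSET)):
  data, _ = client.get(posixpath.join(SERVERSET, member))
  print('%s -> %r' % (member, data))

client.stop()
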
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/examples/vagrant/aurorabuild.sh
----------------------------------------------------------------------
diff --git a/examples/vagrant/aurorabuild.sh b/examples/vagrant/aurorabuild.sh
index 9f604d3..8b39a16 100755
--- a/examples/vagrant/aurorabuild.sh
+++ b/examples/vagrant/aurorabuild.sh
@@ -75,6 +75,12 @@ with contextlib.closing(zipfile.ZipFile('dist/thermos_executor.pex', 'a')) as zf
   zf.writestr('apache/aurora/executor/resources/__init__.py', '')
   zf.write('dist/thermos_runner.pex', 'apache/aurora/executor/resources/thermos_runner.pex')
 EOF
+
+  cat <<EOF > $DIST_DIR/thermos_executor.sh
+#!/usr/bin/env bash
+exec /home/vagrant/aurora/dist/thermos_executor.pex --announcer-enable --announcer-ensemble localhost:2181
+EOF
+  chmod +x $DIST_DIR/thermos_executor.sh
 }
 
 function build_observer {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/examples/vagrant/upstart/aurora-scheduler.conf
----------------------------------------------------------------------
diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf
index 05b2f26..7e2b3f0 100644
--- a/examples/vagrant/upstart/aurora-scheduler.conf
+++ b/examples/vagrant/upstart/aurora-scheduler.conf
@@ -35,7 +35,7 @@ exec $DIST_DIR/install/aurora-scheduler/bin/aurora-scheduler \
   -native_log_zk_group_path=/aurora/replicated-log \
   -native_log_file_path=$AURORA_HOME/scheduler/db \
   -backup_dir=$AURORA_HOME/scheduler/backups \
-  -thermos_executor_path=$DIST_DIR/thermos_executor.pex \
+  -thermos_executor_path=$DIST_DIR/thermos_executor.sh \
   -gc_executor_path=$DIST_DIR/gc_executor.pex \
   -vlog=INFO \
   -logtostderr

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/main/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD
index 6b3f620..9351715 100644
--- a/src/main/python/apache/aurora/executor/bin/BUILD
+++ b/src/main/python/apache/aurora/executor/bin/BUILD
@@ -16,12 +16,12 @@ python_binary(
   name = 'thermos_executor',
   source = 'thermos_executor_main.py',
   entry_point = 'apache.aurora.executor.bin.thermos_executor_main:proxy_main',
-  ignore_errors = True,
   always_write_cache = True,
   dependencies = [
     pants('3rdparty/python:twitter.common.app'),
     pants('3rdparty/python:twitter.common.log'),
     pants('3rdparty/python:twitter.common.metrics'),
+    pants('src/main/python/apache/aurora/executor/common:announcer'),
     pants('src/main/python/apache/aurora/executor/common:executor_timeout'),
     pants('src/main/python/apache/aurora/executor/common:health_checker'),
     pants('src/main/python/apache/aurora/executor/common:sandbox'),
@@ -36,7 +36,6 @@ python_binary(
   name = 'gc_executor',
   source = 'gc_executor_main.py',
   entry_point = 'apache.aurora.executor.bin.gc_executor_main:proxy_main',
-  ignore_errors = True,
   always_write_cache = True,
   dependencies = [
     pants('3rdparty/python:twitter.common.app'),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 198f2f6..aacc19a 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -26,6 +26,7 @@ from twitter.common import app, log
 from twitter.common.log.options import LogOptions
 
 from apache.aurora.executor.aurora_executor import AuroraExecutor
+from apache.aurora.executor.common.announcer import DefaultAnnouncerCheckerProvider
 from apache.aurora.executor.common.executor_timeout import ExecutorTimeout
 from apache.aurora.executor.common.health_checker import HealthCheckerProvider
 from apache.aurora.executor.thermos_task_runner import DefaultThermosTaskRunnerProvider
@@ -36,6 +37,32 @@ LogOptions.set_disk_log_level('DEBUG')
 LogOptions.set_log_dir('.')
 
 
+app.add_option(
+    '--announcer-enable',
+    dest='announcer_enable',
+    action='store_true',
+    default=False,
+    help='Enable the ServerSet announcer for this executor.  Jobs must still activate using '
+         'the Announcer configuration.')
+
+
+app.add_option(
+    '--announcer-ensemble',
+    dest='announcer_ensemble',
+    type=str,
+    default=None,
+    help='The ensemble to which the Announcer should register ServerSets.')
+
+
+app.add_option(
+    '--announcer-serverset-path',
+    dest='announcer_serverset_path',
+    type=str,
+    default='/aurora',
+    help='The root of the tree into which ServerSets should be announced.  The paths will '
+         'be of the form $ROOT/$ROLE/$ENVIRONMENT/$JOBNAME.')
+
+
 # TODO(wickman) Consider just having the OSS version require pip installed
 # thermos_runner binaries on every machine and instead of embedding the pex
 # as a resource, shell out to one on the PATH.
@@ -52,16 +79,25 @@ def dump_runner_pex():
 
 
 def proxy_main():
-  def main():
+  def main(args, options):
     thermos_runner_provider = DefaultThermosTaskRunnerProvider(
         dump_runner_pex(),
         artifact_dir=os.path.realpath('.'),
     )
 
+    # status providers:
+    status_providers = [HealthCheckerProvider()]
+
+    if options.announcer_enable:
+      if options.announcer_ensemble is None:
+        app.error('Must specify --announcer-ensemble if the announcer is enabled.')
+      status_providers.append(DefaultAnnouncerCheckerProvider(
+          options.announcer_ensemble, options.announcer_serverset_path))
+
     # Create executor stub
     thermos_executor = AuroraExecutor(
         runner_provider=thermos_runner_provider,
-        status_providers=(HealthCheckerProvider(),),
+        status_providers=status_providers,
     )
 
     # Create driver stub

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/main/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/BUILD b/src/main/python/apache/aurora/executor/common/BUILD
index d0ff6fb..0a4d35a 100644
--- a/src/main/python/apache/aurora/executor/common/BUILD
+++ b/src/main/python/apache/aurora/executor/common/BUILD
@@ -44,6 +44,21 @@ python_library(
 )
 
 python_library(
+  name = 'announcer',
+  sources = ['announcer.py'],
+  dependencies = [
+    pants('3rdparty/python:twitter.common.concurrent'),
+    pants('3rdparty/python:twitter.common.exceptions'),
+    pants('3rdparty/python:twitter.common.log'),
+    pants('3rdparty/python:twitter.common.metrics'),
+    pants('3rdparty/python:twitter.common.quantity'),
+    pants('3rdparty/python:twitter.common.zookeeper'),
+    pants(':status_checker'),
+    pants(':task_info'),
+  ]
+)
+
+python_library(
   name = 'executor_timeout',
   sources = ['executor_timeout.py'],
   dependencies = [

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/main/python/apache/aurora/executor/common/announcer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/announcer.py b/src/main/python/apache/aurora/executor/common/announcer.py
new file mode 100644
index 0000000..c466da8
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/announcer.py
@@ -0,0 +1,224 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import posixpath
+import socket
+import threading
+import time
+from abc import abstractmethod
+
+from kazoo.client import KazooClient
+from kazoo.retry import KazooRetry
+from twitter.common import log
+from twitter.common.concurrent.deferred import defer
+from twitter.common.exceptions import ExceptionalThread
+from twitter.common.metrics import LambdaGauge, Observable
+from twitter.common.quantity import Amount, Time
+from twitter.common.zookeeper.serverset import Endpoint, ServerSet
+
+from apache.aurora.executor.common.status_checker import StatusChecker, StatusCheckerProvider
+from apache.aurora.executor.common.task_info import (
+    mesos_task_instance_from_assigned_task,
+    resolve_ports
+)
+
+
+def make_endpoints(hostname, portmap, primary_port):
+  """
+    Generate primary, additional endpoints from a portmap and primary_port.
+    primary_port must be a name in the portmap dictionary.
+  """
+  # Do int check as stop-gap measure against incompatible downstream clients.
+  additional_endpoints = dict(
+      (name, Endpoint(hostname, port)) for (name, port) in portmap.items()
+      if isinstance(port, int))
+
+  # It's possible for the primary port to not have been allocated if this task
+  # is using autoregistration, so register with a port of 0.
+  return Endpoint(hostname, portmap.get(primary_port, 0)), additional_endpoints
+
+
+class AnnouncerCheckerProvider(StatusCheckerProvider):
+  def __init__(self, name=None):
+    self.name = name
+    super(AnnouncerCheckerProvider, self).__init__()
+
+  @abstractmethod
+  def make_serverset(self, assigned_task):
+    """Given an assigned task, return the serverset into which we should announce the task."""
+
+  def from_assigned_task(self, assigned_task, _):
+    mesos_task = mesos_task_instance_from_assigned_task(assigned_task)
+
+    if not mesos_task.has_announce():
+      return None
+
+    portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)
+
+    endpoint, additional = make_endpoints(
+        socket.gethostname(),
+        portmap,
+        mesos_task.announce().primary_port().get())
+
+    serverset = self.make_serverset(assigned_task)
+
+    return AnnouncerChecker(
+        serverset, endpoint, additional=additional, shard=assigned_task.instanceId, name=self.name)
+
+
+class DefaultAnnouncerCheckerProvider(AnnouncerCheckerProvider):
+  DEFAULT_RETRY_MAX_DELAY = Amount(5, Time.MINUTES)
+  DEFAULT_RETRY_POLICY = KazooRetry(
+      max_tries=None,
+      ignore_expire=True,
+      max_delay=DEFAULT_RETRY_MAX_DELAY.as_(Time.SECONDS),
+  )
+
+  def __init__(self, ensemble, root='/aurora'):
+    self.__ensemble = ensemble
+    self.__root = root
+    super(DefaultAnnouncerCheckerProvider, self).__init__()
+
+  def make_serverset(self, assigned_task):
+    role, environment, name = (
+        assigned_task.task.owner.role,
+        assigned_task.task.environment,
+        assigned_task.task.jobName)
+    path = posixpath.join(self.__root, role, environment, name)
+    client = KazooClient(self.__ensemble, connection_retry=self.DEFAULT_RETRY_POLICY)
+    client.start()
+    return ServerSet(client, path)
+
+
+class ServerSetJoinThread(ExceptionalThread):
+  """Background thread to reconnect to Serverset on session expiration."""
+
+  LOOP_WAIT = Amount(1, Time.SECONDS)
+
+  def __init__(self, event, joiner, loop_wait=LOOP_WAIT):
+    self._event = event
+    self._joiner = joiner
+    self._stopped = threading.Event()
+    self._loop_wait = loop_wait
+    super(ServerSetJoinThread, self).__init__()
+    self.daemon = True
+
+  def run(self):
+    while True:
+      if self._stopped.is_set():
+        break
+      self._event.wait(timeout=self._loop_wait.as_(Time.SECONDS))
+      if not self._event.is_set():
+        continue
+      log.debug('Join event triggered, joining serverset.')
+      self._event.clear()
+      self._joiner()
+
+  def stop(self):
+    self._stopped.set()
+
+
+class Announcer(Observable):
+  class Error(Exception): pass
+
+  EXCEPTION_WAIT = Amount(15, Time.SECONDS)
+
+  def __init__(self,
+               serverset,
+               endpoint,
+               additional=None,
+               shard=None,
+               clock=time,
+               exception_wait=None):
+    self._membership = None
+    self._membership_termination = clock.time()
+    self._endpoint = endpoint
+    self._additional = additional or {}
+    self._shard = shard
+    self._serverset = serverset
+    self._rejoin_event = threading.Event()
+    self._clock = clock
+    self._thread = None
+    self._exception_wait = exception_wait or self.EXCEPTION_WAIT
+
+  def disconnected_time(self):
+    # Lockless membership length check
+    membership_termination = self._membership_termination
+    if membership_termination is None:
+      return 0
+    return self._clock.time() - membership_termination
+
+  def _join_inner(self):
+    return self._serverset.join(
+        endpoint=self._endpoint,
+        additional=self._additional,
+        shard=self._shard,
+        expire_callback=self.on_expiration)
+
+  def _join(self):
+    if self._membership is not None:
+      raise self.Error("join called, but already have membership!")
+    while True:
+      try:
+        self._membership = self._join_inner()
+        self._membership_termination = None
+      except Exception as e:
+        log.error('Failed to join ServerSet: %s' % e)
+        self._clock.sleep(self._exception_wait.as_(Time.SECONDS))
+      else:
+        break
+
+  def start(self):
+    self._thread = ServerSetJoinThread(self._rejoin_event, self._join)
+    self._thread.start()
+    self.rejoin()
+
+  def rejoin(self):
+    self._rejoin_event.set()
+
+  def stop(self):
+    thread, self._thread = self._thread, None
+    thread.stop()
+    if self._membership:
+      self._serverset.cancel(self._membership)
+
+  def on_expiration(self):
+    self._membership = None
+    if not self._thread:
+      return
+    self._membership_termination = self._clock.time()
+    log.info('Zookeeper session expired.')
+    self.rejoin()
+
+
+class AnnouncerChecker(StatusChecker):
+  DEFAULT_NAME = 'announcer'
+
+  def __init__(self, serverset, endpoint, additional=None, shard=None, name=None):
+    self.__announcer = Announcer(serverset, endpoint, additional=additional, shard=shard)
+    self.__name = name or self.DEFAULT_NAME
+    self.metrics.register(LambdaGauge('disconnected_time', self.__announcer.disconnected_time))
+
+  @property
+  def status(self):
+    return None  # always return healthy
+
+  def name(self):
+    return self.__name
+
+  def start(self):
+    self.__announcer.start()
+
+  def stop(self):
+    defer(self.__announcer.stop)

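For readers wiring the classes above outside the executor, here is a hedged sketch of using the provider directly; the ensemble address is a placeholder, and assigned_task stands in for the Thrift AssignedTask the executor receives from Mesos:

from apache.aurora.executor.common.announcer import DefaultAnnouncerCheckerProvider

provider = DefaultAnnouncerCheckerProvider('zk.example.com:2181', root='/aurora')

assigned_task = ...  # placeholder: the AssignedTask handed to the executor by Mesos

# from_assigned_task() returns None unless the job set announce=Announcer(...);
# otherwise it returns an AnnouncerChecker bound to the ServerSet at
# /aurora/<role>/<environment>/<jobname>.
checker = provider.from_assigned_task(assigned_task, None)
if checker is not None:
  checker.start()  # joins the ServerSet from a background thread
  # ... task runs ...
  checker.stop()   # cancels the membership asynchronously

In the executor itself this wiring is driven by the new --announcer-enable, --announcer-ensemble, and --announcer-serverset-path flags added to thermos_executor_main.py above.
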
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index ab74db3..8f5fe56 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -46,7 +46,9 @@ from gen.apache.aurora.api.ttypes import (
     RewriteConfigsRequest,
     ScheduleStatus,
     SessionKey,
-    TaskQuery
+    TaskQuery,
+    UpdateQuery,
+    UpdateRequest
 )
 
 ROLE = 'foorole'

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/BUILD b/src/test/python/apache/aurora/executor/common/BUILD
index 8b54e91..6316c7f 100644
--- a/src/test/python/apache/aurora/executor/common/BUILD
+++ b/src/test/python/apache/aurora/executor/common/BUILD
@@ -15,6 +15,7 @@
 python_test_suite(
   name = 'all',
   dependencies = [
+    pants(':announcer'),
     pants(':directory_sandbox'),
     pants(':health_checker'),
     pants(':status_checker'),
@@ -23,6 +24,17 @@ python_test_suite(
 )
 
 python_tests(
+  name = 'announcer',
+  sources = ['test_announcer.py'],
+  dependencies = [
+    pants('3rdparty/python:mock'),
+    pants('3rdparty/python:twitter.common.quantity'),
+    pants('3rdparty/python:twitter.common.testing'),
+    pants('src/main/python/apache/aurora/executor/common:announcer'),
+  ],
+)
+
+python_tests(
   name = 'directory_sandbox',
   sources = ['test_directory_sandbox.py'],
   dependencies = [

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/python/apache/aurora/executor/common/test_announcer.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_announcer.py b/src/test/python/apache/aurora/executor/common/test_announcer.py
new file mode 100644
index 0000000..e5c4ce4
--- /dev/null
+++ b/src/test/python/apache/aurora/executor/common/test_announcer.py
@@ -0,0 +1,257 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+
+import mock
+from kazoo.client import KazooClient
+from kazoo.exceptions import KazooException
+from twitter.common.quantity import Amount, Time
+from twitter.common.testing.clock import ThreadedClock
+from twitter.common.zookeeper.serverset import Endpoint, ServerSet
+
+from apache.aurora.executor.common.announcer import (
+    Announcer,
+    DefaultAnnouncerCheckerProvider,
+    make_endpoints,
+    ServerSetJoinThread
+)
+
+
+def test_serverset_join_thread():
+  join = threading.Event()
+  joined = threading.Event()
+
+  def join_function():
+    joined.set()
+
+  ssjt = ServerSetJoinThread(join, join_function, loop_wait=Amount(1, Time.MILLISECONDS))
+  ssjt.start()
+  ssjt.stop()
+  ssjt.join(timeout=1.0)
+  assert not ssjt.is_alive()
+  assert not joined.is_set()
+
+  ssjt = ServerSetJoinThread(join, join_function, loop_wait=Amount(1, Time.MILLISECONDS))
+  ssjt.start()
+
+  def test_loop():
+    join.set()
+    joined.wait(timeout=1.0)
+    assert not join.is_set()  # join is cleared
+    assert joined.is_set()  # joined has been called
+    joined.clear()
+
+  # validate that the loop is working
+  test_loop()
+  test_loop()
+
+  ssjt.stop()
+  ssjt.join(timeout=1.0)
+  assert not ssjt.is_alive()
+  assert not joined.is_set()
+
+
+def test_announcer_under_normal_circumstances():
+  joined = threading.Event()
+
+  def joined_side_effect(*args, **kw):
+    joined.set()
+    return 'membership foo'
+
+  mock_serverset = mock.MagicMock(spec=ServerSet)
+  mock_serverset.join = mock.MagicMock()
+  mock_serverset.join.side_effect = joined_side_effect
+  mock_serverset.cancel = mock.MagicMock()
+
+  endpoint = Endpoint('localhost', 12345)
+  clock = ThreadedClock(31337.0)
+
+  announcer = Announcer(mock_serverset, endpoint, clock=clock)
+  assert announcer.disconnected_time() == 0.0
+  clock.tick(1.0)
+  assert announcer.disconnected_time() == 1.0, (
+      'Announcer should advance disconnection time when not yet initially connected.')
+
+  announcer.start()
+
+  try:
+    joined.wait(timeout=1.0)
+    assert joined.is_set()
+
+    assert announcer.disconnected_time() == 0.0
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 0.0, (
+        'Announcer should not advance disconnection time when connected.')
+    assert announcer._membership == 'membership foo'
+
+  finally:
+    announcer.stop()
+
+  mock_serverset.cancel.assert_called_with('membership foo')
+
+  assert announcer.disconnected_time() == 0.0
+  clock.tick(1.0)
+  assert announcer.disconnected_time() == 0.0, (
+      'Announcer should not advance disconnection time when stopped.')
+
+
+def test_announcer_on_expiration():
+  joined = threading.Event()
+  operations = []
+
+  def joined_side_effect(*args, **kw):
+    # 'global' does not work within python nested functions, so we cannot use a
+    # counter here, so instead we do append/len (see PEP-3104)
+    operations.append(1)
+    if len(operations) == 1 or len(operations) == 3:
+      joined.set()
+      return 'membership %d' % len(operations)
+    else:
+      raise KazooException('Failed to reconnect')
+
+  mock_serverset = mock.MagicMock(spec=ServerSet)
+  mock_serverset.join = mock.MagicMock()
+  mock_serverset.join.side_effect = joined_side_effect
+  mock_serverset.cancel = mock.MagicMock()
+
+  endpoint = Endpoint('localhost', 12345)
+  clock = ThreadedClock(31337.0)
+
+  announcer = Announcer(
+      mock_serverset, endpoint, clock=clock, exception_wait=Amount(2, Time.SECONDS))
+  announcer.start()
+
+  try:
+    joined.wait(timeout=1.0)
+    assert joined.is_set()
+    assert announcer._membership == 'membership 1'
+    assert announcer.disconnected_time() == 0.0
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 0.0
+    announcer.on_expiration()  # expect exception
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 1.0, (
+        'Announcer should be disconnected on expiration.')
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 0.0, (
+        'Announcer should not advance disconnection time when connected.')
+    assert announcer._membership == 'membership 3'
+
+  finally:
+    announcer.stop()
+
+
+def test_announcer_under_abnormal_circumstances():
+  mock_serverset = mock.MagicMock(spec=ServerSet)
+  mock_serverset.join = mock.MagicMock()
+  mock_serverset.join.side_effect = [
+      KazooException('Whoops the ensemble is down!'),
+      'member0001',
+  ]
+  mock_serverset.cancel = mock.MagicMock()
+
+  endpoint = Endpoint('localhost', 12345)
+  clock = ThreadedClock(31337.0)
+
+  announcer = Announcer(
+       mock_serverset, endpoint, clock=clock, exception_wait=Amount(2, Time.SECONDS))
+  announcer.start()
+
+  try:
+    clock.tick(1.0)
+    assert announcer.disconnected_time() == 1.0
+    clock.tick(2.0)
+    assert announcer.disconnected_time() == 0.0, (
+        'Announcer should recover after an exception thrown internally.')
+    assert announcer._membership == 'member0001'
+  finally:
+    announcer.stop()
+
+
+def make_assigned_task(thermos_config, assigned_ports=None):
+  from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME
+  from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, Identity, TaskConfig
+
+  assigned_ports = assigned_ports or {}
+  executor_config = ExecutorConfig(name=AURORA_EXECUTOR_NAME, data=thermos_config.json_dumps())
+  task_config = TaskConfig(
+      owner=Identity(role=thermos_config.role().get(), user=thermos_config.role().get()),
+      environment=thermos_config.environment().get(),
+      jobName=thermos_config.name().get(),
+      executorConfig=executor_config)
+
+  return AssignedTask(instanceId=12345, task=task_config, assignedPorts=assigned_ports)
+
+
+def make_job(role, environment, name, primary_port, portmap):
+  from apache.aurora.config.schema.base import (
+      Announcer,
+      Job,
+      Process,
+      Resources,
+      Task,
+  )
+  task = Task(
+      name='ignore2',
+      processes=[Process(name='ignore3', cmdline='ignore4')],
+      resources=Resources(cpu=1, ram=1, disk=1))
+  job = Job(
+      role=role,
+      environment=environment,
+      name=name,
+      cluster='ignore1',
+      task=task,
+      announce=Announcer(primary_port=primary_port, portmap=portmap))
+  return job
+
+
+def test_make_empty_endpoints():
+  hostname = 'aurora.example.com'
+  portmap = {}
+  primary_port = 'http'
+
+  # test no bound 'http' port
+  primary, additional = make_endpoints(hostname, portmap, primary_port)
+  assert primary == Endpoint(hostname, 0)
+  assert additional == {}
+
+
+@mock.patch('apache.aurora.executor.common.announcer.ServerSet')
+@mock.patch('apache.aurora.executor.common.announcer.KazooClient')
+def test_default_announcer_provider(mock_client_provider, mock_serverset_provider):
+  mock_client = mock.MagicMock(spec=KazooClient)
+  mock_client_provider.return_value = mock_client
+  mock_serverset = mock.MagicMock(spec=ServerSet)
+  mock_serverset_provider.return_value = mock_serverset
+
+  dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', root='/aurora')
+  job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={'http': 80, 'admin': 'primary'})
+  assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345})
+  checker = dap.from_assigned_task(assigned_task, None)
+
+  mock_client.start.assert_called_once_with()
+  mock_serverset_provider.assert_called_once_with(mock_client, '/aurora/aurora/prod/proxy')
+  assert checker.name() == 'announcer'
+  assert checker.status is None
+
+
+def test_default_announcer_provider_without_announce():
+  from pystachio import Empty
+
+  job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={})
+  job = job(announce=Empty)
+  assigned_task = make_assigned_task(job)
+
+  assert DefaultAnnouncerCheckerProvider('foo.bar').from_assigned_task(assigned_task, None) is None

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
index 9dfc651..67d3dbb 100644
--- a/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
+++ b/src/test/sh/org/apache/aurora/e2e/http/http_example_updated.aurora
@@ -46,6 +46,7 @@ job = Job(
   constraints = {
     'host': 'limit:4',
   },
+  announce = Announcer(),
 )
 
 jobs = [ job ]

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/test_common.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_common.sh b/src/test/sh/org/apache/aurora/e2e/test_common.sh
index def823b..43d2516 100644
--- a/src/test/sh/org/apache/aurora/e2e/test_common.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_common.sh
@@ -41,3 +41,26 @@ collect_result() {
     exit $RETCODE
   ) >&4 # Send to the stderr we had at startup.
 }
+
+validate_serverset() {
+  # default python return code
+  local retcode=0
+
+  # launch aurora client in interpreter mode to get access to the kazoo client
+  vagrant ssh -c \
+      "env SERVERSET="$1" PEX_INTERPRETER=1 aurora /vagrant/src/test/sh/org/apache/aurora/e2e/validate_serverset.py" \
+      || retcode=$?
+
+  if [[ $retcode = 1 ]]; then
+    echo "Validated announced job."
+    return 0
+  elif [[ $retcode = 2 ]]; then
+    echo "Job failed to announce in serverset."
+  elif [[ $retcode = 3 ]]; then
+    echo "Job failed to re-announce when expired."
+  else
+    echo "Unknown failure in test script."
+  fi
+
+  return 1
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index 3529c18..fa5568e 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -40,6 +40,9 @@ test_http_example() {
   echo '== Updating test job'
   vagrant ssh -c "aurora update $jobkey $_updated_config"
 
+  echo '== Validating announce'
+  validate_serverset "/aurora/$_role/$_env/$_job"
+
   #echo "== Probing job via 'aurora run'"
   # TODO(mchucarroll): Get "run" working: the vagrant configuration currently doesn't set up ssh
   # to allow automatic logins to the slaves. "aurora run" therefore tries to prompt the user for

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh
index a3f530f..ea5ae8d 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end_v2.sh
@@ -45,6 +45,9 @@ test_http_example() {
   echo '== Updating test job'
   vagrant ssh -c "aurora2 job update $jobkey $_updated_config"
 
+  echo '== Validating announce'
+  validate_serverset "/aurora/$_role/$_env/$_job"
+
   # TODO(mchucarroll): Get "run" working: the vagrant configuration currently doesn't set up ssh
   # to allow automatic logins to the slaves. "aurora run" therefore tries to prompt the user for
   # a password, finds that it's not running in a TTY, and aborts.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/fec17b6d/src/test/sh/org/apache/aurora/e2e/validate_serverset.py
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/validate_serverset.py b/src/test/sh/org/apache/aurora/e2e/validate_serverset.py
new file mode 100644
index 0000000..66fa965
--- /dev/null
+++ b/src/test/sh/org/apache/aurora/e2e/validate_serverset.py
@@ -0,0 +1,46 @@
+import os
+import posixpath
+import sys
+import time
+
+from kazoo.client import KazooClient
+from kazoo.exceptions import NoNodeError
+
+OK = 1
+DID_NOT_REGISTER = 2
+DID_NOT_RECOVER_FROM_EXPIRY = 3
+
+
+serverset = os.getenv('SERVERSET')
+client = KazooClient('localhost:2181')
+client.start()
+
+
+def wait_until_znodes(count, timeout=30):
+  now = time.time()
+  timeout += now
+  while now < timeout:
+    try:
+      children = client.get_children(serverset)
+    except NoNodeError:
+      children = []
+    print('Announced members: %s' % children)
+    if len(children) == count:
+      return [posixpath.join(serverset, child) for child in children]
+    time.sleep(1)
+    now += 1
+  return []
+
+
+# job is created with 4 znodes.
+znodes = wait_until_znodes(4, timeout=10)
+if not znodes:
+  sys.exit(DID_NOT_REGISTER)
+
+client.delete(znodes[0])
+
+znodes = wait_until_znodes(4, timeout=10)
+if not znodes:
+  sys.exit(DID_NOT_RECOVER_FROM_EXPIRY)
+
+sys.exit(OK)