You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/04/12 06:31:29 UTC

aurora git commit: Introduce a Curator-based `ServiceGroupMonitor`.

Repository: aurora
Updated Branches:
  refs/heads/master 262ca16c5 -> ccf23820d


Introduce a Curator-based `ServiceGroupMonitor`.

Bugs closed: AURORA-1468

Reviewed at https://reviews.apache.org/r/45902/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ccf23820
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ccf23820
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ccf23820

Branch: refs/heads/master
Commit: ccf23820d15e251f7c7a44621992a1297b1a902f
Parents: 262ca16
Author: John Sirois <js...@apache.org>
Authored: Mon Apr 11 22:31:27 2016 -0600
Committer: John Sirois <jo...@gmail.com>
Committed: Mon Apr 11 22:31:27 2016 -0600

----------------------------------------------------------------------
 build.gradle                                    |   5 +
 .../discovery/CuratorServiceGroupMonitor.java   | 106 ++++++++++
 .../CuratorServiceGroupMonitorTest.java         | 203 +++++++++++++++++++
 3 files changed, 314 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index fc61adf..d981ab7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -80,6 +80,7 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
   }
 
   ext.commonsLangRev = '2.6'
+  ext.curatorRev = '2.10.0'
   ext.gsonRev = '2.3.1'
   ext.guavaRev = '19.0'
   ext.guiceRev = '3.0'
@@ -115,6 +116,7 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
         force "com.google.protobuf:protobuf-java:${protobufRev}"
         force "junit:junit:${junitRev}"
         force "org.apache.thrift:libthrift:${thriftRev}"
+        force "org.apache.zookeeper:zookeeper:${zookeeperRev}"
         force "org.hamcrest:hamcrest-core:1.3"
         force "org.slf4j:slf4j-api:${slf4jRev}"
         force "org.mybatis:mybatis:${mybatisRev}"
@@ -358,6 +360,9 @@ dependencies {
   compile 'javax.inject:javax.inject:1'
   compile "javax.servlet:javax.servlet-api:${servletRev}"
   compile "org.antlr:stringtemplate:${stringTemplateRev}"
+  compile "org.apache.curator:curator-client:${curatorRev}"
+  compile "org.apache.curator:curator-framework:${curatorRev}"
+  compile "org.apache.curator:curator-recipes:${curatorRev}"
   compile 'org.apache.mesos:mesos:0.26.0'
   compile "org.apache.shiro:shiro-guice:${shiroRev}"
   compile "org.apache.shiro:shiro-web:${shiroRev}"

http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
new file mode 100644
index 0000000..9d8b7bd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.scheduler.app.SchedulerMain;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+class CuratorServiceGroupMonitor implements ServiceGroupMonitor {
+  private static final Logger LOG = LoggerFactory.getLogger(SchedulerMain.class);
+
+  private final PathChildrenCache groupCache;
+  private final Predicate<String> memberSelector;
+  private final Codec<ServiceInstance> codec;
+
+  /**
+   * Creates a {@code ServiceGroupMonitor} backed by Curator.
+   *
+   * Although this monitor can be queried at any time, it will not usefully reflect service group
+   * membership until it is {@link #start() started}. When starting a monitor, it should be arranged
+   * that the monitor is {@link #close() closed} when no longer needed.
+   *
+   * It's important to be able to pick out group member nodes amongst child nodes for group paths
+   * that can contain mixed-content nodes. The given {@code memberSelector} should be able to
+   * discriminate member nodes from non-member nodes given the node name.
+   *
+   * @param groupCache The cache of group nodes.
+   * @param memberSelector A predicate that returns {@code true} for group node names that represent
+   *                       group members.  Here the name is just the `basename` of the node's full
+   *                       ZooKeeper path.
+   * @param codec A codec that can be used to deserialize group member {@link ServiceInstance} data.
+   */
+  CuratorServiceGroupMonitor(
+      PathChildrenCache groupCache,
+      Predicate<String> memberSelector,
+      Codec<ServiceInstance> codec) {
+
+    this.groupCache = requireNonNull(groupCache);
+    this.memberSelector = requireNonNull(memberSelector);
+    this.codec = requireNonNull(codec);
+  }
+
+  @Override
+  public void start() throws MonitorException {
+    try {
+      // NB: This blocks on an initial group population to emulate legacy ServerSetMonitor behavior;
+      // asynchronous population is an option using NORMAL or POST_INITIALIZED_EVENT StartModes
+      // though.
+      groupCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+    } catch (Exception e) {
+      throw new MonitorException("Failed to begin monitoring service group.", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    groupCache.close();
+  }
+
+  @Override
+  public ImmutableSet<ServiceInstance> get() {
+    return groupCache.getCurrentData().stream()
+        .filter(cd -> memberSelector.test(ZKPaths.getNodeFromPath(cd.getPath())))
+        .map(this::extractServiceInstance)
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(GuavaUtils.toImmutableSet());
+  }
+
+  private Optional<ServiceInstance> extractServiceInstance(ChildData data) {
+    ByteArrayInputStream source = new ByteArrayInputStream(data.getData());
+    try {
+      return Optional.of(codec.deserialize(source));
+    } catch (IOException e) {
+      LOG.error("Failed to deserialize ServiceInstance from " + data, e);
+      return Optional.empty();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
new file mode 100644
index 0000000..0879c2e
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest {
+
+  private static final String GROUP_PATH = "/group/root";
+  private static final String MEMBER_TOKEN = "member_";
+
+  private CuratorFramework client;
+  private BlockingQueue<PathChildrenCacheEvent> groupEvents;
+  private CuratorServiceGroupMonitor groupMonitor;
+
+  @Before
+  public void setUpCurator() {
+    client = CuratorFrameworkFactory.builder()
+        .dontUseContainerParents() // Container nodes are only available in ZK 3.5+.
+        .retryPolicy((retryCount, elapsedTimeMs, sleeper) -> false) // Don't retry.
+        .connectString(String.format("localhost:%d", getServer().getPort()))
+        .build();
+    client.start();
+    addTearDown(client::close);
+
+    PathChildrenCache groupCache =
+        new PathChildrenCache(client, GROUP_PATH, true /* cacheData */);
+    // Queue every cache event so tests can block until the cache has observed a change.
+    groupEvents = new LinkedBlockingQueue<>();
+    groupCache.getListenable().addListener((c, event) -> groupEvents.put(event));
+
+    Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
+    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC);
+  }
+
+  /** Starts the monitor under test and arranges for it to be closed on tear down. */
+  private void startGroupMonitor() throws ServiceGroupMonitor.MonitorException {
+    groupMonitor.start();
+    addTearDown(groupMonitor::close);
+  }
+
+  /**
+   * Blocks until the group cache reports an event of the given type, discarding any events of
+   * other types that arrive first.
+   *
+   * @param eventType The cache event type to wait for.
+   */
+  private void expectGroupEvent(PathChildrenCacheEvent.Type eventType) {
+    try {
+      PathChildrenCacheEvent event;
+      do {
+        event = groupEvents.take();
+      } while (event.getType() != eventType);
+    } catch (InterruptedException ex) {
+      // Restore the interrupt status so it is not lost by wrapping the exception.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Test
+  public void testNominalLifecycle() throws Exception {
+    startGroupMonitor();
+    groupMonitor.close();
+  }
+
+  @Test
+  public void testExceptionalLifecycle() throws Exception {
+    // Close on a non-started or failed-to-start monitor should be allowed.
+    groupMonitor.close();
+  }
+
+  @Test
+  public void testNoHosts() throws Exception {
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    startGroupMonitor();
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+  }
+
+  @Test
+  public void testHostUpdates() throws Exception {
+    startGroupMonitor();
+
+    ServiceInstance one = serviceInstance("one");
+    String onePath = createMember(one);
+    ServiceInstance two = serviceInstance("two");
+    String twoPath = createMember(two);
+    assertEquals(ImmutableSet.of(one, two), groupMonitor.get());
+
+    deleteChild(twoPath);
+    assertEquals(ImmutableSet.of(one), groupMonitor.get());
+
+    deleteChild(onePath);
+    ServiceInstance three = serviceInstance("three");
+    String threePath = createMember(three);
+    assertEquals(ImmutableSet.of(three), groupMonitor.get());
+
+    deleteChild(threePath);
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+  }
+
+  @Test
+  public void testMixedNodes() throws Exception {
+    startGroupMonitor();
+
+    String nonMemberPath = createNonMember();
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    ServiceInstance member = serviceInstance("member");
+    String memberPath = createMember(member);
+    assertEquals(ImmutableSet.of(member), groupMonitor.get());
+
+    deleteChild(memberPath);
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    deleteChild(nonMemberPath);
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+  }
+
+  @Test
+  public void testStartBlocksOnInitialMembership() throws Exception {
+    ServiceInstance one = serviceInstance("one");
+    createMember(one, false /* waitForGroupEvent */);
+
+    ServiceInstance two = serviceInstance("two");
+    createMember(two, false /* waitForGroupEvent */);
+
+    // Not started yet, should see no group members.
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    startGroupMonitor();
+    assertEquals(ImmutableSet.of(one, two), groupMonitor.get());
+  }
+
+  /** Deletes the node at {@code path} and waits for the cache to observe the removal. */
+  private void deleteChild(String path) throws Exception {
+    client.delete().forPath(path);
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED);
+  }
+
+  /** Creates a member node for {@code serviceInstance} and waits for the cache to observe it. */
+  private String createMember(ServiceInstance serviceInstance) throws Exception {
+    return createMember(serviceInstance, true /* waitForGroupEvent */);
+  }
+
+  /**
+   * Creates an ephemeral sequential member node holding the serialized {@code serviceInstance}.
+   *
+   * @param serviceInstance The instance to store as the node's data.
+   * @param waitForGroupEvent Whether to block until the group cache reports the added child.
+   * @return The full path of the created node.
+   */
+  private String createMember(ServiceInstance serviceInstance, boolean waitForGroupEvent)
+      throws Exception {
+
+    String path = client.create()
+        .creatingParentsIfNeeded()
+        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+        .forPath(ZKPaths.makePath(GROUP_PATH, MEMBER_TOKEN), serialize(serviceInstance));
+    if (waitForGroupEvent) {
+      expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    }
+    return path;
+  }
+
+  /** Creates a child node the member selector rejects and waits for the cache to observe it. */
+  private String createNonMember() throws Exception {
+    String path = client.create()
+        .creatingParentsIfNeeded()
+        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+        .forPath(ZKPaths.makePath(GROUP_PATH, "not-a-member"));
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    return path;
+  }
+
+  /** Serializes {@code serviceInstance} using the codec the monitor under test decodes with. */
+  private byte[] serialize(ServiceInstance serviceInstance) throws IOException {
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    ServerSet.JSON_CODEC.serialize(serviceInstance, sink);
+    return sink.toByteArray();
+  }
+
+  /** Builds an ALIVE instance whose service endpoint is {@code hostName}:42. */
+  private ServiceInstance serviceInstance(String hostName) {
+    return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE);
+  }
+}