You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/04/12 06:31:29 UTC
aurora git commit: Introduce a Curator-based `ServiceGroupMonitor`.
Repository: aurora
Updated Branches:
refs/heads/master 262ca16c5 -> ccf23820d
Introduce a Curator-based `ServiceGroupMonitor`.
Bugs closed: AURORA-1468
Reviewed at https://reviews.apache.org/r/45902/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ccf23820
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ccf23820
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ccf23820
Branch: refs/heads/master
Commit: ccf23820d15e251f7c7a44621992a1297b1a902f
Parents: 262ca16
Author: John Sirois <js...@apache.org>
Authored: Mon Apr 11 22:31:27 2016 -0600
Committer: John Sirois <jo...@gmail.com>
Committed: Mon Apr 11 22:31:27 2016 -0600
----------------------------------------------------------------------
build.gradle | 5 +
.../discovery/CuratorServiceGroupMonitor.java | 106 ++++++++++
.../CuratorServiceGroupMonitorTest.java | 203 +++++++++++++++++++
3 files changed, 314 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index fc61adf..d981ab7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -80,6 +80,7 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
}
ext.commonsLangRev = '2.6'
+ ext.curatorRev = '2.10.0'
ext.gsonRev = '2.3.1'
ext.guavaRev = '19.0'
ext.guiceRev = '3.0'
@@ -115,6 +116,7 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169
force "com.google.protobuf:protobuf-java:${protobufRev}"
force "junit:junit:${junitRev}"
force "org.apache.thrift:libthrift:${thriftRev}"
+ force "org.apache.zookeeper:zookeeper:${zookeeperRev}"
force "org.hamcrest:hamcrest-core:1.3"
force "org.slf4j:slf4j-api:${slf4jRev}"
force "org.mybatis:mybatis:${mybatisRev}"
@@ -358,6 +360,9 @@ dependencies {
compile 'javax.inject:javax.inject:1'
compile "javax.servlet:javax.servlet-api:${servletRev}"
compile "org.antlr:stringtemplate:${stringTemplateRev}"
+ compile "org.apache.curator:curator-client:${curatorRev}"
+ compile "org.apache.curator:curator-framework:${curatorRev}"
+ compile "org.apache.curator:curator-recipes:${curatorRev}"
compile 'org.apache.mesos:mesos:0.26.0'
compile "org.apache.shiro:shiro-guice:${shiroRev}"
compile "org.apache.shiro:shiro-web:${shiroRev}"
http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
new file mode 100644
index 0000000..9d8b7bd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.scheduler.app.SchedulerMain;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+class CuratorServiceGroupMonitor implements ServiceGroupMonitor {
+ private static final Logger LOG = LoggerFactory.getLogger(SchedulerMain.class);
+
+ private final PathChildrenCache groupCache;
+ private final Predicate<String> memberSelector;
+ private final Codec<ServiceInstance> codec;
+
+ /**
+ * Creates a {@code ServiceGroupMonitor} backed by Curator.
+ *
+ * Although this monitor can be queried at any time, it will not usefully reflect service group
+ * membership until it is {@link #start() started}. When starting a monitor, it should be arranged
+ * that the monitor is {@link #close() closed} when no longer needed.
+ *
+ * It's important to be able to pick out group member nodes amongst child nodes for group paths
+ * that can contain mixed-content nodes. The given {@code memberSelector} should be able to
+ * discriminate member nodes from non-member nodes given the node name.
+ *
+ * @param groupCache The cache of group nodes.
+ * @param memberSelector A predicate that returns {@code true} for group node names that represent
+ * group members. Here the name is just the `basename` of the node's full
+ * ZooKeeper path.
+ * @param codec A codec that can be used to deserialize group member {@link ServiceInstance} data.
+ */
+ CuratorServiceGroupMonitor(
+ PathChildrenCache groupCache,
+ Predicate<String> memberSelector,
+ Codec<ServiceInstance> codec) {
+
+ this.groupCache = requireNonNull(groupCache);
+ this.memberSelector = requireNonNull(memberSelector);
+ this.codec = requireNonNull(codec);
+ }
+
+ @Override
+ public void start() throws MonitorException {
+ try {
+ // NB: This blocks on an initial group population to emulate legacy ServerSetMonitor behavior;
+ // asynchronous population is an option using NORMAL or POST_INITIALIZED_EVENT StartModes
+ // though.
+ groupCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ } catch (Exception e) {
+ throw new MonitorException("Failed to begin monitoring service group.", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ groupCache.close();
+ }
+
+ @Override
+ public ImmutableSet<ServiceInstance> get() {
+ return groupCache.getCurrentData().stream()
+ .filter(cd -> memberSelector.test(ZKPaths.getNodeFromPath(cd.getPath())))
+ .map(this::extractServiceInstance)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(GuavaUtils.toImmutableSet());
+ }
+
+ private Optional<ServiceInstance> extractServiceInstance(ChildData data) {
+ ByteArrayInputStream source = new ByteArrayInputStream(data.getData());
+ try {
+ return Optional.of(codec.deserialize(source));
+ } catch (IOException e) {
+ LOG.error("Failed to deserialize ServiceInstance from " + data, e);
+ return Optional.empty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
new file mode 100644
index 0000000..0879c2e
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises {@link CuratorServiceGroupMonitor} against the ZooKeeper server provided by
+ * {@link BaseZooKeeperTest}.
+ */
+public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest {
+
+  private static final String GROUP_PATH = "/group/root";
+  private static final String MEMBER_TOKEN = "member_";
+
+  private CuratorFramework client;
+  // Events observed by the group cache; tests drain this to synchronize with cache updates.
+  private BlockingQueue<PathChildrenCacheEvent> groupEvents;
+  private CuratorServiceGroupMonitor groupMonitor;
+
+  /**
+   * Connects a Curator client to the test server and builds an un-started monitor over
+   * {@link #GROUP_PATH} whose cache events are recorded in {@link #groupEvents}.
+   */
+  @Before
+  public void setUpCurator() {
+    client = CuratorFrameworkFactory.builder()
+        .dontUseContainerParents() // Container nodes are only available in ZK 3.5+.
+        .retryPolicy((retryCount, elapsedTimeMs, sleeper) -> false) // Don't retry.
+        .connectString(String.format("localhost:%d", getServer().getPort()))
+        .build();
+    client.start();
+    addTearDown(client::close);
+
+    PathChildrenCache groupCache =
+        new PathChildrenCache(client, GROUP_PATH, true /* cacheData */);
+    groupEvents = new LinkedBlockingQueue<>();
+    groupCache.getListenable().addListener((c, event) -> groupEvents.put(event));
+
+    Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
+    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC);
+  }
+
+  /** Starts the monitor and registers it to be closed at tear-down. */
+  private void startGroupMonitor() throws ServiceGroupMonitor.MonitorException {
+    groupMonitor.start();
+    addTearDown(groupMonitor::close);
+  }
+
+  /**
+   * Blocks until the group cache reports an event of the given type, discarding any events of
+   * other types seen in the meantime.
+   *
+   * @param eventType The type of event to wait for.
+   */
+  private void expectGroupEvent(PathChildrenCacheEvent.Type eventType) {
+    try {
+      PathChildrenCacheEvent event;
+      do {
+        event = groupEvents.take();
+      } while (event.getType() != eventType);
+    } catch (InterruptedException ex) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Test
+  public void testNominalLifecycle() throws Exception {
+    startGroupMonitor();
+    groupMonitor.close();
+  }
+
+  @Test
+  public void testExceptionalLifecycle() throws Exception {
+    // Close on a non-started or failed-to-start monitor should be allowed.
+    groupMonitor.close();
+  }
+
+  @Test
+  public void testNoHosts() throws Exception {
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    startGroupMonitor();
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+  }
+
+  @Test
+  public void testHostUpdates() throws Exception {
+    startGroupMonitor();
+
+    ServiceInstance one = serviceInstance("one");
+    String onePath = createMember(one);
+    ServiceInstance two = serviceInstance("two");
+    String twoPath = createMember(two);
+    assertEquals(ImmutableSet.of(one, two), groupMonitor.get());
+
+    deleteChild(twoPath);
+    assertEquals(ImmutableSet.of(one), groupMonitor.get());
+
+    deleteChild(onePath);
+    ServiceInstance three = serviceInstance("three");
+    String threePath = createMember(three);
+    assertEquals(ImmutableSet.of(three), groupMonitor.get());
+
+    deleteChild(threePath);
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+  }
+
+  @Test
+  public void testMixedNodes() throws Exception {
+    startGroupMonitor();
+
+    String nonMemberPath = createNonMember();
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    ServiceInstance member = serviceInstance("member");
+    String memberPath = createMember(member);
+    assertEquals(ImmutableSet.of(member), groupMonitor.get());
+
+    deleteChild(memberPath);
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    deleteChild(nonMemberPath);
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+  }
+
+  @Test
+  public void testStartBlocksOnInitialMembership() throws Exception {
+    ServiceInstance one = serviceInstance("one");
+    createMember(one, false /* waitForGroupEvent */);
+
+    ServiceInstance two = serviceInstance("two");
+    createMember(two, false /* waitForGroupEvent */);
+
+    // Not started yet, should see no group members.
+    assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+    startGroupMonitor();
+    assertEquals(ImmutableSet.of(one, two), groupMonitor.get());
+  }
+
+  /**
+   * Deletes the node at the given path and waits for the group cache to report a removal.
+   *
+   * @param path The full ZooKeeper path of the child node to delete.
+   */
+  private void deleteChild(String path) throws Exception {
+    client.delete().forPath(path);
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED);
+  }
+
+  /**
+   * Creates a member node for the given instance and waits for the group cache to report it.
+   *
+   * @param serviceInstance The instance to serialize into the new member node.
+   * @return The full path of the created node.
+   */
+  private String createMember(ServiceInstance serviceInstance) throws Exception {
+    return createMember(serviceInstance, true /* waitForGroupEvent */);
+  }
+
+  /**
+   * Creates an ephemeral sequential member node under {@link #GROUP_PATH} holding the serialized
+   * instance.
+   *
+   * @param serviceInstance The instance to serialize into the new member node.
+   * @param waitForGroupEvent Whether to block until the group cache reports an added child; pass
+   *                          {@code false} when the monitor has not been started.
+   * @return The full path of the created node.
+   */
+  private String createMember(ServiceInstance serviceInstance, boolean waitForGroupEvent)
+      throws Exception {
+
+    String path = client.create()
+        .creatingParentsIfNeeded()
+        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+        .forPath(ZKPaths.makePath(GROUP_PATH, MEMBER_TOKEN), serialize(serviceInstance));
+    if (waitForGroupEvent) {
+      expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    }
+    return path;
+  }
+
+  /**
+   * Creates an ephemeral sequential node under {@link #GROUP_PATH} whose name does not contain
+   * {@link #MEMBER_TOKEN}, then waits for the group cache to report it.
+   *
+   * @return The full path of the created node.
+   */
+  private String createNonMember() throws Exception {
+    String path = client.create()
+        .creatingParentsIfNeeded()
+        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
+        .forPath(ZKPaths.makePath(GROUP_PATH, "not-a-member"));
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    return path;
+  }
+
+  /** Serializes the given instance with the same codec the monitor under test decodes with. */
+  private byte[] serialize(ServiceInstance serviceInstance) throws IOException {
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    ServerSet.JSON_CODEC.serialize(serviceInstance, sink);
+    return sink.toByteArray();
+  }
+
+  /** Builds an {@code ALIVE} instance for the given host on a fixed port with no extra endpoints. */
+  private ServiceInstance serviceInstance(String hostName) {
+    return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE);
+  }
+}