You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2016/12/01 17:02:30 UTC
[1/4] aurora git commit: Revert removal of twitter/commons/zk based
leadership code
Repository: aurora
Updated Branches:
refs/heads/master 8bcad84dc -> 16e4651d5
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/JsonCodecTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/JsonCodecTest.java
deleted file mode 100644
index b88ba37..0000000
--- a/src/test/java/org/apache/aurora/scheduler/discovery/JsonCodecTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.gson.Gson;
-import com.google.gson.JsonIOException;
-
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.easymock.EasyMock;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link JsonCodec}'s JSON serialization and deserialization of
- * {@link ServiceInstance}. Runs under PowerMock with {@link Gson} prepared because Gson is final
- * and {@code testInvalidSerialize} needs to mock it.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Gson.class)
-public class JsonCodecTest {
-
- /** Serializes {@code serviceInstance} with the shared {@link JsonCodec#INSTANCE}. */
- private static byte[] serializeServiceInstance(ServiceInstance serviceInstance)
- throws IOException {
-
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- JsonCodec.INSTANCE.serialize(serviceInstance, output);
- return output.toByteArray();
- }
-
- /** Deserializes {@code data} with the shared {@link JsonCodec#INSTANCE}. */
- private static ServiceInstance deserializeServiceInstance(byte[] data) throws IOException {
- return JsonCodec.INSTANCE.deserialize(new ByteArrayInputStream(data));
- }
-
- // Round-trips instances with and without a shard, and with one or zero additional endpoints,
- // checking that the service endpoint port and the presence/absence of the shard survive.
- @Test
- public void testJsonCodecRoundtrip() throws Exception {
- ServiceInstance instance1 = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.of("http", new Endpoint("foo", 8080)),
- Status.ALIVE)
- .setShard(0);
- byte[] data = serializeServiceInstance(instance1);
- assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort());
- assertTrue(deserializeServiceInstance(data).isSetShard());
-
- ServiceInstance instance2 = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
- Status.ALIVE);
- data = serializeServiceInstance(instance2);
- assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort());
- assertFalse(deserializeServiceInstance(data).isSetShard());
-
- ServiceInstance instance3 = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.of(),
- Status.ALIVE);
- data = serializeServiceInstance(instance3);
- assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort());
- assertFalse(deserializeServiceInstance(data).isSetShard());
- }
-
- // Pins the exact JSON text (field names and their order) emitted by serialize.
- @Test
- public void testJsonCompatibility() throws IOException {
- ServiceInstance instance = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.of("http", new Endpoint("foo", 8080)),
- Status.ALIVE).setShard(42);
-
- ByteArrayOutputStream results = new ByteArrayOutputStream();
- JsonCodec.INSTANCE.serialize(instance, results);
- assertEquals(
- "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
- + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
- + "\"status\":\"ALIVE\","
- + "\"shard\":42}",
- results.toString(Charsets.UTF_8.name()));
- }
-
- // A JsonIOException thrown by Gson during serialization must surface as an IOException.
- @Test
- public void testInvalidSerialize() {
- // Gson is final so we need to call on PowerMock here.
- Gson gson = PowerMock.createMock(Gson.class);
- gson.toJson(EasyMock.isA(Object.class), EasyMock.isA(Appendable.class));
- EasyMock.expectLastCall().andThrow(new JsonIOException("error"));
- PowerMock.replay(gson);
-
- ServiceInstance instance =
- new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
-
- try {
- new JsonCodec(gson).serialize(instance, new ByteArrayOutputStream());
- fail();
- } catch (IOException e) {
- // Expected.
- }
-
- PowerMock.verify(gson);
- }
-
- // JSON carrying only serviceEndpoint and status deserializes to an instance equal to one built
- // with no additional endpoints and no shard.
- @Test
- public void testDeserializeMinimal() throws IOException {
- String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}";
- ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8));
- ServiceInstance actual = JsonCodec.INSTANCE.deserialize(source);
- ServiceInstance expected =
- new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
- assertEquals(expected, actual);
- }
-
- // Non-JSON bytes, non-object JSON, and objects missing serviceEndpoint and/or status must all be
- // rejected with an IOException.
- @Test
- public void testInvalidDeserialize() {
- // Not JSON.
- assertInvalidDeserialize(new byte[] {0xC, 0xA, 0xF, 0xE});
-
- // No JSON object.
- assertInvalidDeserialize("");
- assertInvalidDeserialize("[]");
-
- // Missing required fields.
- assertInvalidDeserialize("{}");
- assertInvalidDeserialize("{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}}");
- assertInvalidDeserialize("{\"status\":\"ALIVE\"}");
- }
-
- /** UTF-8 encodes {@code data} and asserts that deserializing it throws IOException. */
- private void assertInvalidDeserialize(String data) {
- assertInvalidDeserialize(data.getBytes(Charsets.UTF_8));
- }
-
- /** Asserts that deserializing {@code data} with JsonCodec.INSTANCE throws IOException. */
- private void assertInvalidDeserialize(byte[] data) {
- try {
- JsonCodec.INSTANCE.deserialize(new ByteArrayInputStream(data));
- fail();
- } catch (IOException e) {
- // Expected.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
index 4d833f2..a065505 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfigTest.java
@@ -14,12 +14,14 @@
package org.apache.aurora.scheduler.discovery;
import java.net.InetSocketAddress;
-import java.util.Optional;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -35,22 +37,24 @@ public class ZooKeeperConfigTest {
@Test(expected = IllegalArgumentException.class)
public void testEmptyServers() {
+ // An empty server list (ImmutableList.of()) is expected to be rejected with
+ // IllegalArgumentException when constructing the ZooKeeperConfig.
new ZooKeeperConfig(
+ false,
ImmutableList.of(),
- Optional.empty(),
+ Optional.absent(),
false,
Amount.of(1, Time.DAYS),
- Optional.empty());
+ Optional.absent());
}
@Test
public void testWithCredentials() {
ZooKeeperConfig config =
new ZooKeeperConfig(
+ false,
SERVERS,
- Optional.empty(),
+ Optional.absent(),
false,
Amount.of(1, Time.HOURS),
- Optional.empty()); // credentials
+ Optional.absent()); // credentials
assertFalse(config.getCredentials().isPresent());
Credentials joeCreds = Credentials.digestCredentials("Joe", "Schmoe");
@@ -66,8 +70,9 @@ public class ZooKeeperConfigTest {
@Test
public void testCreateFactory() {
- ZooKeeperConfig config = ZooKeeperConfig.create(SERVERS);
+ ZooKeeperConfig config = ZooKeeperConfig.create(true, SERVERS);
+ assertTrue(config.isUseCurator());
assertEquals(SERVERS, ImmutableList.copyOf(config.getServers()));
assertFalse(config.getChrootPath().isPresent());
assertFalse(config.isInProcess());
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index d9e7374..fb03f25 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -48,9 +48,9 @@ import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.TierManager;
import org.apache.aurora.scheduler.app.LifecycleModule;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.aurora.scheduler.async.AsyncModule;
import org.apache.aurora.scheduler.cron.CronJobManager;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
@@ -132,8 +132,9 @@ public abstract class AbstractJettyTest extends EasyMockTest {
bindMock(Thread.UncaughtExceptionHandler.class);
bindMock(TaskGroups.TaskGroupBatchWorker.class);
- bind(ServletContextListener.class)
- .toProvider(() -> makeServletContextListener(injector, getChildServletModule()));
+ bind(ServletContextListener.class).toProvider(() -> {
+ return makeServletContextListener(injector, getChildServletModule());
+ });
}
},
new JettyServerModule(false));
@@ -146,12 +147,12 @@ public abstract class AbstractJettyTest extends EasyMockTest {
expect(serviceGroupMonitor.get()).andAnswer(schedulers::get).anyTimes();
}
+ /**
+ * Makes the mocked ServiceGroupMonitor report exactly one scheduler, at {@code host:port}.
+ */
- void setLeadingScheduler(String host, int port) {
+ protected void setLeadingScheduler(String host, int port) {
schedulers.set(
ImmutableSet.of(new ServiceInstance().setServiceEndpoint(new Endpoint(host, port))));
}
+ /**
+ * Makes the mocked ServiceGroupMonitor report no schedulers.
+ * NOTE(review): "Schduler" is misspelled; renaming requires updating all callers.
+ */
- void unsetLeadingSchduler() {
+ protected void unsetLeadingSchduler() {
schedulers.set(ImmutableSet.of());
}
@@ -161,7 +162,9 @@ public abstract class AbstractJettyTest extends EasyMockTest {
ServiceManagerIface service =
injector.getInstance(Key.get(ServiceManagerIface.class, AppStartup.class));
service.startAsync().awaitHealthy();
- addTearDown(() -> service.stopAsync().awaitStopped(5L, TimeUnit.SECONDS));
+ addTearDown(() -> {
+ service.stopAsync().awaitStopped(5L, TimeUnit.SECONDS);
+ });
} catch (Exception e) {
throw Throwables.propagate(e);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java b/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
index a308ba2..a16058f 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
@@ -27,8 +27,8 @@ import com.google.common.net.HostAndPort;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index 0119ccb..86861e1 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -42,12 +42,12 @@ import org.apache.aurora.gen.TaskQuery;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.app.AppModule;
import org.apache.aurora.scheduler.app.LifecycleModule;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.aurora.scheduler.app.local.FakeNonVolatileStorage;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.cron.quartz.CronModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
import org.apache.aurora.scheduler.mesos.DriverFactory;
import org.apache.aurora.scheduler.mesos.DriverSettings;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
[3/4] aurora git commit: Revert removal of twitter/commons/zk based
leadership code
Posted by dm...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
new file mode 100644
index 0000000..ce243fb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.InetSocketAddressHelper;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages a connection to a ZooKeeper cluster.
+ *
+ * <p>The {@link ZooKeeper} handle is created lazily by {@link #get()} and discarded by
+ * {@link #close()}. Every event seen by a connection's default watcher is queued and re-delivered,
+ * on a single daemon thread owned by this client, to the top-level watchers added with
+ * {@link #register(Watcher)}; those registrations therefore survive reconnects.
+ */
+public class ZooKeeperClient {
+
+  /**
+   * Indicates an error connecting to a zookeeper cluster.
+   *
+   * <p>Declared {@code static} so an exception instance does not hold a hidden reference to (or try
+   * to serialize along with it) the client that threw it.
+   */
+  public static class ZooKeeperConnectionException extends Exception {
+    ZooKeeperConnectionException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /** Id and password of the most recently established session, as reported by ZooKeeper. */
+  private static final class SessionState {
+    private final long sessionId;
+    private final byte[] sessionPasswd;
+
+    private SessionState(long sessionId, byte[] sessionPasswd) {
+      this.sessionId = sessionId;
+      this.sessionPasswd = sessionPasswd;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class);
+
+  // get(Amount) treats a non-positive timeout as "wait until connected".
+  private static final Amount<Long, Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS);
+
+  private final int sessionTimeoutMs;
+  private final Optional<Credentials> credentials;
+  // Connect string: comma-joined host:port list with the optional chroot path appended.
+  private final String zooKeeperServers;
+  // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
+  // made from within long synchronized blocks.
+  private volatile ZooKeeper zooKeeper;
+  // GuardedBy "this".
+  private SessionState sessionState;
+
+  // Copy-on-write so the dispatch thread can iterate while register/unregister run concurrently.
+  private final Set<Watcher> watchers = new CopyOnWriteArraySet<>();
+  // Events observed by the current connection's default watcher, awaiting dispatch.
+  private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<>();
+
+  private static Iterable<InetSocketAddress> combine(
+      InetSocketAddress address, InetSocketAddress... addresses) {
+    return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build();
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer,
+      InetSocketAddress... zooKeeperServers) {
+    this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+    this(sessionTimeout, Optional.absent(), Optional.absent(), zooKeeperServers);
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}. All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+      InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) {
+    this(sessionTimeout,
+        Optional.of(credentials),
+        Optional.absent(),
+        combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}. All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+    this(sessionTimeout,
+        Optional.of(credentials),
+        Optional.absent(),
+        zooKeeperServers);
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}. All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param chrootPath an optional chroot path, appended to the connect string
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   * @throws IllegalArgumentException if {@code chrootPath} is not a valid ZK path or
+   *     {@code zooKeeperServers} is empty
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials,
+      Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
+    this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
+    this.credentials = Preconditions.checkNotNull(credentials);
+
+    Preconditions.checkNotNull(chrootPath);
+    if (chrootPath.isPresent()) {
+      PathUtils.validatePath(chrootPath.get());
+    }
+
+    Preconditions.checkNotNull(zooKeeperServers);
+    Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
+        "Must present at least 1 ZK server");
+
+    Iterable<String> servers =
+        Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
+            InetSocketAddressHelper::toString);
+    this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
+
+    // Start dispatching only after every field is assigned and all validation has passed, so a
+    // constructor that throws never leaves a running thread behind.
+    Thread watcherProcessor = new Thread(this::dispatchEvents, "ZookeeperClient-watcherProcessor");
+    watcherProcessor.setDaemon(true);
+    watcherProcessor.start();
+  }
+
+  /**
+   * Body of the watcher-dispatch thread: takes queued events one at a time and hands each to every
+   * currently registered watcher. Never returns.
+   */
+  private void dispatchEvents() {
+    while (true) {
+      WatchedEvent event;
+      try {
+        event = eventQueue.take();
+      } catch (InterruptedException ignored) {
+        // Deliberately ignored: nothing else holds a reference to this thread, and as a daemon it
+        // does not prevent JVM exit.
+        continue;
+      }
+      for (Watcher watcher : watchers) {
+        try {
+          watcher.process(event);
+        } catch (RuntimeException e) {
+          // One failing watcher must not terminate this thread; that would silently stop event
+          // delivery to every other registered watcher.
+          LOG.error("Watcher {} failed to process event {}", watcher, event, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was disconnected or had its session time out. This method
+   * will attempt to re-use sessions when possible. Equivalent to
+   * {@code get(Amount.of(0L, Time.MILLISECONDS))}, i.e. it waits for as long as it takes to
+   * connect.
+   *
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   */
+  public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
+    try {
+      return get(WAIT_FOREVER);
+    } catch (TimeoutException e) {
+      // Not expected with an unbounded wait; reported as an interruption to keep this method's
+      // throws clause unchanged.
+      InterruptedException interruptedException =
+          new InterruptedException("Got an unexpected TimeoutException for 0 wait");
+      interruptedException.initCause(e);
+      throw interruptedException;
+    }
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was disconnected or had its session time out. This
+   * method will attempt to re-use sessions when possible.
+   *
+   * <p>If establishing a new connection fails (timeout or interruption), the partially created
+   * handle is closed so that the next call starts from scratch.
+   *
+   * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
+   *     cluster to be established; 0 to wait forever
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   * @throws TimeoutException if a connection could not be established within
+   *     {@code connectionTimeout}
+   */
+  public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
+      throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
+
+    if (zooKeeper == null) {
+      final CountDownLatch connected = new CountDownLatch(1);
+      Watcher watcher = event -> {
+        switch (event.getType()) {
+          // Guard the None type since this watch may be used as the default watch on calls by
+          // the client outside our control.
+          case None:
+            switch (event.getState()) {
+              case Expired:
+                LOG.info("Zookeeper session expired. Event: {}", event);
+                close();
+                break;
+              case SyncConnected:
+                connected.countDown();
+                break;
+              default:
+                break;
+            }
+            break;
+          default:
+            break;
+        }
+
+        // Every event, connection-state changes included, is forwarded to registered watchers.
+        eventQueue.offer(event);
+      };
+
+      try {
+        zooKeeper = (sessionState != null)
+            ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
+                sessionState.sessionPasswd)
+            : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
+      } catch (IOException e) {
+        throw new ZooKeeperConnectionException(
+            "Problem connecting to servers: " + zooKeeperServers, e);
+      }
+
+      awaitConnection(connected, connectionTimeout);
+
+      if (credentials.isPresent()) {
+        Credentials zkCredentials = credentials.get();
+        zooKeeper.addAuthInfo(zkCredentials.scheme(), zkCredentials.authToken());
+      }
+
+      sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+    }
+    return zooKeeper;
+  }
+
+  /**
+   * Waits for {@code connected} to be released. On timeout or interruption the freshly created
+   * handle is closed before the exception propagates, so an unconnected handle is never left
+   * cached for later {@link #get} calls. Must be called while holding this client's monitor.
+   */
+  private void awaitConnection(CountDownLatch connected, Amount<Long, Time> connectionTimeout)
+      throws InterruptedException, TimeoutException {
+    boolean connectedInTime;
+    try {
+      if (connectionTimeout.getValue() > 0) {
+        connectedInTime =
+            connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS);
+      } else {
+        connected.await();
+        connectedInTime = true;
+      }
+    } catch (InterruptedException ex) {
+      // Applies to both the bounded and the unbounded wait.
+      LOG.info("Interrupted while waiting to connect to zooKeeper");
+      close();
+      throw ex;
+    }
+    if (!connectedInTime) {
+      close();
+      throw new TimeoutException("Timed out waiting for a ZK connection after "
+          + connectionTimeout);
+    }
+  }
+
+  /**
+   * Clients that need to re-establish state after session expiration can register an
+   * {@code onExpired} command to execute.
+   *
+   * @param onExpired the {@code Command} to register
+   * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
+   *     removal.
+   */
+  public Watcher registerExpirationHandler(final Command onExpired) {
+    Watcher watcher = event -> {
+      if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+        onExpired.execute();
+      }
+    };
+    register(watcher);
+    return watcher;
+  }
+
+  /**
+   * Clients that need to register a top-level {@code Watcher} should do so using this method. The
+   * registered {@code watcher} will remain registered across re-connects and session expiration
+   * events.
+   *
+   * @param watcher the {@code Watcher} to register
+   */
+  public void register(Watcher watcher) {
+    watchers.add(watcher);
+  }
+
+  /**
+   * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
+   * registered.
+   *
+   * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
+   * @return whether the given {@code Watcher} was found and removed from the active set
+   */
+  public boolean unregister(Watcher watcher) {
+    return watchers.remove(watcher);
+  }
+
+  /**
+   * Checks to see if the client might reasonably re-try an operation given the exception thrown
+   * while attempting it. If the ZooKeeper session should be expired to enable the re-try to
+   * succeed this method will expire it as a side-effect.
+   *
+   * @param e the exception to test
+   * @return true if a retry can be attempted
+   */
+  public boolean shouldRetry(KeeperException e) {
+    if (e instanceof SessionExpiredException) {
+      close();
+    }
+    return ZooKeeperUtils.isRetryable(e);
+  }
+
+  /**
+   * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent
+   * calls to this method will no-op until the next successful {@link #get}.
+   */
+  public synchronized void close() {
+    if (zooKeeper != null) {
+      try {
+        zooKeeper.close();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted trying to close zooKeeper");
+      } finally {
+        zooKeeper = null;
+        sessionState = null;
+      }
+    }
+  }
+
+  @VisibleForTesting
+  synchronized boolean isClosed() {
+    return zooKeeper == null;
+  }
+
+  @VisibleForTesting
+  ZooKeeper getZooKeeperClientForTests() {
+    return zooKeeper;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
new file mode 100644
index 0000000..2ada264
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for dealing with zoo keeper.
+ */
+public final class ZooKeeperUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
+
+  /**
+   * An appropriate default session timeout (4 seconds) for Twitter ZooKeeper clusters.
+   */
+  public static final Amount<Integer, Time> DEFAULT_ZK_SESSION_TIMEOUT =
+      Amount.of(4, Time.SECONDS);
+
+  /**
+   * The magic version number that allows any mutation to always succeed regardless of actual
+   * version number.
+   */
+  public static final int ANY_VERSION = -1;
+
+  /**
+   * An ACL that gives all permissions any user authenticated or not.
+   */
+  public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
+      ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
+
+  /**
+   * An ACL that gives all permissions to node creators and read permissions only to everyone else.
+   */
+  public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
+      ImmutableList.<ACL>builder()
+          .addAll(Ids.CREATOR_ALL_ACL)
+          .addAll(Ids.READ_ACL_UNSAFE)
+          .build();
+
+  /**
+   * Returns true if the given exception indicates an error that can be resolved by retrying the
+   * operation without modification.
+   *
+   * @param e the exception to check
+   * @return true if the causing operation is strictly retryable
+   * @throws NullPointerException if {@code e} is null
+   */
+  public static boolean isRetryable(KeeperException e) {
+    Preconditions.checkNotNull(e);
+
+    switch (e.code()) {
+      // Connection/session-level conditions: the same request may succeed on another attempt.
+      case CONNECTIONLOSS:
+      case SESSIONEXPIRED:
+      case SESSIONMOVED:
+      case OPERATIONTIMEOUT:
+        return true;
+
+      case RUNTIMEINCONSISTENCY:
+      case DATAINCONSISTENCY:
+      case MARSHALLINGERROR:
+      case BADARGUMENTS:
+      case NONODE:
+      case NOAUTH:
+      case BADVERSION:
+      case NOCHILDRENFOREPHEMERALS:
+      case NODEEXISTS:
+      case NOTEMPTY:
+      case INVALIDCALLBACK:
+      case INVALIDACL:
+      case AUTHFAILED:
+      case UNIMPLEMENTED:
+
+      // These two should not be encountered - they are used internally by ZK to specify ranges
+      case SYSTEMERROR:
+      case APIERROR:
+
+      case OK: // This is actually an invalid ZK exception code
+
+      default:
+        // Anything unlisted (including codes added by newer ZooKeeper versions) is not retried.
+        return false;
+    }
+  }
+
+  /**
+   * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}. If the
+   * path already exists, nothing is done; however if any portion of the path is missing, it will be
+   * created with the given {@code acl} as a persistent zookeeper node. The given {@code path} must
+   * be a valid zookeeper absolute path.
+   *
+   * <p>The path is validated before {@code zkClient} is asked for a connection, so a malformed
+   * path fails immediately instead of after (possibly indefinitely) waiting to connect.
+   *
+   * @param zkClient the client to use to access the ZK cluster
+   * @param acl the acl to use if creating path nodes
+   * @param path the path to ensure exists
+   * @throws IllegalArgumentException if {@code path} is not a valid absolute ZK path
+   * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
+   * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
+   * @throws KeeperException if there was a problem in ZK
+   */
+  public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
+      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+    Preconditions.checkNotNull(zkClient);
+    Preconditions.checkNotNull(path);
+    Preconditions.checkArgument(path.startsWith("/"));
+    PathUtils.validatePath(path);
+
+    ensurePathInternal(zkClient, acl, path);
+  }
+
+  /** Recursively creates {@code path} and any missing ancestors, parents first. */
+  private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
+      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+    if (zkClient.get().exists(path, false) == null) {
+      // The current path does not exist; so back up a level and ensure the parent path exists
+      // unless we're already a root-level path.
+      int lastPathIndex = path.lastIndexOf('/');
+      if (lastPathIndex > 0) {
+        ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex));
+      }
+
+      // We've ensured our parent path (if any) exists so we can proceed to create our path.
+      try {
+        zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
+      } catch (KeeperException.NodeExistsException e) {
+        // This ensures we don't die if a race condition was met between checking existence and
+        // trying to create the node.
+        LOG.info("Node existed when trying to ensure path {}, somebody beat us to it?", path);
+      }
+    }
+  }
+
+  /**
+   * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and
+   * never ends with a slash (except for root path).
+   *
+   * @param path the path to be normalized
+   * @return normalized path string
+   * @throws IllegalArgumentException if the normalized path is not a valid ZK path (for example,
+   *     it is empty or relative)
+   */
+  public static String normalizePath(String path) {
+    String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1");
+    PathUtils.validatePath(normalizedPath);
+    return normalizedPath;
+  }
+
+  private ZooKeeperUtils() {
+    // utility
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
new file mode 100644
index 0000000..ba09279
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+
+/**
+ * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient.
+ */
+public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest {
+
+ private final Amount<Integer, Time> defaultSessionTimeout;
+
+ /**
+ * Creates a test case where the test server uses its
+ * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit
+ * session timeout.
+ */
+ public BaseZooKeeperClientTest() {
+ this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
+ }
+
+ /**
+ * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for
+ * clients created without an explicit session timeout.
+ */
+ public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) {
+ this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout);
+ }
+
+
+ /**
+ * Starts zookeeper back up on the last used port.
+ */
+ protected final void restartNetwork() throws IOException, InterruptedException {
+ getServer().restartNetwork();
+ }
+
+ /**
+ * Shuts down the in-process zookeeper network server.
+ */
+ protected final void shutdownNetwork() {
+ getServer().shutdownNetwork();
+ }
+
+ /**
+ * Expires the active session for the given client. The client should be one returned from
+ * {@link #createZkClient}.
+ *
+ * @param zkClient the client to expire
+ * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to
+ * the local zk server while trying to expire the session
+ * @throws InterruptedException if interrupted while requesting expiration
+ */
+ protected final void expireSession(ZooKeeperClient zkClient)
+ throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException {
+ getServer().expireClientSession(zkClient.get().getSessionId());
+ }
+
+ /**
+ * Returns the current port to connect to the in-process zookeeper instance.
+ */
+ protected final int getPort() {
+ return getServer().getPort();
+ }
+
+ /**
+ * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+ * with the default session timeout.
+ */
+ protected final ZooKeeperClient createZkClient() {
+ return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent());
+ }
+
+ /**
+ * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+ * the default session timeout.
+ */
+ protected final ZooKeeperClient createZkClient(Credentials credentials) {
+ return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent());
+ }
+
+ /**
+ * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+ * the default session timeout. The client is authenticated in the digest authentication scheme
+ * with the given {@code username} and {@code password}.
+ */
+ protected final ZooKeeperClient createZkClient(String username, String password) {
+ return createZkClient(Credentials.digestCredentials(username, password));
+ }
+
+ /**
+ * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+ * with a custom {@code sessionTimeout}.
+ */
+ protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) {
+ return createZkClient(sessionTimeout, Optional.absent(), Optional.absent());
+ }
+
+ /**
+ * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+ * the default session timeout and the custom chroot path.
+ */
+ protected final ZooKeeperClient createZkClient(String chrootPath) {
+ return createZkClient(defaultSessionTimeout, Optional.absent(),
+ Optional.of(chrootPath));
+ }
+
+ private ZooKeeperClient createZkClient(
+ Amount<Integer, Time> sessionTimeout,
+ Optional<Credentials> credentials,
+ Optional<String> chrootPath) {
+
+ ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath,
+ ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort())));
+ addTearDown(client::close);
+ return client;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
new file mode 100644
index 0000000..0e68987
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * A base-class for in-process zookeeper tests.
+ */
+public abstract class BaseZooKeeperTest extends TearDownTestCase {
+
+ private ZooKeeperTestServer zkTestServer;
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Before
+ public final void setUp() throws Exception {
+ zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder());
+ addTearDown(zkTestServer::stop);
+ zkTestServer.startNetwork();
+ }
+
+ /**
+ * Returns the running in-process ZooKeeper server.
+ *
+ * @return The in-process ZooKeeper server.
+ */
+ protected final ZooKeeperTestServer getServer() {
+ return zkTestServer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
new file mode 100644
index 0000000..50acaeb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * A helper class for starting in-process ZooKeeper server and clients.
+ *
+ * <p>This is ONLY meant to be used for testing.
+ */
+public class ZooKeeperTestServer {
+
+ static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS);
+
+ private final File dataDir;
+ private final File snapDir;
+
+ private ZooKeeperServer zooKeeperServer;
+ private ServerCnxnFactory connectionFactory;
+ private int port;
+
+ public ZooKeeperTestServer(File dataDir, File snapDir) {
+ this.dataDir = Preconditions.checkNotNull(dataDir);
+ this.snapDir = Preconditions.checkNotNull(snapDir);
+ }
+
+ /**
+ * Starts zookeeper up on an ephemeral port.
+ */
+ public void startNetwork() throws IOException, InterruptedException {
+ zooKeeperServer =
+ new ZooKeeperServer(
+ new FileTxnSnapLog(dataDir, snapDir),
+ new BasicDataTreeBuilder()) {
+
+ // TODO(John Sirois): Introduce a builder to configure the in-process server if and when
+ // some folks need JMX for in-process tests.
+ @Override protected void registerJMX() {
+ // noop
+ }
+ };
+
+ connectionFactory = new NIOServerCnxnFactory();
+ connectionFactory.configure(
+ new InetSocketAddress(port),
+ 60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
+ connectionFactory.startup(zooKeeperServer);
+ port = zooKeeperServer.getClientPort();
+ }
+
+ /**
+ * Stops the zookeeper server.
+ */
+ public void stop() {
+ shutdownNetwork();
+ }
+
+ /**
+ * Starts zookeeper back up on the last used port.
+ */
+ final void restartNetwork() throws IOException, InterruptedException {
+ checkEphemeralPortAssigned();
+ Preconditions.checkState(connectionFactory == null);
+ startNetwork();
+ }
+
+ /**
+ * Shuts down the in-process zookeeper network server.
+ */
+ final void shutdownNetwork() {
+ if (connectionFactory != null) {
+ connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
+ connectionFactory = null;
+ }
+ }
+
+ /**
+ * Expires the client session with the given {@code sessionId}.
+ *
+ * @param sessionId The id of the client session to expire.
+ */
+ public final void expireClientSession(long sessionId) {
+ zooKeeperServer.closeSession(sessionId);
+ }
+
+ /**
+ * Returns the current port to connect to the in-process zookeeper instance.
+ */
+ public final int getPort() {
+ checkEphemeralPortAssigned();
+ return port;
+ }
+
+ private void checkEphemeralPortAssigned() {
+ Preconditions.checkState(port > 0, "startNetwork must be called first");
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
new file mode 100644
index 0000000..9c0cebe
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class CandidateImplTest extends BaseZooKeeperClientTest {
+ private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
+ private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
+
+ private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
+
+ @Before
+ public void mySetUp() throws IOException {
+ candidateBuffer = new LinkedBlockingDeque<>();
+ }
+
+ private Group createGroup(ZooKeeperClient zkClient) throws IOException {
+ return new Group(zkClient, ACL, SERVICE);
+ }
+
+ private class Reign implements Candidate.Leader {
+ private ExceptionalCommand<Group.JoinException> abdicate;
+ private final CandidateImpl candidate;
+ private final String id;
+ private CountDownLatch defeated = new CountDownLatch(1);
+
+ Reign(String id, CandidateImpl candidate) {
+ this.id = id;
+ this.candidate = candidate;
+ }
+
+ @Override
+ public void onElected(ExceptionalCommand<Group.JoinException> abdicate) {
+ candidateBuffer.offerFirst(candidate);
+ this.abdicate = abdicate;
+ }
+
+ @Override
+ public void onDefeated() {
+ defeated.countDown();
+ }
+
+ public void abdicate() throws Group.JoinException {
+ Preconditions.checkState(abdicate != null);
+ abdicate.execute();
+ }
+
+ public void expectDefeated() throws InterruptedException {
+ defeated.await();
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+ }
+
+ @Test
+ public void testOfferLeadership() throws Exception {
+ ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+ final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
+ @Override public String toString() {
+ return "Leader1";
+ }
+ };
+ ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
+ final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
+ @Override public String toString() {
+ return "Leader2";
+ }
+ };
+ ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
+ final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
+ @Override public String toString() {
+ return "Leader3";
+ }
+ };
+
+ Reign candidate1Reign = new Reign("1", candidate1);
+ Reign candidate2Reign = new Reign("2", candidate2);
+ Reign candidate3Reign = new Reign("3", candidate3);
+
+ Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
+ Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
+ Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
+
+ assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
+ candidate1Leader.get());
+
+ shutdownNetwork();
+ restartNetwork();
+
+ assertTrue("A re-connect without a session expiration should leave the leader elected",
+ candidate1Leader.get());
+
+ candidate1Reign.abdicate();
+ assertSame(candidate1, candidateBuffer.takeLast());
+ assertFalse(candidate1Leader.get());
+ // Active abdication should trigger defeat.
+ candidate1Reign.expectDefeated();
+
+ CandidateImpl secondCandidate = candidateBuffer.takeLast();
+ assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
+ + candidateBuffer,
+ candidate2Leader.get() ^ candidate3Leader.get());
+
+ if (secondCandidate == candidate2) {
+ expireSession(zkClient2);
+ assertSame(candidate3, candidateBuffer.takeLast());
+ assertTrue(candidate3Leader.get());
+ // Passive expiration should trigger defeat.
+ candidate2Reign.expectDefeated();
+ } else {
+ expireSession(zkClient3);
+ assertSame(candidate2, candidateBuffer.takeLast());
+ assertTrue(candidate2Leader.get());
+ // Passive expiration should trigger defeat.
+ candidate3Reign.expectDefeated();
+ }
+ }
+
+ @Test
+ public void testEmptyMembership() throws Exception {
+ ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+ final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
+ Reign candidate1Reign = new Reign("1", candidate1);
+
+ candidate1.offerLeadership(candidate1Reign);
+ assertSame(candidate1, candidateBuffer.takeLast());
+ candidate1Reign.abdicate();
+ assertFalse(candidate1.getLeaderData().isPresent());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
new file mode 100644
index 0000000..97a42d1
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.NodeScheme;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+public class GroupTest extends BaseZooKeeperClientTest {
+
+ private ZooKeeperClient zkClient;
+ private Group joinGroup;
+ private Group watchGroup;
+ private Command stopWatching;
+ private Command onLoseMembership;
+
+ private RecordingListener listener;
+
+ public GroupTest() {
+ super(Amount.of(1, Time.DAYS));
+ }
+
+ @Before
+ public void mySetUp() throws Exception {
+ onLoseMembership = createMock(Command.class);
+
+ zkClient = createZkClient("group", "test");
+ joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+ watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+
+ listener = new RecordingListener();
+ stopWatching = watchGroup.watch(listener);
+ }
+
+ private static class RecordingListener implements GroupChangeListener {
+ private final LinkedBlockingQueue<Iterable<String>> membershipChanges =
+ new LinkedBlockingQueue<Iterable<String>>();
+
+ @Override
+ public void onGroupChange(Iterable<String> memberIds) {
+ membershipChanges.add(memberIds);
+ }
+
+ public Iterable<String> take() throws InterruptedException {
+ return membershipChanges.take();
+ }
+
+ public void assertEmpty() {
+ assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges));
+ }
+
+ @Override
+ public String toString() {
+ return membershipChanges.toString();
+ }
+ }
+
+ private static class CustomScheme implements NodeScheme {
+ static final String NODE_NAME = "custom_name";
+
+ @Override
+ public boolean isMember(String nodeName) {
+ return NODE_NAME.equals(nodeName);
+ }
+
+ @Override
+ public String createName(byte[] membershipData) {
+ return NODE_NAME;
+ }
+
+ @Override
+ public boolean isSequential() {
+ return false;
+ }
+ }
+
+ @Test
+ public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
+ final CountDownLatch lostMembership = new CountDownLatch(1);
+ Command onLoseMembership = lostMembership::countDown;
+ assertEmptyMembershipObserved();
+
+ Membership membership = joinGroup.join(onLoseMembership);
+ assertMembershipObserved(membership.getMemberId());
+ expireSession(zkClient);
+
+ lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+ }
+
+ @Test
+ public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
+ final CountDownLatch lostMembership = new CountDownLatch(1);
+ Command onLoseMembership = lostMembership::countDown;
+ assertEmptyMembershipObserved();
+
+ Membership membership = joinGroup.join(onLoseMembership);
+ assertMembershipObserved(membership.getMemberId());
+ membership.cancel();
+
+ lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+ }
+
+ @Test
+ public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
+ replay(onLoseMembership);
+
+ assertEmptyMembershipObserved();
+
+ Membership membership = joinGroup.join();
+ String originalMemberId = membership.getMemberId();
+ assertMembershipObserved(originalMemberId);
+
+ shutdownNetwork();
+ restartNetwork();
+
+ // The member should still be present under existing ephemeral node since session did not
+ // expire.
+ watchGroup.watch(listener);
+ assertMembershipObserved(originalMemberId);
+
+ membership.cancel();
+
+ assertEmptyMembershipObserved();
+ assertEmptyMembershipObserved(); // and again for 2nd listener
+
+ listener.assertEmpty();
+
+ verify(onLoseMembership);
+ reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+ }
+
+ @Test
+ public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
+ onLoseMembership.execute();
+ replay(onLoseMembership);
+
+ assertEmptyMembershipObserved();
+
+ Membership membership = joinGroup.join(onLoseMembership);
+ String originalMemberId = membership.getMemberId();
+ assertMembershipObserved(originalMemberId);
+
+ expireSession(zkClient);
+
+ // We should have lost our group membership and then re-gained it with a new ephemeral node.
+ // We may or may-not see the intermediate state change but we must see the final state
+ Iterable<String> members = listener.take();
+ if (Iterables.isEmpty(members)) {
+ members = listener.take();
+ }
+ assertEquals(1, Iterables.size(members));
+ assertNotEquals(originalMemberId, Iterables.getOnlyElement(members));
+ assertNotEquals(originalMemberId, membership.getMemberId());
+
+ listener.assertEmpty();
+
+ verify(onLoseMembership);
+ reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+ }
+
+ @Test
+ public void testJoinCustomNamingScheme() throws Exception {
+ Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
+ new CustomScheme());
+
+ listener = new RecordingListener();
+ group.watch(listener);
+ assertEmptyMembershipObserved();
+
+ Membership membership = group.join();
+ String memberId = membership.getMemberId();
+
+ assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
+ assertMembershipObserved(memberId);
+
+ expireSession(zkClient);
+ }
+
+ @Test
+ public void testUpdateMembershipData() throws Exception {
+ Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
+
+ byte[] initial = "start".getBytes();
+ expect(dataSupplier.get()).andReturn(initial);
+
+ byte[] second = "update".getBytes();
+ expect(dataSupplier.get()).andReturn(second);
+
+ replay(dataSupplier);
+
+ Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
+ assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
+ .getData(membership.getMemberPath(), false, null));
+
+ assertArrayEquals("Updating supplier should not change membership data",
+ initial, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+ membership.updateMemberData();
+ assertArrayEquals("Updating membership should change data",
+ second, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+ verify(dataSupplier);
+ }
+
+ @Test
+ public void testAcls() throws Exception {
+ Group securedMembership =
+ new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
+ "/secured/group/membership");
+
+ String memberId = securedMembership.join().getMemberId();
+
+ Group unauthenticatedObserver =
+ new Group(createZkClient(),
+ Ids.READ_ACL_UNSAFE,
+ "/secured/group/membership");
+ RecordingListener unauthenticatedListener = new RecordingListener();
+ unauthenticatedObserver.watch(unauthenticatedListener);
+
+ assertMembershipObserved(unauthenticatedListener, memberId);
+
+ try {
+ unauthenticatedObserver.join();
+ fail("Expected join exception for unauthenticated observer");
+ } catch (JoinException e) {
+ // expected
+ }
+
+ Group unauthorizedObserver =
+ new Group(createZkClient("joe", "schmoe"),
+ Ids.READ_ACL_UNSAFE,
+ "/secured/group/membership");
+ RecordingListener unauthorizedListener = new RecordingListener();
+ unauthorizedObserver.watch(unauthorizedListener);
+
+ assertMembershipObserved(unauthorizedListener, memberId);
+
+ try {
+ unauthorizedObserver.join();
+ fail("Expected join exception for unauthorized observer");
+ } catch (JoinException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testStopWatching() throws Exception {
+ replay(onLoseMembership);
+
+ assertEmptyMembershipObserved();
+
+ Membership member1 = joinGroup.join();
+ String memberId1 = member1.getMemberId();
+ assertMembershipObserved(memberId1);
+
+ Membership member2 = joinGroup.join();
+ String memberId2 = member2.getMemberId();
+ assertMembershipObserved(memberId1, memberId2);
+
+ stopWatching.execute();
+
+ member1.cancel();
+ Membership member3 = joinGroup.join();
+ member2.cancel();
+ member3.cancel();
+
+ listener.assertEmpty();
+ }
+
+ private void assertEmptyMembershipObserved() throws InterruptedException {
+ assertMembershipObserved();
+ }
+
+ private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
+ assertMembershipObserved(listener, expectedMemberIds);
+ }
+
+ private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
+ throws InterruptedException {
+
+ assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
new file mode 100644
index 0000000..2166123
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Gson.class)
+public class JsonCodecTest {
+
+ private static final Codec<ServiceInstance> STANDARD_JSON_CODEC = new JsonCodec();
+
+ @Test
+ public void testJsonCodecRoundtrip() throws Exception {
+ Codec<ServiceInstance> codec = STANDARD_JSON_CODEC;
+ ServiceInstance instance1 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http", new Endpoint("foo", 8080)),
+ Status.ALIVE)
+ .setShard(0);
+ byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+ ServiceInstance instance2 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+ Status.ALIVE);
+ data = ServerSets.serializeServiceInstance(instance2, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+ ServiceInstance instance3 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.<String, Endpoint>of(),
+ Status.ALIVE);
+ data = ServerSets.serializeServiceInstance(instance3, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+ }
+
+ @Test
+ public void testJsonCompatibility() throws IOException {
+ ServiceInstance instance = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http", new Endpoint("foo", 8080)),
+ Status.ALIVE).setShard(42);
+
+ ByteArrayOutputStream results = new ByteArrayOutputStream();
+ STANDARD_JSON_CODEC.serialize(instance, results);
+ assertEquals(
+ "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
+ + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
+ + "\"status\":\"ALIVE\","
+ + "\"shard\":42}",
+ results.toString());
+ }
+
+ @Test
+ public void testInvalidSerialize() {
+ // Gson is final so we need to call on PowerMock here.
+ Gson gson = PowerMock.createMock(Gson.class);
+ gson.toJson(EasyMock.isA(Object.class), EasyMock.isA(Appendable.class));
+ EasyMock.expectLastCall().andThrow(new JsonIOException("error"));
+ PowerMock.replay(gson);
+
+ ServiceInstance instance =
+ new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
+
+ try {
+ new JsonCodec(gson).serialize(instance, new ByteArrayOutputStream());
+ fail();
+ } catch (IOException e) {
+ // Expected.
+ }
+
+ PowerMock.verify(gson);
+ }
+
+ @Test
+ public void testDeserializeMinimal() throws IOException {
+ String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}";
+ ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8));
+ ServiceInstance actual = STANDARD_JSON_CODEC.deserialize(source);
+ ServiceInstance expected =
+ new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testInvalidDeserialize() {
+ // Not JSON.
+ assertInvalidDeserialize(new byte[] {0xC, 0xA, 0xF, 0xE});
+
+ // No JSON object.
+ assertInvalidDeserialize("");
+ assertInvalidDeserialize("[]");
+
+ // Missing required fields.
+ assertInvalidDeserialize("{}");
+ assertInvalidDeserialize("{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}}");
+ assertInvalidDeserialize("{\"status\":\"ALIVE\"}");
+ }
+
+ private void assertInvalidDeserialize(String data) {
+ assertInvalidDeserialize(data.getBytes(Charsets.UTF_8));
+ }
+
+ private void assertInvalidDeserialize(byte[] data) {
+ try {
+ STANDARD_JSON_CODEC.deserialize(new ByteArrayInputStream(data));
+ fail();
+ } catch (IOException e) {
+ // Expected.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
new file mode 100644
index 0000000..f0c0cb4
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link ServerSetImpl} membership watching and joining using ZooKeeper clients obtained
+ * from {@link BaseZooKeeperClientTest#createZkClient()}.
+ *
+ * TODO(William Farner): Change this to remove thrift dependency.
+ */
+public class ServerSetImplTest extends BaseZooKeeperClientTest {
+ private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ private static final String SERVICE = "/twitter/services/puffin_hosebird";
+
+ // Each membership snapshot handed to serverSetMonitor is offered to this queue, so tests can
+ // take() the notifications in the order they were delivered.
+ private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer;
+ private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor;
+
+ @Before
+ public void mySetUp() throws IOException {
+ serverSetBuffer = new LinkedBlockingQueue<>();
+ serverSetMonitor = serverSetBuffer::offer;
+ }
+
+ // Every ServerSetImpl gets its own ZooKeeper client but shares the same service path.
+ private ServerSetImpl createServerSet() throws IOException {
+ return new ServerSetImpl(createZkClient(), ACL, SERVICE);
+ }
+
+ @Test
+ public void testLifecycle() throws Exception {
+ ServerSetImpl client = createServerSet();
+ client.watch(serverSetMonitor);
+ // Watching an empty group immediately reports an empty membership set.
+ assertChangeFiredEmpty();
+
+ ServerSetImpl server = createServerSet();
+ ServerSet.EndpointStatus status = server.join(
+ InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080));
+
+ // The auxiliary port map is surfaced as the instance's additional endpoints.
+ ServiceInstance serviceInstance = new ServiceInstance(
+ new Endpoint("foo", 1234),
+ ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
+ Status.ALIVE);
+
+ assertChangeFired(serviceInstance);
+
+ status.leave();
+ assertChangeFiredEmpty();
+ assertTrue(serverSetBuffer.isEmpty());
+ }
+
+ @Test
+ public void testMembershipChanges() throws Exception {
+ ServerSetImpl client = createServerSet();
+ client.watch(serverSetMonitor);
+ assertChangeFiredEmpty();
+
+ ServerSetImpl server = createServerSet();
+
+ ServerSet.EndpointStatus foo = join(server, "foo");
+ assertChangeFired("foo");
+
+ expireSession(client.getZkClient());
+
+ ServerSet.EndpointStatus bar = join(server, "bar");
+
+ // We should have automatically re-monitored membership after the session expired. We should
+ // not be notified of "foo" alone, because that is not a change, but of "foo" and "bar", because
+ // "bar" is an addition.
+ assertChangeFired("foo", "bar");
+
+ foo.leave();
+ assertChangeFired("bar");
+
+ ServerSet.EndpointStatus baz = join(server, "baz");
+ assertChangeFired("bar", "baz");
+
+ baz.leave();
+ assertChangeFired("bar");
+
+ bar.leave();
+ assertChangeFiredEmpty();
+
+ assertTrue(serverSetBuffer.isEmpty());
+ }
+
+ @Test
+ public void testStopMonitoring() throws Exception {
+ ServerSetImpl client = createServerSet();
+ Command stopMonitoring = client.watch(serverSetMonitor);
+ assertChangeFiredEmpty();
+
+ ServerSetImpl server = createServerSet();
+
+ ServerSet.EndpointStatus foo = join(server, "foo");
+ assertChangeFired("foo");
+ ServerSet.EndpointStatus bar = join(server, "bar");
+ assertChangeFired("foo", "bar");
+
+ stopMonitoring.execute();
+
+ // No new updates should be received since monitoring has stopped.
+ foo.leave();
+ assertTrue(serverSetBuffer.isEmpty());
+
+ // NOTE(review): this was labelled "Expiration event", but no session expiration is triggered
+ // before the check below, so it only re-asserts that nothing has been queued.
+ assertTrue(serverSetBuffer.isEmpty());
+ }
+
+ @Test
+ public void testOrdering() throws Exception {
+ ServerSetImpl client = createServerSet();
+ client.watch(serverSetMonitor);
+ assertChangeFiredEmpty();
+
+ Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080);
+ Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081);
+ Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082);
+
+ ServerSetImpl server1 = createServerSet();
+ ServerSetImpl server2 = createServerSet();
+ ServerSetImpl server3 = createServerSet();
+
+ ServiceInstance instance1 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+ Status.ALIVE);
+ ServiceInstance instance2 = new ServiceInstance(
+ new Endpoint("foo", 1001),
+ ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
+ Status.ALIVE);
+ ServiceInstance instance3 = new ServiceInstance(
+ new Endpoint("foo", 1002),
+ ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
+ Status.ALIVE);
+
+ // Each snapshot is copied into a list so the assertions also check member order (join order).
+ server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports);
+ assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
+
+ ServerSet.EndpointStatus status2 = server2.join(
+ InetSocketAddress.createUnresolved("foo", 1001),
+ server2Ports);
+ assertEquals(ImmutableList.of(instance1, instance2),
+ ImmutableList.copyOf(serverSetBuffer.take()));
+
+ server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports);
+ assertEquals(ImmutableList.of(instance1, instance2, instance3),
+ ImmutableList.copyOf(serverSetBuffer.take()));
+
+ status2.leave();
+ assertEquals(ImmutableList.of(instance1, instance3),
+ ImmutableList.copyOf(serverSetBuffer.take()));
+ }
+
+ // A failing initial watch must unregister the expiration handler it registered.
+ @Test
+ public void testUnwatchOnException() throws Exception {
+ IMocksControl control = createControl();
+
+ ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class);
+ Watcher onExpirationWatcher = control.createMock(Watcher.class);
+
+ expect(zkClient.registerExpirationHandler(anyObject(Command.class)))
+ .andReturn(onExpirationWatcher);
+
+ expect(zkClient.get()).andThrow(new InterruptedException()); // See interrupted() note below.
+ expect(zkClient.unregister(onExpirationWatcher)).andReturn(true);
+ control.replay();
+
+ Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla");
+ ServerSetImpl serverset = new ServerSetImpl(zkClient, group);
+
+ try {
+ serverset.watch(hostSet -> {});
+ fail("Expected MonitorException");
+ } catch (DynamicHostSet.MonitorException e) {
+ // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is.
+ // That call both returns the current interrupted status and clears it. The clearing is
+ // crucial because of the order in which tests in this class may run. If this test runs before
+ // one of the tests above that uses a `ZooKeeperClient`, for example, those tests will fail
+ // executing `ZooKeeperClient.get`, which internally blocks on a sync-point that takes part in
+ // the interruption mechanism and so immediately throws `InterruptedException` based on the
+ // un-cleared interrupted bit.
+ assertTrue(Thread.interrupted());
+ }
+ control.verify();
+ }
+
+ // Single-entry auxiliary port map on the unresolved host "foo".
+ private static Map<String, InetSocketAddress> makePortMap(String name, int port) {
+ return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port));
+ }
+
+ // Joins `host` on port 42 with no auxiliary endpoints.
+ private ServerSet.EndpointStatus join(ServerSet serverSet, String host)
+ throws JoinException, InterruptedException {
+
+ return serverSet.join(
+ InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of());
+ }
+
+ // Asserts the next snapshot is exactly the ALIVE port-42 instances (no aux endpoints) for the
+ // given hosts, i.e. the instances produced by join(ServerSet, String).
+ private void assertChangeFired(String... serviceHosts)
+ throws InterruptedException {
+
+ assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts),
+ serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42),
+ ImmutableMap.<String, Endpoint>of(), Status.ALIVE))));
+ }
+
+ protected void assertChangeFiredEmpty() throws InterruptedException {
+ assertChangeFired(ImmutableSet.<ServiceInstance>of());
+ }
+
+ protected void assertChangeFired(ServiceInstance... serviceInstances)
+ throws InterruptedException {
+ assertChangeFired(ImmutableSet.copyOf(serviceInstances));
+ }
+
+ // Blocks until the next snapshot is delivered, then compares it (as a set) to the expectation.
+ protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances)
+ throws InterruptedException {
+ assertEquals(serviceInstances, serverSetBuffer.take());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
new file mode 100644
index 0000000..0e67191
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trip test for {@link ServerSets#serializeServiceInstance} and
+ * {@link ServerSets#deserializeServiceInstance} using {@link ServerSet#JSON_CODEC}.
+ */
+public class ServerSetsTest {
+ @Test
+ public void testSimpleSerialization() throws Exception {
+ // Wildcard-address socket on a fixed port; only the port is compared after the round trip.
+ InetSocketAddress endpoint = new InetSocketAddress(12345);
+ Map<String, Endpoint> additionalEndpoints = ImmutableMap.of();
+ Status status = Status.ALIVE;
+
+ byte[] data = ServerSets.serializeServiceInstance(
+ endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC);
+
+ ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC);
+
+ assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
+ assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
+ // Compare against the fixture value so the assertion tracks the input if it ever changes.
+ assertEquals(status, instance.getStatus());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
new file mode 100644
index 0000000..5f6cdd8
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link SingletonServiceImpl} against mocked {@link ServerSet}, {@link Candidate},
+ * listener and abdication collaborators. All mocks come from one control that is verified on
+ * tear down.
+ */
+public class SingletonServiceImplTest extends BaseZooKeeperClientTest {
+ private static final int PORT_A = 1234;
+ private static final int PORT_B = 8080;
+ private static final InetSocketAddress PRIMARY_ENDPOINT =
+ InetSocketAddress.createUnresolved("foo", PORT_A);
+ private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
+ ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
+
+ private IMocksControl control;
+ private SingletonServiceImpl.LeadershipListener listener;
+ private ServerSet serverSet;
+ private ServerSet.EndpointStatus endpointStatus;
+ private Candidate candidate;
+ private ExceptionalCommand<Group.JoinException> abdicate;
+
+ private SingletonService service;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void mySetUp() throws IOException {
+ control = createControl();
+ addTearDown(control::verify);
+ listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
+ serverSet = control.createMock(ServerSet.class);
+ candidate = control.createMock(Candidate.class);
+ endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
+ // Mocking the raw ExceptionalCommand class is the unchecked operation suppressed above.
+ abdicate = control.createMock(ExceptionalCommand.class);
+
+ service = new SingletonServiceImpl(serverSet, candidate);
+ }
+
+ /**
+ * Asks the service to lead with endpoints on {@code hostName}, then simulates winning the
+ * election by invoking {@code onElected} on the {@link Leader} captured from the candidate.
+ */
+ private void newLeader(
+ final String hostName,
+ Capture<Leader> leader,
+ LeadershipListener listener) throws Exception {
+
+ service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
+ ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
+ listener);
+
+ // This actually elects the leader.
+ leader.getValue().onElected(abdicate);
+ }
+
+ private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
+ newLeader(hostName, leader, listener);
+ }
+
+ // Expects an advertisement of the default "foo" endpoints.
+ private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
+ return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
+ }
+
+ @Test
+ public void testLeadAdvertise() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ expectJoin().andReturn(endpointStatus);
+ endpointStatus.leave();
+ abdicate.execute();
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().advertise();
+ controlCapture.getValue().leave();
+ }
+
+ @Test
+ public void testLeadLeaveNoAdvertise() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ abdicate.execute();
+
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().leave();
+ }
+
+ @Test
+ public void testLeadJoinFailure() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
+ abdicate.execute();
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+
+ try {
+ controlCapture.getValue().advertise();
+ fail("Join should have failed.");
+ } catch (SingletonService.AdvertiseException e) {
+ // Expected.
+ }
+
+ controlCapture.getValue().leave();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMultipleAdvertise() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ expectJoin().andReturn(endpointStatus);
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().advertise();
+ controlCapture.getValue().advertise();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMultipleLeave() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ expectJoin().andReturn(endpointStatus);
+ endpointStatus.leave();
+ abdicate.execute();
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().advertise();
+ controlCapture.getValue().leave();
+ controlCapture.getValue().leave();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testAdvertiseAfterLeave() throws Exception {
+ Capture<Leader> leaderCapture = createCapture();
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ Capture<LeaderControl> controlCapture = createCapture();
+ listener.onLeading(capture(controlCapture));
+
+ abdicate.execute();
+
+ control.replay();
+
+ newLeader("foo", leaderCapture);
+ controlCapture.getValue().leave();
+ controlCapture.getValue().advertise();
+ }
+
+ // Five successive leaders ("foo0".."foo4") each advertise their own endpoints, then leave.
+ @Test
+ public void testLeadMulti() throws Exception {
+ List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
+ List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
+
+ for (int i = 0; i < 5; i++) {
+ Capture<Leader> leaderCapture = createCapture();
+ leaderCaptures.add(leaderCapture);
+ Capture<LeaderControl> controlCapture = createCapture();
+ leaderControlCaptures.add(controlCapture);
+
+ expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+ listener.onLeading(capture(controlCapture));
+ InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
+ Map<String, InetSocketAddress> aux =
+ ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
+ expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
+ endpointStatus.leave();
+ abdicate.execute();
+ }
+
+ control.replay();
+
+ for (int i = 0; i < 5; i++) {
+ final String leaderName = "foo" + i;
+ newLeader(leaderName, leaderCaptures.get(i));
+ leaderControlCaptures.get(i).getValue().advertise();
+ leaderControlCaptures.get(i).getValue().leave();
+ }
+ }
+
+ // NOTE(review): with no expectations recorded, this only checks that shutting down the network
+ // causes no calls on the mocks (verified on tear down); it does not exercise leadership loss.
+ @Test
+ public void testLeaderLeaves() throws Exception {
+ control.replay();
+ shutdownNetwork();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
new file mode 100644
index 0000000..5eee235
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link ZooKeeperClient} connection handling (timeouts, interruption, close and session
+ * reuse), digest credentials and chrooted connect strings.
+ *
+ * @author John Sirois
+ */
+public class ZooKeeperClientTest extends BaseZooKeeperClientTest {
+
+ // Explicit charset for node payloads so encoding does not depend on the platform default.
+ private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+ // NOTE(review): presumably the session timeout for clients from the base class, kept long so
+ // sessions do not expire mid-test; confirm against BaseZooKeeperClientTest.
+ public ZooKeeperClientTest() {
+ super(Amount.of(1, Time.DAYS));
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ final ZooKeeperClient zkClient = createZkClient();
+ shutdownNetwork();
+ try {
+ zkClient.get(Amount.of(50L, Time.MILLISECONDS));
+ fail("Expected client connection to timeout while network down");
+ } catch (TimeoutException e) {
+ assertTrue(zkClient.isClosed());
+ }
+ assertNull(zkClient.getZooKeeperClientForTests());
+
+ // Start an untimed get() in the background while the network is still down.
+ final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+ final AtomicReference<ZooKeeper> client = new AtomicReference<>();
+ new Thread(() -> {
+ try {
+ client.set(zkClient.get());
+ } catch (ZooKeeperConnectionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ blockingGetComplete.countDown();
+ }
+ }).start();
+
+ restartNetwork();
+
+ // Hung blocking connects should succeed when server connection comes up
+ blockingGetComplete.await();
+ assertNotNull(client.get());
+
+ // New connections should succeed now that network is back up
+ long sessionId = zkClient.get().getSessionId();
+
+ // While connected the same client should be reused (no new connections while healthy)
+ assertSame(client.get(), zkClient.get());
+
+ shutdownNetwork();
+ // Our client doesn't know the network is down yet so we should be able to get()
+ ZooKeeper zooKeeper = zkClient.get();
+ try {
+ zooKeeper.exists("/", false);
+ fail("Expected client operation to fail while network down");
+ } catch (ConnectionLossException e) {
+ // expected
+ }
+
+ restartNetwork();
+ assertEquals("Expected connection to be re-established with existing session",
+ sessionId, zkClient.get().getSessionId());
+ }
+
+ /**
+ * Test that if a blocking get() call gets interrupted, after a connection has been created
+ * but before it's connected, the zk connection gets closed.
+ */
+ @Test
+ public void testGetInterrupted() throws Exception {
+ final ZooKeeperClient zkClient = createZkClient();
+ shutdownNetwork();
+
+ final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+ final AtomicBoolean interrupted = new AtomicBoolean();
+ final AtomicReference<ZooKeeper> client = new AtomicReference<>();
+ Thread getThread = new Thread(() -> {
+ try {
+ client.set(zkClient.get());
+ } catch (ZooKeeperConnectionException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ interrupted.set(true);
+ throw new RuntimeException(e);
+ } finally {
+ blockingGetComplete.countDown();
+ }
+ });
+ getThread.start();
+
+ // Wait until get() has created the (not yet connected) ZooKeeper handle before interrupting.
+ while (zkClient.getZooKeeperClientForTests() == null) {
+ Thread.sleep(100);
+ }
+
+ getThread.interrupt();
+ blockingGetComplete.await();
+
+ assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests());
+ assertTrue("The waiter thread should have been interrupted", interrupted.get());
+ assertTrue(zkClient.isClosed());
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ ZooKeeperClient zkClient = createZkClient();
+ zkClient.close();
+
+ // Close should be idempotent
+ zkClient.close();
+
+ long firstSessionId = zkClient.get().getSessionId();
+
+ // Close on an open client should force session re-establishment
+ zkClient.close();
+
+ assertNotEquals(firstSessionId, zkClient.get().getSessionId());
+ }
+
+ // A node created with EVERYONE_READ_CREATOR_ALL is readable by all but writable only by clients
+ // presenting the creator's credentials.
+ @Test
+ public void testCredentials() throws Exception {
+ String path = "/test";
+ ZooKeeperClient authenticatedClient = createZkClient("creator", "creator");
+ assertEquals(path,
+ authenticatedClient.get().create(path, "42".getBytes(UTF_8),
+ ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT));
+
+ ZooKeeperClient unauthenticatedClient = createZkClient();
+ assertEquals("42", getData(unauthenticatedClient, path));
+ try {
+ setData(unauthenticatedClient, path, "37");
+ fail("Expected unauthenticated write attempt to fail");
+ } catch (NoAuthException e) {
+ assertEquals("42", getData(unauthenticatedClient, path));
+ }
+
+ ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner");
+ assertEquals("42", getData(nonOwnerClient, path));
+ try {
+ setData(nonOwnerClient, path, "37");
+ fail("Expected non owner write attempt to fail");
+ } catch (NoAuthException e) {
+ assertEquals("42", getData(nonOwnerClient, path));
+ }
+
+ ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator");
+ setData(authenticatedClient2, path, "37");
+ assertEquals("37", getData(authenticatedClient2, path));
+ }
+
+ // A client chrooted at "/test" sees "/test/subtest" as "/subtest".
+ @Test
+ public void testChrootPath() throws Exception {
+ ZooKeeperClient rootClient = createZkClient();
+ String rootPath = "/test";
+ String subPath = "/test/subtest";
+ assertEquals(rootPath,
+ rootClient.get().create(rootPath, "42".getBytes(UTF_8),
+ ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+ assertEquals(subPath,
+ rootClient.get().create(subPath, "37".getBytes(UTF_8),
+ ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+ ZooKeeperClient chrootedClient = createZkClient(rootPath);
+ assertArrayEquals("37".getBytes(UTF_8), chrootedClient.get().getData("/subtest", false, null));
+ }
+
+ // Unconditionally overwrites the node at `path` with the UTF-8 bytes of `data`.
+ private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception {
+ zkClient.get().setData(path, data.getBytes(UTF_8), ZooKeeperUtils.ANY_VERSION);
+ }
+
+ // Reads the node at `path` (no watch) and decodes it as UTF-8.
+ private String getData(ZooKeeperClient zkClient, String path) throws Exception {
+ return new String(zkClient.get().getData(path, false, null), UTF_8);
+ }
+}
[2/4] aurora git commit: Revert removal of twitter/commons/zk based
leadership code
Posted by dm...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
new file mode 100644
index 0000000..9e482a6
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Charsets;
+
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ZooKeeperUtils}: recursive path creation with ACLs, the
+ * {@link ZooKeeperUtils#ANY_VERSION} sentinel, and path normalization.
+ *
+ * @author John Sirois
+ */
+public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
+ @Test
+ public void testEnsurePath() throws Exception {
+ ZooKeeperClient zkClient = createZkClient();
+ zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8));
+
+ assertNull(zkClient.get().exists("/foo", false));
+ ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz");
+
+ // Switch to a second client authenticated as a different digest user.
+ zkClient = createZkClient();
+ zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8));
+
+ // Anyone can check for existence in ZK
+ assertNotNull(zkClient.get().exists("/foo", false));
+ assertNotNull(zkClient.get().exists("/foo/bar", false));
+ assertNotNull(zkClient.get().exists("/foo/bar/baz", false));
+
+ try {
+ zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */);
+ fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be "
+ + "rejected");
+ } catch (NoAuthException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception {
+ String nodePath = "/foo";
+ ZooKeeperClient zkClient = createZkClient();
+
+ zkClient.get().create(nodePath, "init".getBytes(Charsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ Stat initStat = new Stat();
+ byte[] initialData = zkClient.get().getData(nodePath, false, initStat);
+ assertArrayEquals("init".getBytes(Charsets.UTF_8), initialData);
+
+ // bump the version
+ Stat rev1Stat =
+ zkClient.get().setData(nodePath, "rev1".getBytes(Charsets.UTF_8), initStat.getVersion());
+
+ try {
+ zkClient.get().setData(nodePath, "rev2".getBytes(Charsets.UTF_8), initStat.getVersion());
+ fail("expected correct version to be required");
+ } catch (BadVersionException e) {
+ // expected
+ }
+
+ // expect using the correct version to work
+ Stat rev2Stat =
+ zkClient.get().setData(nodePath, "rev2".getBytes(Charsets.UTF_8), rev1Stat.getVersion());
+ assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion());
+
+ // ANY_VERSION bypasses the version check entirely.
+ zkClient.get().setData(
+ nodePath, "force-write".getBytes(Charsets.UTF_8), ZooKeeperUtils.ANY_VERSION);
+ Stat forceWriteStat = new Stat();
+ byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat);
+ assertArrayEquals("force-write".getBytes(Charsets.UTF_8), forceWriteData);
+
+ assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion());
+ assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion());
+ }
+
+ @Test
+ public void testNormalizingPath() throws Exception {
+ assertEquals("/", ZooKeeperUtils.normalizePath("/"));
+ assertEquals("/foo", ZooKeeperUtils.normalizePath("/foo/"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo//bar"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("//foo/bar"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar/"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar//"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar"));
+ }
+
+ // Redundant slashes are collapsed, while relative paths and "." / ".." segments are rejected.
+ @Test
+ public void testLenientPaths() {
+ assertEquals("/", ZooKeeperUtils.normalizePath("///"));
+ assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group"));
+ assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group/"));
+ assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group"));
+ assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group//"));
+
+ try {
+ ZooKeeperUtils.normalizePath("a/group");
+ fail("Relative paths should not be allowed.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ ZooKeeperUtils.normalizePath("/a/./group");
+ fail("Relative paths should not be allowed.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ ZooKeeperUtils.normalizePath("/a/../group");
+ fail("Relative paths should not be allowed.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/config/findbugs/excludeFilter.xml
----------------------------------------------------------------------
diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml
index 5eaa11a..f7d5ae0 100644
--- a/config/findbugs/excludeFilter.xml
+++ b/config/findbugs/excludeFilter.xml
@@ -74,14 +74,6 @@ limitations under the License.
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS" />
</Match>
- <!-- We're forced to use the platform default encoding to generate a byte array from digest
- credentials because the underlying ZooKeeper API dictates this - also noted in the
- offending code. -->
- <Match>
- <Class name="org.apache.aurora.scheduler.discovery.Credentials" />
- <Bug pattern="DM_DEFAULT_ENCODING" />
- </Match>
-
<!-- Technical debt. -->
<Match>
<Class name="org.apache.aurora.scheduler.log.mesos.MesosLog$LogStream" />
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/features/service-discovery.md
----------------------------------------------------------------------
diff --git a/docs/features/service-discovery.md b/docs/features/service-discovery.md
index 511c96d..36823c8 100644
--- a/docs/features/service-discovery.md
+++ b/docs/features/service-discovery.md
@@ -6,7 +6,7 @@ the purpose of service discovery. ServerSets use the Zookeeper [group membershi
of which there are several reference implementations:
- [C++](https://github.com/apache/mesos/blob/master/src/zookeeper/group.cpp)
- - [Java](http://curator.apache.org/curator-recipes/group-member.html)
+ - [Java](https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/zookeeper/ServerSetImpl.java#L221)
- [Python](https://github.com/twitter/commons/blob/master/src/python/twitter/common/zookeeper/serverset/serverset.py#L51)
These can also be used natively in Finagle using the [ZookeeperServerSetCluster](https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala).
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 8955653..d4e0a9a 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -252,5 +252,7 @@ Optional flags:
Launches an embedded zookeeper server for local testing causing -zk_endpoints to be ignored if specified.
-zk_session_timeout (default (4, secs))
The ZooKeeper session timeout.
+-zk_use_curator (default true)
+ DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter commons/zookeeper (the legacy library) is used.
-------------------------------------------------------------------------
```
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index 76209b1..4e354b6 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -46,8 +46,8 @@ import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.StateMachine;
import org.apache.aurora.common.util.StateMachine.Transition;
-import org.apache.aurora.scheduler.discovery.SingletonService;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.mesos.Driver;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
+import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
/**
* The central driver of the scheduler runtime lifecycle. Handles the transitions from startup and
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index e0d32de..43cc5b4 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -42,6 +42,8 @@ import org.apache.aurora.common.args.constraints.NotEmpty;
import org.apache.aurora.common.args.constraints.NotNull;
import org.apache.aurora.common.inject.Bindings;
import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.SchedulerLifecycle;
@@ -50,8 +52,6 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorModule;
import org.apache.aurora.scheduler.cron.quartz.CronModule;
import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig;
import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
-import org.apache.aurora.scheduler.discovery.SingletonService;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
import org.apache.aurora.scheduler.events.WebhookModule;
import org.apache.aurora.scheduler.http.HttpService;
import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
new file mode 100644
index 0000000..a1329fd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.app;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.thrift.ServiceInstance;
+
+/**
+ * Monitors a service group's membership and supplies a live view of the most recent set.
+ */
+public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable {
+
+  /**
+   * Begins watching the membership of the service group.
+   *
+   * Once the membership view is no longer needed, {@link #close() close} this monitor so that it
+   * stops being maintained.
+   *
+   * @throws MonitorException if monitoring of the service group cannot be initiated.
+   */
+  void start() throws MonitorException;
+
+  /**
+   * Signals a failure to begin monitoring a service group.
+   */
+  class MonitorException extends Exception {
+    public MonitorException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
new file mode 100644
index 0000000..339f63b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import javax.inject.Singleton;
+
+import com.google.inject.Exposed;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.ServerSetImpl;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.zookeeper.data.ACL;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Guice bindings that advertise the scheduler's network presence through a fork of Twitter
+ * commons/zookeeper: a {@link ServiceGroupMonitor} over the scheduler server set and a
+ * {@link SingletonService} used for leader election.
+ */
+class CommonsServiceDiscoveryModule extends PrivateModule {
+
+  private final String discoveryPath;
+  private final ZooKeeperConfig zooKeeperConfig;
+
+  CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+    this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath);
+    this.zooKeeperConfig = requireNonNull(zooKeeperConfig);
+  }
+
+  @Override
+  protected void configure() {
+    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
+    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
+
+    bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class);
+    expose(ServiceGroupMonitor.class);
+  }
+
+  @Provides
+  @Singleton
+  ZooKeeperClient provideZooKeeperClient(
+      @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> servers) {
+
+    return new ZooKeeperClient(
+        zooKeeperConfig.getSessionTimeout(),
+        zooKeeperConfig.getCredentials(),
+        zooKeeperConfig.getChrootPath(),
+        servers);
+  }
+
+  @Provides
+  @Singleton
+  ServerSetImpl provideServerSet(
+      ZooKeeperClient client,
+      @ServiceDiscoveryBindings.ZooKeeper List<ACL> acls) {
+
+    return new ServerSetImpl(client, acls, discoveryPath);
+  }
+
+  // Re-binds the server set under the DynamicHostSet type CommonsServiceGroupMonitor injects.
+  @Provides
+  @Singleton
+  DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) {
+    return serverSet;
+  }
+
+  // Takes ServerSetImpl rather than the ServerSet interface only to keep the bindings simple.
+  @Provides
+  @Singleton
+  @Exposed
+  SingletonService provideSingletonService(
+      ZooKeeperClient client,
+      ServerSetImpl serverSet,
+      @ServiceDiscoveryBindings.ZooKeeper List<ACL> acls) {
+
+    return new SingletonServiceImpl(
+        serverSet,
+        SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, acls));
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
new file mode 100644
index 0000000..9161455
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.inject.Inject;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link ServiceGroupMonitor} that tracks scheduler membership by watching a commons/zookeeper
+ * {@link DynamicHostSet}.
+ */
+class CommonsServiceGroupMonitor implements ServiceGroupMonitor {
+  // Cancels the watch registered by start(). close() swaps in Optional.empty() atomically so
+  // the watch is cancelled at most once: java.io.Closeable requires repeated close() calls to
+  // have no effect.
+  private final AtomicReference<Optional<Command>> closeCommand =
+      new AtomicReference<>(Optional.empty());
+  private final DynamicHostSet<ServiceInstance> serverSet;
+  // Most recent membership reported by the watch; empty until the first report arrives.
+  private final AtomicReference<ImmutableSet<ServiceInstance>> services =
+      new AtomicReference<>(ImmutableSet.of());
+
+  @Inject
+  CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) {
+    this.serverSet = requireNonNull(serverSet);
+  }
+
+  @Override
+  public void start() throws MonitorException {
+    try {
+      closeCommand.set(Optional.of(serverSet.watch(services::set)));
+    } catch (DynamicHostSet.MonitorException e) {
+      throw new MonitorException("Unable to watch scheduler host set.", e);
+    }
+  }
+
+  /**
+   * Cancels the watch registered by {@link #start()}, if any; later calls are no-ops.
+   */
+  @Override
+  public void close() {
+    closeCommand.getAndSet(Optional.empty()).ifPresent(Command::execute);
+  }
+
+  @Override
+  public ImmutableSet<ServiceInstance> get() {
+    return services.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java b/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
deleted file mode 100644
index 75d58e7..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.util.Arrays;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.commons.lang.builder.EqualsBuilder;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Encapsulates a user's ZooKeeper credentials.
- */
-public final class Credentials {
-
- /**
- * Creates a set of credentials for the ZooKeeper digest authentication mechanism.
- *
- * @param username the username to authenticate with
- * @param password the password to authenticate with
- * @return a set of credentials that can be used to authenticate the zoo keeper client
- */
- public static Credentials digestCredentials(String username, String password) {
- MorePreconditions.checkNotBlank(username);
- Preconditions.checkNotNull(password);
-
- // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
- // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
- // Consider writing and installing a version of DigestAuthenticationProvider that controls its
- // Charset explicitly.
- return new Credentials("digest", (username + ":" + password).getBytes());
- }
-
- private final String authScheme;
- private final byte[] authToken;
-
- /**
- * Creates a new set of credentials for the given ZooKeeper authentication scheme.
- *
- * @param scheme The name of the authentication scheme the {@code token} is valid in.
- * @param token The authentication token for the given {@code scheme}.
- */
- public Credentials(String scheme, byte[] token) {
- authScheme = MorePreconditions.checkNotBlank(scheme);
- authToken = requireNonNull(token);
- }
-
- /**
- * Returns the authentication scheme these credentials are for.
- *
- * @return the scheme these credentials are for.
- */
- public String scheme() {
- return authScheme;
- }
-
- /**
- * Returns the authentication token.
- *
- * @return the authentication token.
- */
- public byte[] token() {
- return Arrays.copyOf(authToken, authToken.length);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Credentials)) {
- return false;
- }
-
- Credentials other = (Credentials) o;
- return new EqualsBuilder()
- .append(authScheme, other.scheme())
- .append(authToken, other.token())
- .isEquals();
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(authScheme, authToken);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
index e690d14..999a542 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
@@ -33,6 +33,10 @@ import org.apache.aurora.common.net.InetSocketAddressHelper;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -64,7 +68,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
- bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(JsonCodec.INSTANCE);
+ bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC);
}
@Provides
@@ -102,7 +106,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
if (zooKeeperConfig.getCredentials().isPresent()) {
Credentials credentials = zooKeeperConfig.getCredentials().get();
- builder.authorization(credentials.scheme(), credentials.token());
+ builder.authorization(credentials.scheme(), credentials.authToken());
}
CuratorFramework curatorFramework = builder.build();
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
index db886df..0b86fb6 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
@@ -24,6 +24,7 @@ import org.apache.aurora.GuavaUtils;
import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.scheduler.app.SchedulerMain;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ZKPaths;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
index 321bbb3..c378172 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
@@ -27,6 +27,7 @@ import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.SingletonService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
index d5019bf..e8aafe4 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
@@ -15,10 +15,10 @@ package org.apache.aurora.scheduler.discovery;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.Optional;
import javax.annotation.Nullable;
+import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
@@ -27,12 +27,23 @@ import org.apache.aurora.common.args.CmdLine;
import org.apache.aurora.common.args.constraints.NotEmpty;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A factory that creates a {@link ZooKeeperConfig} instance based on command line argument
* values.
*/
public final class FlaggedZooKeeperConfig {
+ private static final Logger LOG = LoggerFactory.getLogger(FlaggedZooKeeperConfig.class);
+
+ @CmdLine(name = "zk_use_curator",
+ help = "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter "
+ + "commons/zookeeper (the legacy library) is used.")
+ private static final Arg<Boolean> USE_CURATOR = Arg.create(true);
+
@CmdLine(name = "zk_in_proc",
help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints "
+ "to be ignored if specified.")
@@ -63,9 +74,13 @@ public final class FlaggedZooKeeperConfig {
* @return Configuration instance.
*/
public static ZooKeeperConfig create() {
+ if (USE_CURATOR.hasAppliedValue()) {
+ LOG.warn("The -zk_use_curator flag is deprecated and will be removed in a future release.");
+ }
return new ZooKeeperConfig(
+ USE_CURATOR.get(),
ZK_ENDPOINTS.get(),
- Optional.ofNullable(CHROOT_PATH.get()),
+ Optional.fromNullable(CHROOT_PATH.get()),
IN_PROCESS.get(),
SESSION_TIMEOUT.get(),
getCredentials(DIGEST_CREDENTIALS.get()));
@@ -73,7 +88,7 @@ public final class FlaggedZooKeeperConfig {
private static Optional<Credentials> getCredentials(@Nullable String userAndPass) {
if (userAndPass == null) {
- return Optional.empty();
+ return Optional.absent();
}
List<String> parts = ImmutableList.copyOf(Splitter.on(":").split(userAndPass));
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java b/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
deleted file mode 100644
index 9d22b76..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonParseException;
-
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Encodes a {@link ServiceInstance} as a JSON object.
- */
-class JsonCodec implements Codec<ServiceInstance> {
-
- private static void assertRequiredField(String fieldName, Object fieldValue) {
- if (fieldValue == null) {
- throw new JsonParseException(String.format("Field %s is required", fieldName));
- }
- }
-
- private static class EndpointSchema {
- private final String host;
- private final Integer port;
-
- EndpointSchema(Endpoint endpoint) {
- host = endpoint.getHost();
- port = endpoint.getPort();
- }
-
- Endpoint asEndpoint() {
- assertRequiredField("host", host);
- assertRequiredField("port", port);
-
- return new Endpoint(host, port);
- }
- }
-
- private static class ServiceInstanceSchema {
- private final EndpointSchema serviceEndpoint;
- private final Map<String, EndpointSchema> additionalEndpoints;
- private final Status status;
- @Nullable private final Integer shard;
-
- ServiceInstanceSchema(ServiceInstance instance) {
- serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
- if (instance.isSetAdditionalEndpoints()) {
- additionalEndpoints =
- Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
- } else {
- additionalEndpoints = ImmutableMap.of();
- }
- status = instance.getStatus();
- shard = instance.isSetShard() ? instance.getShard() : null;
- }
-
- ServiceInstance asServiceInstance() {
- assertRequiredField("serviceEndpoint", serviceEndpoint);
- assertRequiredField("status", status);
-
- Map<String, EndpointSchema> extraEndpoints =
- additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints;
-
- ServiceInstance instance =
- new ServiceInstance(
- serviceEndpoint.asEndpoint(),
- Maps.transformValues(extraEndpoints, EndpointSchema::asEndpoint),
- status);
- if (shard != null) {
- instance.setShard(shard);
- }
- return instance;
- }
- }
-
- /**
- * The encoding for service instance data in ZooKeeper expected by Aurora clients.
- */
- static final Codec<ServiceInstance> INSTANCE = new JsonCodec();
-
- private static final Charset ENCODING = Charsets.UTF_8;
-
- private final Gson gson;
-
- private JsonCodec() {
- this(new Gson());
- }
-
- JsonCodec(Gson gson) {
- this.gson = requireNonNull(gson);
- }
-
- @Override
- public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
- Writer writer = new OutputStreamWriter(sink, ENCODING);
- try {
- gson.toJson(new ServiceInstanceSchema(instance), writer);
- } catch (JsonIOException e) {
- throw new IOException(String.format("Problem serializing %s to JSON", instance), e);
- }
- writer.flush();
- }
-
- @Override
- public ServiceInstance deserialize(InputStream source) throws IOException {
- InputStreamReader reader = new InputStreamReader(source, ENCODING);
- try {
- @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class);
- if (schema == null) {
- throw new IOException("JSON did not include a ServiceInstance object");
- }
- return schema.asServiceInstance();
- } catch (JsonParseException e) {
- throw new IOException("Problem parsing JSON ServiceInstance.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
index b7ca62c..07a6302 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
@@ -25,14 +25,16 @@ import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
+import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.binder.LinkedBindingBuilder;
import org.apache.aurora.common.application.ShutdownRegistry;
import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer;
import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.discovery.testing.ZooKeeperTestServer;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,15 +46,15 @@ import static java.util.Objects.requireNonNull;
*/
public class ServiceDiscoveryModule extends AbstractModule {
private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class);
private final ZooKeeperConfig zooKeeperConfig;
private final String discoveryPath;
/**
* Creates a Guice module that will bind a
- * {@link SingletonService} for scheduler leader election and a
- * {@link ServiceGroupMonitor} that can be used to find the
+ * {@link org.apache.aurora.common.zookeeper.SingletonService} for scheduler leader election and a
+ * {@link org.apache.aurora.scheduler.app.ServiceGroupMonitor} that can be used to find the
* leading scheduler.
*
* @param zooKeeperConfig The ZooKeeper client configuration to use to interact with ZooKeeper.
@@ -80,7 +82,15 @@ public class ServiceDiscoveryModule extends AbstractModule {
clusterBinder.toInstance(zooKeeperConfig.getServers());
}
- install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig));
+ install(discoveryModule());
+ }
+
+ /**
+ * Picks the service discovery implementation according to
+ * {@link ZooKeeperConfig#isUseCurator()}.
+ *
+ * @return A {@link CuratorServiceDiscoveryModule} when Curator is requested, otherwise a
+ * {@link CommonsServiceDiscoveryModule}; both are built from the same discovery path and config.
+ */
+ private Module discoveryModule() {
+ return zooKeeperConfig.isUseCurator()
+ ? new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig)
+ : new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
 }
@Provides
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
deleted file mode 100644
index fea896c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.Closeable;
-import java.util.function.Supplier;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.thrift.ServiceInstance;
-
-/**
- * Monitors a service group's membership and supplies a live view of the most recent set.
- */
-public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable {
-
- /**
- * Indicates a problem initiating monitoring of a service group.
- */
- class MonitorException extends Exception {
- MonitorException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Starts monitoring the service group.
- *
- * When the service group membership no longer needs to be maintained, this monitor should be
- * {@link #close() closed}.
- *
- * @throws MonitorException if there is a problem initiating monitoring of the service group.
- */
- void start() throws MonitorException;
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
deleted file mode 100644
index adbc318..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-/**
- * A service that uses master election to only allow a single service instance to be active amongst
- * a set of potential servers at a time.
- */
-public interface SingletonService {
-
- /**
- * Indicates an error attempting to lead a group of servers.
- */
- class LeadException extends Exception {
- LeadException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Indicates an error attempting to advertise leadership of a group of servers.
- */
- class AdvertiseException extends Exception {
- AdvertiseException(String message) {
- super(message);
- }
-
- AdvertiseException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
- */
- class LeaveException extends Exception {
- LeaveException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Attempts to lead the singleton service.
- *
- * @param endpoint The primary endpoint to register as a leader candidate in the service.
- * @param additionalEndpoints Additional endpoints that are available on the host.
- * @param listener Handler to call when the candidate is elected or defeated.
- * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
- * @throws InterruptedException If the thread watching/joining the group was interrupted.
- */
- void lead(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- LeadershipListener listener)
- throws LeadException, InterruptedException;
-
- /**
- * A listener to be notified of changes in the leadership status.
- * Implementers should be careful to avoid blocking operations in these callbacks.
- */
- interface LeadershipListener {
-
- /**
- * Notifies the listener that is is current leader.
- *
- * @param control A controller handle to advertise and/or leave advertised presence.
- */
- void onLeading(LeaderControl control);
-
- /**
- * Notifies the listener that it is no longer leader.
- */
- void onDefeated();
- }
-
- /**
- * A controller for the state of the leader. This will be provided to the leader upon election,
- * which allows the leader to decide when to advertise as leader of the server set and terminate
- * leadership at will.
- */
- interface LeaderControl {
-
- /**
- * Advertises the leader's server presence to clients.
- *
- * @throws AdvertiseException If there was an error advertising the singleton leader to clients
- * of the server set.
- * @throws InterruptedException If interrupted while advertising.
- */
- void advertise() throws AdvertiseException, InterruptedException;
-
- /**
- * Leaves candidacy for leadership, removing advertised server presence if applicable.
- *
- * @throws LeaveException If the leader's status could not be updated or there was an error
- * abdicating server set leadership.
- */
- void leave() throws LeaveException;
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
index acb7905..3f32a62 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
@@ -14,11 +14,14 @@
package org.apache.aurora.scheduler.discovery;
import java.net.InetSocketAddress;
-import java.util.Optional;
+
+import com.google.common.base.Optional;
import org.apache.aurora.common.base.MorePreconditions;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
import static java.util.Objects.requireNonNull;
@@ -32,18 +35,21 @@ public class ZooKeeperConfig {
/**
* Creates a new client configuration with defaults for the session timeout and credentials.
*
+ * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used.
* @param servers ZooKeeper server addresses.
* @return A new configuration.
*/
- public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) {
+ public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) {
return new ZooKeeperConfig(
+ useCurator,
servers,
- Optional.empty(), // chrootPath
+ Optional.absent(), // chrootPath
false,
ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT,
- Optional.empty()); // credentials
+ Optional.absent()); // credentials
}
+ private final boolean useCurator;
private final Iterable<InetSocketAddress> servers;
private final boolean inProcess;
private final Amount<Integer, Time> sessionTimeout;
@@ -60,12 +66,14 @@ public class ZooKeeperConfig {
* @param credentials ZooKeeper authentication credentials.
*/
ZooKeeperConfig(
+ boolean useCurator,
Iterable<InetSocketAddress> servers,
Optional<String> chrootPath,
boolean inProcess,
Amount<Integer, Time> sessionTimeout,
Optional<Credentials> credentials) {
+ this.useCurator = useCurator;
this.servers = MorePreconditions.checkNotBlank(servers);
this.chrootPath = requireNonNull(chrootPath);
this.inProcess = inProcess;
@@ -82,6 +90,7 @@ public class ZooKeeperConfig {
*/
public ZooKeeperConfig withCredentials(Credentials newCredentials) {
return new ZooKeeperConfig(
+ useCurator,
servers,
chrootPath,
inProcess,
@@ -89,6 +98,10 @@ public class ZooKeeperConfig {
Optional.of(newCredentials));
}
+ /**
+ * Reports which ZooKeeper client library was requested for service discovery.
+ *
+ * @return {@code true} to use Apache Curator; {@code false} to use commons/zookeeper.
+ */
+ boolean isUseCurator() {
+ return useCurator;
+ }
+
public Iterable<InetSocketAddress> getServers() {
return servers;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
deleted file mode 100644
index 211aa50..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * Utilities for dealing with ZooKeeper.
- */
-final class ZooKeeperUtils {
-
- /**
- * An appropriate default session timeout for ZooKeeper clusters.
- */
- static final Amount<Integer, Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS);
-
- /**
- * An ACL that gives all permissions any user authenticated or not.
- */
- static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
- ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
-
- /**
- * An ACL that gives all permissions to node creators and read permissions only to everyone else.
- */
- static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
- ImmutableList.<ACL>builder()
- .addAll(Ids.CREATOR_ALL_ACL)
- .addAll(Ids.READ_ACL_UNSAFE)
- .build();
-
- private ZooKeeperUtils() {
- // utility
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
deleted file mode 100644
index d84037e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery.testing;
-
-import org.apache.aurora.common.testing.TearDownTestCase;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * A base-class for in-process zookeeper tests.
- */
-public abstract class BaseZooKeeperTest extends TearDownTestCase {
-
- private ZooKeeperTestServer zkTestServer;
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Before
- public final void setUp() throws Exception {
- zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder());
- addTearDown(zkTestServer::stop);
- zkTestServer.startNetwork();
- }
-
- /**
- * Returns the running in-process ZooKeeper server.
- *
- * @return The in-process ZooKeeper server.
- */
- protected final ZooKeeperTestServer getServer() {
- return zkTestServer;
- }
-
- /**
- * Returns the current port to connect to the in-process zookeeper instance.
- */
- protected final int getPort() {
- return getServer().getPort();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
deleted file mode 100644
index a7bb48b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery.testing;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-/**
- * A helper class for starting in-process ZooKeeper server and clients.
- *
- * <p>This is ONLY meant to be used for testing.
- */
-public class ZooKeeperTestServer {
-
- private final File dataDir;
- private final File snapDir;
-
- private ZooKeeperServer zooKeeperServer;
- private ServerCnxnFactory connectionFactory;
- private int port;
-
- public ZooKeeperTestServer(File dataDir, File snapDir) {
- this.dataDir = Preconditions.checkNotNull(dataDir);
- this.snapDir = Preconditions.checkNotNull(snapDir);
- }
-
- /**
- * Starts zookeeper up on an ephemeral port.
- */
- public void startNetwork() throws IOException, InterruptedException {
- zooKeeperServer =
- new ZooKeeperServer(
- new FileTxnSnapLog(dataDir, snapDir),
- new BasicDataTreeBuilder()) {
-
- // TODO(John Sirois): Introduce a builder to configure the in-process server if and when
- // some folks need JMX for in-process tests.
- @Override protected void registerJMX() {
- // noop
- }
- };
-
- connectionFactory = new NIOServerCnxnFactory();
- connectionFactory.configure(
- new InetSocketAddress(port),
- 60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
- connectionFactory.startup(zooKeeperServer);
- port = zooKeeperServer.getClientPort();
- }
-
- /**
- * Stops the zookeeper server.
- */
- public void stop() {
- if (connectionFactory != null) {
- connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
- connectionFactory = null;
- }
- }
-
- /**
- * Expires the client session with the given {@code sessionId}.
- *
- * @param sessionId The id of the client session to expire.
- */
- public final void expireClientSession(long sessionId) {
- zooKeeperServer.closeSession(sessionId);
- }
-
- /**
- * Returns the current port to connect to the in-process zookeeper instance.
- */
- public final int getPort() {
- checkEphemeralPortAssigned();
- return port;
- }
-
- private void checkEphemeralPortAssigned() {
- Preconditions.checkState(port > 0, "startNetwork must be called first");
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
index af8567f..53ebc0b 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
@@ -64,7 +64,7 @@ import org.apache.aurora.common.net.http.handlers.TimeSeriesDataSource;
import org.apache.aurora.common.net.http.handlers.VarsHandler;
import org.apache.aurora.common.net.http.handlers.VarsJsonHandler;
import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
import org.apache.aurora.scheduler.http.api.ApiModule;
import org.apache.aurora.scheduler.http.api.security.HttpSecurityModule;
import org.apache.aurora.scheduler.thrift.ThriftModule;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
index 662d6d5..bc0e2a8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
@@ -27,8 +27,8 @@ import com.google.common.net.HostAndPort;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
index c7c0387..6704a32 100644
--- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
@@ -33,8 +33,8 @@ import org.apache.aurora.common.args.CmdLine;
import org.apache.aurora.common.net.InetSocketAddressHelper;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.scheduler.discovery.Credentials;
import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
import org.apache.aurora.scheduler.log.mesos.LogInterface.ReaderInterface;
import org.apache.aurora.scheduler.log.mesos.LogInterface.WriterInterface;
@@ -157,7 +157,7 @@ public class MesosLogStreamModule extends PrivateModule {
zkClientConfig.getSessionTimeout().getUnit().getTimeUnit(),
zkLogGroupPath,
zkCredentials.scheme(),
- zkCredentials.token());
+ zkCredentials.authToken());
} else {
return new Log(
QUORUM_SIZE.get(),
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index 4324ea9..051c520 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -21,9 +21,9 @@ import org.apache.aurora.common.application.ShutdownRegistry;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.base.ExceptionalCommand;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 84d7753..29a3b4a 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -17,7 +17,6 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -44,6 +43,10 @@ import org.apache.aurora.GuavaUtils;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ServerSetImpl;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
@@ -60,11 +63,8 @@ import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
-import org.apache.aurora.scheduler.discovery.Credentials;
import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
-import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.log.Log.Entry;
import org.apache.aurora.scheduler.log.Log.Position;
@@ -107,9 +107,10 @@ import static org.easymock.EasyMock.createControl;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class SchedulerIT extends BaseZooKeeperTest {
+public class SchedulerIT extends BaseZooKeeperClientTest {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class);
@@ -144,6 +145,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
private Stream logStream;
private StreamMatcher streamMatcher;
private EntrySerializer entrySerializer;
+ private ZooKeeperClient zkClient;
private File backupDir;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -171,9 +173,11 @@ public class SchedulerIT extends BaseZooKeeperTest {
entrySerializer = new EntrySerializer.EntrySerializerImpl(
LogStorageModule.MAX_LOG_ENTRY_SIZE.get(),
Hashing.md5());
+
+ zkClient = createZkClient();
}
- private Callable<Void> startScheduler() throws Exception {
+ private void startScheduler() throws Exception {
// TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using
// AppLauncher.
Module testModule = new AbstractModule() {
@@ -198,8 +202,10 @@ public class SchedulerIT extends BaseZooKeeperTest {
};
ZooKeeperConfig zkClientConfig =
ZooKeeperConfig.create(
+ true, // useCurator
ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort())))
.withCredentials(Credentials.digestCredentials("mesos", "mesos"));
+ SchedulerMain main = SchedulerMain.class.newInstance();
Injector injector = Guice.createInjector(
ImmutableList.<Module>builder()
.add(SchedulerMain.getUniversalModule())
@@ -209,8 +215,8 @@ public class SchedulerIT extends BaseZooKeeperTest {
.add(testModule)
.build()
);
- SchedulerMain main = new SchedulerMain();
injector.injectMembers(main);
+ Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
executor.submit(() -> {
try {
@@ -220,32 +226,28 @@ public class SchedulerIT extends BaseZooKeeperTest {
executor.shutdownNow();
}
});
-
- Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
addTearDown(() -> {
lifecycle.shutdown();
MoreExecutors.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
});
-
injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class))
.awaitHealthy();
+ }
- ServiceGroupMonitor schedulerMonitor = injector.getInstance(ServiceGroupMonitor.class);
- CountDownLatch schedulerReady = new CountDownLatch(1);
+ private void awaitSchedulerReady() throws Exception {
executor.submit(() -> {
- while (schedulerMonitor.get().isEmpty()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
+ final CountDownLatch schedulerReady = new CountDownLatch(1);
+ schedulerService.watch(hostSet -> {
+ if (!hostSet.isEmpty()) {
+ schedulerReady.countDown();
}
- }
- schedulerReady.countDown();
- });
- return () -> {
- schedulerReady.await();
+ });
+ // A timeout is used because certain types of assertion errors (mocks) will not surface
+ // until the main test thread exits this body of code.
+ assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
return null;
- };
+ }).get();
}
private final AtomicInteger curPosition = new AtomicInteger();
@@ -330,14 +332,14 @@ public class SchedulerIT extends BaseZooKeeperTest {
expect(driver.stop(true)).andReturn(Status.DRIVER_STOPPED).anyTimes();
control.replay();
- Callable<Void> awaitSchedulerReady = startScheduler();
+ startScheduler();
driverStarted.await();
scheduler.getValue().registered(driver,
FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
MasterInfo.getDefaultInstance());
- awaitSchedulerReady.call();
+ awaitSchedulerReady();
assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
new file mode 100644
index 0000000..d90192b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+abstract class AbstractDiscoveryModuleTest extends TearDownTestCase {
+ // Subclasses supply the discovery module under test via createModule() and isCurator().
+ @Test
+ public void testBindingContract() {
+ ZooKeeperConfig zooKeeperConfig =
+ new ZooKeeperConfig(
+ isCurator(),
+ ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
+ Optional.of("/chroot"),
+ false, // inProcess
+ Amount.of(1, Time.DAYS),
+ Optional.of(Credentials.digestCredentials("test", "user")));
+
+ Injector injector =
+ Guice.createInjector(
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
+ .toInstance(
+ ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
+ bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
+ .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
+
+ bindExtraRequirements(binder());
+ }
+ },
+ createModule("/discovery/path", zooKeeperConfig));
+
+ assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
+ assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
+ }
+ // Hook for bindings the module under test needs beyond the ZooKeeper cluster and ACL keys.
+ void bindExtraRequirements(Binder binder) {
+ // Noop.
+ }
+ // Creates the module whose SingletonService and ServiceGroupMonitor bindings are checked.
+ abstract Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig);
+ // The useCurator flag passed to ZooKeeperConfig in testBindingContract().
+ abstract boolean isCurator();
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
index 9f86add..a2b4125 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
@@ -21,10 +21,13 @@ import java.util.function.Predicate;
import com.google.common.collect.ImmutableMap;
+import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -35,6 +38,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
static final String GROUP_PATH = "/group/root";
static final String MEMBER_TOKEN = "member_";
+ static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC;
static final int PRIMARY_PORT = 42;
private CuratorFramework client;
@@ -51,7 +55,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
groupCache.getListenable().addListener((c, event) -> groupEvents.put(event));
Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
- groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, JsonCodec.INSTANCE);
+ groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC);
}
final CuratorFramework startNewClient() {
@@ -97,7 +101,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
final byte[] serialize(ServiceInstance serviceInstance) throws IOException {
ByteArrayOutputStream sink = new ByteArrayOutputStream();
- JsonCodec.INSTANCE.serialize(serviceInstance, sink);
+ CODEC.serialize(serviceInstance, sink);
return sink.toByteArray();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
new file mode 100644
index 0000000..7a4c4dd
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import com.google.inject.Module;
+
+public class CommonsDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+
+ @Override
+ Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+ return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+ }
+
+ @Override
+ boolean isCurator() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
new file mode 100644
index 0000000..42a2224
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link CommonsServiceGroupMonitor} against a mocked {@link DynamicHostSet}: the watch
+ * lifecycle (start/close), watch failures, and propagation of host-set changes to {@code get()}.
+ */
+public class CommonsServiceGroupMonitorTest extends EasyMockTest {
+
+ private DynamicHostSet<ServiceInstance> serverSet;
+ // Captures the monitor the group monitor registers, so tests can push host changes into it.
+ private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture;
+ // Returned by a successful watch(); closing the monitor is expected to execute it.
+ private Command stopCommand;
+
+ @Before
+ public void setUp() throws Exception {
+ serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { });
+ hostChangeMonitorCapture = createCapture();
+ stopCommand = createMock(Command.class);
+ }
+
+ // Records a watch() call that captures the host-change monitor and returns stopCommand.
+ private void expectSuccessfulWatch() throws Exception {
+ expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand);
+ }
+
+ // Records a watch() call that fails with DynamicHostSet.MonitorException.
+ private void expectFailedWatch() throws Exception {
+ DynamicHostSet.MonitorException watchError =
+ new DynamicHostSet.MonitorException(
+ "Problem watching service group",
+ new RuntimeException());
+ expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError);
+ }
+
+ @Test
+ public void testNominalLifecycle() throws Exception {
+ expectSuccessfulWatch();
+
+ // close() must run the stop command returned by watch().
+ stopCommand.execute();
+ expectLastCall();
+
+ control.replay();
+
+ CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+ groupMonitor.start();
+ groupMonitor.close();
+ }
+
+ @Test
+ public void testExceptionalLifecycle() throws Exception {
+ expectFailedWatch();
+ control.replay();
+
+ CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+ try {
+ groupMonitor.start();
+ fail();
+ } catch (ServiceGroupMonitor.MonitorException e) {
+ // expected: the DynamicHostSet watch failure surfaces as ServiceGroupMonitor.MonitorException
+ }
+
+ // Close on a non-started monitor should be allowed.
+ groupMonitor.close();
+ }
+
+ @Test
+ public void testNoHosts() throws Exception {
+ expectSuccessfulWatch();
+ control.replay();
+
+ // The monitor reports an empty set before start, after start, and after an empty update.
+ CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+ assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+ groupMonitor.start();
+ assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+ hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of());
+ assertEquals(ImmutableSet.of(), groupMonitor.get());
+ }
+
+ @Test
+ public void testHostUpdates() throws Exception {
+ expectSuccessfulWatch();
+ control.replay();
+
+ CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+ groupMonitor.start();
+
+ // Each host-set change replaces (not merges into) what get() returns.
+ ImmutableSet<ServiceInstance> twoHosts =
+ ImmutableSet.of(serviceInstance("one"), serviceInstance("two"));
+ hostChangeMonitorCapture.getValue().onChange(twoHosts);
+ assertEquals(twoHosts, groupMonitor.get());
+
+ ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one"));
+ hostChangeMonitorCapture.getValue().onChange(oneHost);
+ assertEquals(oneHost, groupMonitor.get());
+
+ ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three"));
+ hostChangeMonitorCapture.getValue().onChange(anotherHost);
+ assertEquals(anotherHost, groupMonitor.get());
+
+ ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of();
+ hostChangeMonitorCapture.getValue().onChange(noHosts);
+ assertEquals(noHosts, groupMonitor.get());
+ }
+
+ // Builds an ALIVE service instance on port 42 with no additional endpoints.
+ private ServiceInstance serviceInstance(String hostName) {
+ return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
index 4ebda5e..f1a02e4 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
@@ -13,27 +13,37 @@
*/
package org.apache.aurora.scheduler.discovery;
-import java.net.InetSocketAddress;
-import java.util.Optional;
-
import com.google.common.collect.ImmutableList;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
+import com.google.inject.Binder;
+import com.google.inject.Module;
import org.apache.aurora.common.application.ShutdownRegistry;
import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.zookeeper.data.ACL;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-public class CuratorDiscoveryModuleTest extends TearDownTestCase {
+public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+
+ /**
+ * Binds a {@link ShutdownRegistry} (presumably required by the Curator module) and executes it
+ * when the test tears down.
+ */
+ @Override
+ void bindExtraRequirements(Binder binder) {
+ ShutdownRegistryImpl registry = new ShutdownRegistryImpl();
+ binder.bind(ShutdownRegistry.class).toInstance(registry);
+ addTearDown(registry::execute);
+ }
+
+ /**
+ * Creates the Curator-backed discovery module under test.
+ */
+ @Override
+ Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+ Module curatorModule = new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+ return curatorModule;
+ }
+
+ /**
+ * This subclass exercises {@link CuratorServiceDiscoveryModule}, so the {@link ZooKeeperConfig}
+ * built by the shared contract test is marked as a Curator config to match.
+ */
+ @Override
+ boolean isCurator() {
+ return true;
+ }
@Test
public void testSingleACLProvider() {
@@ -54,36 +64,4 @@ public class CuratorDiscoveryModuleTest extends TearDownTestCase {
public void testSingleACLProviderEmpty() {
new CuratorServiceDiscoveryModule.SingleACLProvider(ImmutableList.of());
}
-
- @Test
- public void testBindingContract() {
- ZooKeeperConfig zooKeeperConfig =
- new ZooKeeperConfig(
- ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
- Optional.of("/chroot"),
- false, // inProcess
- Amount.of(1, Time.DAYS),
- Optional.of(Credentials.digestCredentials("test", "user")));
-
- Injector injector =
- Guice.createInjector(
- new AbstractModule() {
- @Override
- protected void configure() {
- bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
- .toInstance(
- ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
- bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
- .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
-
- ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
- binder().bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
- addTearDown(shutdownRegistry::execute);
- }
- },
- new CuratorServiceDiscoveryModule("/discovery/path", zooKeeperConfig));
-
- assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
- assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
index bb3d080..6ea49b0 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.aurora.common.zookeeper.SingletonService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.easymock.Capture;
@@ -56,7 +57,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest {
throws Exception {
CuratorSingletonService singletonService =
- new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, JsonCodec.INSTANCE);
+ new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, CODEC);
InetSocketAddress leaderEndpoint = InetSocketAddress.createUnresolved(hostName, PRIMARY_PORT);
singletonService.lead(leaderEndpoint, ImmutableMap.of(), listener);
}
[4/4] aurora git commit: Revert removal of twitter/commons/zk based
leadership code
Posted by dm...@apache.org.
Revert removal of twitter/commons/zk based leadership code
See discussion here: https://issues.apache.org/jira/browse/AURORA-1840
Reviewed at https://reviews.apache.org/r/54250/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/16e4651d
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/16e4651d
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/16e4651d
Branch: refs/heads/master
Commit: 16e4651d5ff038dad0e9977edea7c57aeb37fe12
Parents: 8bcad84
Author: David McLaughlin <da...@dmclaughlin.com>
Authored: Thu Dec 1 09:01:33 2016 -0800
Committer: David McLaughlin <dm...@twitter.com>
Committed: Thu Dec 1 09:01:33 2016 -0800
----------------------------------------------------------------------
RELEASE-NOTES.md | 1 +
build.gradle | 16 +-
.../aurora/common/zookeeper/Candidate.java | 78 +++
.../aurora/common/zookeeper/CandidateImpl.java | 127 ++++
.../aurora/common/zookeeper/Credentials.java | 90 +++
.../apache/aurora/common/zookeeper/Group.java | 674 +++++++++++++++++++
.../aurora/common/zookeeper/JsonCodec.java | 139 ++++
.../aurora/common/zookeeper/ServerSet.java | 74 ++
.../aurora/common/zookeeper/ServerSetImpl.java | 349 ++++++++++
.../aurora/common/zookeeper/ServerSets.java | 118 ++++
.../common/zookeeper/SingletonService.java | 114 ++++
.../common/zookeeper/SingletonServiceImpl.java | 122 ++++
.../common/zookeeper/ZooKeeperClient.java | 372 ++++++++++
.../aurora/common/zookeeper/ZooKeeperUtils.java | 167 +++++
.../testing/BaseZooKeeperClientTest.java | 140 ++++
.../zookeeper/testing/BaseZooKeeperTest.java | 46 ++
.../zookeeper/testing/ZooKeeperTestServer.java | 121 ++++
.../common/zookeeper/CandidateImplTest.java | 165 +++++
.../aurora/common/zookeeper/GroupTest.java | 321 +++++++++
.../aurora/common/zookeeper/JsonCodecTest.java | 151 +++++
.../common/zookeeper/ServerSetImplTest.java | 258 +++++++
.../aurora/common/zookeeper/ServerSetsTest.java | 44 ++
.../zookeeper/SingletonServiceImplTest.java | 243 +++++++
.../common/zookeeper/ZooKeeperClientTest.java | 210 ++++++
.../common/zookeeper/ZooKeeperUtilsTest.java | 139 ++++
config/findbugs/excludeFilter.xml | 8 -
docs/features/service-discovery.md | 2 +-
docs/reference/scheduler-configuration.md | 2 +
.../aurora/scheduler/SchedulerLifecycle.java | 6 +-
.../aurora/scheduler/app/SchedulerMain.java | 4 +-
.../scheduler/app/ServiceGroupMonitor.java | 46 ++
.../CommonsServiceDiscoveryModule.java | 102 +++
.../discovery/CommonsServiceGroupMonitor.java | 59 ++
.../aurora/scheduler/discovery/Credentials.java | 98 ---
.../CuratorServiceDiscoveryModule.java | 8 +-
.../discovery/CuratorServiceGroupMonitor.java | 1 +
.../discovery/CuratorSingletonService.java | 1 +
.../discovery/FlaggedZooKeeperConfig.java | 21 +-
.../aurora/scheduler/discovery/JsonCodec.java | 147 ----
.../discovery/ServiceDiscoveryModule.java | 20 +-
.../discovery/ServiceGroupMonitor.java | 46 --
.../scheduler/discovery/SingletonService.java | 114 ----
.../scheduler/discovery/ZooKeeperConfig.java | 21 +-
.../scheduler/discovery/ZooKeeperUtils.java | 51 --
.../discovery/testing/BaseZooKeeperTest.java | 53 --
.../discovery/testing/ZooKeeperTestServer.java | 101 ---
.../scheduler/http/JettyServerModule.java | 2 +-
.../aurora/scheduler/http/LeaderRedirect.java | 4 +-
.../log/mesos/MesosLogStreamModule.java | 4 +-
.../scheduler/SchedulerLifecycleTest.java | 4 +-
.../aurora/scheduler/app/SchedulerIT.java | 52 +-
.../discovery/AbstractDiscoveryModuleTest.java | 77 +++
.../discovery/BaseCuratorDiscoveryTest.java | 10 +-
.../discovery/CommonsDiscoveryModuleTest.java | 29 +
.../CommonsServiceGroupMonitorTest.java | 137 ++++
.../discovery/CuratorDiscoveryModuleTest.java | 64 +-
.../discovery/CuratorSingletonServiceTest.java | 3 +-
.../scheduler/discovery/JsonCodecTest.java | 159 -----
.../discovery/ZooKeeperConfigTest.java | 17 +-
.../scheduler/http/AbstractJettyTest.java | 15 +-
.../scheduler/http/LeaderRedirectTest.java | 4 +-
.../aurora/scheduler/thrift/ThriftIT.java | 2 +-
62 files changed, 4871 insertions(+), 902 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 96926f4..7a3d331 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -26,6 +26,7 @@
- The scheduler flag `-zk_use_curator` has been removed. If you have never set the flag and are
upgrading you should take care as described in the [note](#zk_use_curator_upgrade) below.
+=======
0.16.0
======
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index f257440..2f23b85 100644
--- a/build.gradle
+++ b/build.gradle
@@ -164,7 +164,6 @@ project(':commons') {
dependencies {
compile project(':commons-args')
- compile "ch.qos.logback:logback-classic:${logbackRev}"
compile "com.google.code.findbugs:jsr305:${jsrRev}"
compile "com.google.code.gson:gson:${gsonRev}"
compile "com.google.guava:guava:${guavaRev}"
@@ -175,13 +174,17 @@ project(':commons') {
compile "javax.servlet:javax.servlet-api:${servletRev}"
compile "joda-time:joda-time:2.9.1"
compile "org.antlr:stringtemplate:${stringTemplateRev}"
+ compile "org.apache.zookeeper:zookeeper:${zookeeperRev}"
compile "org.easymock:easymock:3.4"
// There are a few testing support libs in the src/main/java trees that use junit - currently:
+ // src/main/java/org/apache/aurora/common/zookeeper/testing
// src/main/java/org/apache/aurora/common/testing
compile "junit:junit:${junitRev}"
testCompile "junit:junit:${junitRev}"
+ testCompile "org.powermock:powermock-module-junit4:1.6.4"
+ testCompile "org.powermock:powermock-api-easymock:1.6.4"
}
}
@@ -347,11 +350,9 @@ dependencies {
compile project(':commons')
compile project(':commons-args')
-
compile 'aopalliance:aopalliance:1.0'
- compile "ch.qos.logback:logback-classic:${logbackRev}"
+ compile 'ch.qos.logback:logback-classic:1.1.3'
compile "com.google.code.findbugs:jsr305:${jsrRev}"
- compile "com.google.code.gson:gson:${gsonRev}"
compile "com.google.inject:guice:${guiceRev}"
compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}"
compile "com.google.protobuf:protobuf-java:${protobufRev}"
@@ -385,15 +386,8 @@ dependencies {
compile 'org.quartz-scheduler:quartz:2.2.2'
compile "uno.perk:forward:1.0.0"
- // There are a few testing support libs in the src/main/java trees that use junit - currently:
- // src/main/java/org/apache/aurora/common/zookeeper/testing
- compile "junit:junit:${junitRev}"
-
testCompile "com.sun.jersey:jersey-client:${jerseyRev}"
testCompile "junit:junit:${junitRev}"
- testCompile "org.powermock:powermock-module-junit4:1.6.4"
- testCompile "org.powermock:powermock-api-easymock:1.6.4"
-
}
// For normal developer builds, avoid running the often-time-consuming code quality checks.
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
new file mode 100644
index 0000000..75c1b14
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Interface definition for becoming or querying for a ZooKeeper-based group leader.
+ */
+public interface Candidate {
+
+ /**
+ * Returns the current group leader by querying ZooKeeper synchronously.
+ *
+ * @return the current group leader's identifying data or {@link Optional#absent()} if there is
+ * no leader
+ * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+ * @throws KeeperException if there was a problem reading the leader information
+ * @throws InterruptedException if this thread is interrupted getting the leader
+ */
+ Optional<byte[]> getLeaderData()
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException;
+
+ /**
+ * Encapsulates a leader that can be elected and subsequently defeated.
+ */
+ interface Leader {
+
+ /**
+ * Called when this leader has been elected.
+ *
+ * @param abdicate a command that can be used to abdicate leadership and force a new election
+ */
+ void onElected(ExceptionalCommand<JoinException> abdicate);
+
+ /**
+ * Called when the leader has been ousted. Can occur either if the leader abdicates or if an
+ * external event causes the leader to lose its leadership role (session expiration).
+ */
+ void onDefeated();
+ }
+
+ /**
+ * Offers this candidate in leadership elections for as long as the current JVM process is alive.
+ * Upon election, the {@code onElected} callback will be executed and a command that can be used
+ * to abdicate leadership will be passed in. If the elected leader JVM process dies or the
+ * elected leader successfully abdicates then a new leader will be elected. Leaders that
+ * successfully abdicate are removed from the group and will not be eligible for leadership
+ * election unless {@link #offerLeadership(Leader)} is called again.
+ *
+ * @param leader the leader to notify of election and defeat events
+ * @return a supplier that can be queried to find out if this leader is currently elected
+ * @throws JoinException if there was a problem joining the group
+ * @throws WatchException if there is a problem generating the first group membership list
+ * @throws InterruptedException if interrupted waiting to join the group and determine initial
+ * election results
+ */
+ Supplier<Boolean> offerLeadership(Leader leader)
+ throws JoinException, WatchException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
new file mode 100644
index 0000000..98b5ee4
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements leader election for small groups of candidates. This implementation is subject to the
+ * <a href="https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection">
+ * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
+ * Every candidate registers a watch on the whole group's member list in
+ * {@link #offerLeadership(Leader)}, which is what exposes it to the herd effect.
+ */
+public class CandidateImpl implements Candidate {
+ private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class);
+
+ // Member data published when the local host address cannot be determined.
+ private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
+
+ // Publishes this host's IP address as the candidate's member data. Encoded explicitly as UTF-8,
+ // like UNKNOWN_CANDIDATE_DATA, instead of with the platform default charset. The fallback array
+ // is copied so callers cannot mutate the shared constant.
+ private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> {
+ try {
+ return InetAddress.getLocalHost().getHostAddress().getBytes(Charsets.UTF_8);
+ } catch (UnknownHostException e) {
+ LOG.warn("Failed to determine local address!", e);
+ return UNKNOWN_CANDIDATE_DATA.clone();
+ }
+ };
+
+ // Picks the leader: the lexicographically smallest member ID. NOTE(review): assuming member IDs
+ // are ZooKeeper sequential node names, the smallest ID is the longest-standing member despite
+ // this constant's name -- confirm against Group.
+ private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
+ candidates -> Ordering.natural().min(candidates);
+
+ private final Group group;
+
+ /**
+ * Creates a candidate that can be used to offer leadership for the given {@code group}.
+ *
+ * @param group the group whose membership decides leadership; must not be null
+ */
+ public CandidateImpl(Group group) {
+ this.group = Preconditions.checkNotNull(group);
+ }
+
+ @Override
+ public Optional<byte[]> getLeaderData()
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+
+ String leaderId = getLeader(group.getMemberIds());
+ return leaderId == null
+ ? Optional.<byte[]>absent()
+ : Optional.of(group.getMemberData(leaderId));
+ }
+
+ @Override
+ public Supplier<Boolean> offerLeadership(final Leader leader)
+ throws JoinException, WatchException, InterruptedException {
+
+ // The second join() argument is wired to onDefeated; presumably it fires when membership is
+ // lost (e.g. session expiration) -- see Group.join.
+ final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, leader::onDefeated);
+
+ final AtomicBoolean elected = new AtomicBoolean(false);
+ final AtomicBoolean abdicated = new AtomicBoolean(false);
+ // Re-evaluate leadership every time the group's member list is reported.
+ group.watch(memberIds -> {
+ boolean noCandidates = Iterables.isEmpty(memberIds);
+ String memberId = membership.getMemberId();
+
+ if (noCandidates) {
+ LOG.warn("All candidates have temporarily left the group: {}", group);
+ } else if (!Iterables.contains(memberIds, memberId)) {
+ LOG.error(
+ "Current member ID {} is not a candidate for leader, current voting: {}",
+ memberId, memberIds);
+ } else {
+ boolean electedLeader = memberId.equals(getLeader(memberIds));
+ // Swap in the new state and get the previous one, so the election/defeat callbacks below
+ // fire only when the state actually changes.
+ boolean previouslyElected = elected.getAndSet(electedLeader);
+
+ if (!previouslyElected && electedLeader) {
+ LOG.info("Candidate {} is now leader of group: {}",
+ membership.getMemberPath(), memberIds);
+
+ // Abdicating cancels this membership, then marks the candidate as no longer leading.
+ leader.onElected(() -> {
+ membership.cancel();
+ abdicated.set(true);
+ });
+ } else if (!electedLeader) {
+ if (previouslyElected) {
+ leader.onDefeated();
+ }
+ LOG.info(
+ "Candidate {} waiting for the next leader election, current voting: {}",
+ membership.getMemberPath(), memberIds);
+ }
+ }
+ });
+
+ // Leading means: elected by the latest membership view and not having abdicated since.
+ return () -> !abdicated.get() && elected.get();
+ }
+
+ // Returns the ID of the member that should lead, or null when there are no members.
+ @Nullable
+ private String getLeader(Iterable<String> memberIds) {
+ return Iterables.isEmpty(memberIds) ? null : MOST_RECENT_JUDGE.apply(memberIds);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
new file mode 100644
index 0000000..18319a3
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.commons.lang.builder.EqualsBuilder;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Encapsulates a user's ZooKeeper credentials: an authentication scheme name plus a
+ * scheme-specific token. Instances are immutable; the token is copied on the way in and out.
+ */
+public final class Credentials {
+
+ /**
+ * Creates a set of credentials for the ZooKeeper digest authentication mechanism.
+ *
+ * @param username the username to authenticate with
+ * @param password the password to authenticate with
+ * @return a set of credentials that can be used to authenticate the ZooKeeper client
+ */
+ public static Credentials digestCredentials(String username, String password) {
+ MorePreconditions.checkNotBlank(username);
+ Preconditions.checkNotNull(password);
+
+ // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
+ // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
+ // Consider writing and installing a version of DigestAuthenticationProvider that controls its
+ // Charset explicitly.
+ return new Credentials("digest", (username + ":" + password).getBytes());
+ }
+
+ private final String scheme;
+ private final byte[] authToken;
+
+ /**
+ * Creates credentials for an arbitrary ZooKeeper authentication scheme.
+ *
+ * @param scheme the authentication scheme name; must not be blank
+ * @param authToken the scheme-specific token; copied so later changes to the caller's array
+ * do not affect these credentials
+ */
+ public Credentials(String scheme, byte[] authToken) {
+ this.scheme = MorePreconditions.checkNotBlank(scheme);
+ this.authToken = requireNonNull(authToken).clone();
+ }
+
+ /**
+ * Returns the authentication scheme these credentials are for.
+ *
+ * @return the scheme these credentials are for.
+ */
+ public String scheme() {
+ return scheme;
+ }
+
+ /**
+ * Returns the authentication token.
+ *
+ * @return a copy of the authentication token; mutating it does not affect these credentials.
+ */
+ public byte[] authToken() {
+ return authToken.clone();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Credentials)) {
+ return false;
+ }
+
+ Credentials other = (Credentials) o;
+ // EqualsBuilder.append(byte[], byte[]) compares the tokens element by element.
+ return new EqualsBuilder()
+ .append(scheme, other.scheme)
+ .append(authToken, other.authToken)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ // Hash the token's contents rather than the array's identity so that credentials which are
+ // equal (content-wise, see equals) always produce the same hash code.
+ return Objects.hashCode(scheme, Arrays.hashCode(authToken));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
new file mode 100644
index 0000000..2720dd1
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Commands;
+import org.apache.aurora.common.base.ExceptionalSupplier;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.BackoffHelper;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class exposes methods for joining and monitoring distributed groups. The groups this class
+ * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
+ * each member of a group.
+ */
+public class Group {
+ private static final Logger LOG = LoggerFactory.getLogger(Group.class);
+
+ private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
+ private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
+
+ private final ZooKeeperClient zkClient;
+ private final ImmutableList<ACL> acl;
+ private final String path;
+
+ private final NodeScheme nodeScheme;
+ private final Predicate<String> nodeNameFilter;
+
+ private final BackoffHelper backoffHelper;
+
+ /**
+ * Creates a group rooted at the given {@code path}. Paths must be absolute and trailing or
+ * duplicate slashes will be normalized. For example, all the following paths would create a
+ * group at the normalized path /my/distributed/group:
+ * <ul>
+ * <li>/my/distributed/group
+ * <li>/my/distributed/group/
+ * <li>/my/distributed//group
+ * </ul>
+ *
+ * @param zkClient the client to use for interactions with ZooKeeper
+ * @param acl the ACL to use for creating the persistent group path if it does not already exist
+ * @param path the absolute persistent path that represents this group
+ * @param nodeScheme the scheme that defines how nodes are created
+ */
+ public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
+ this.zkClient = Preconditions.checkNotNull(zkClient);
+ this.acl = ImmutableList.copyOf(acl);
+ this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
+
+ this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
+ nodeNameFilter = Group.this.nodeScheme::isMember;
+
+ backoffHelper = new BackoffHelper();
+ }
+
+ /**
+ * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
+ * {@code namePrefix} of 'member_'.
+ */
+ public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+ this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
+ }
+
+ /**
+ * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
+ * {@link DefaultScheme} using {@code namePrefix}.
+ */
+ public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
+ this(zkClient, acl, path, new DefaultScheme(namePrefix));
+ }
+
+ /**
+ * Returns the full ZooKeeper path of the member node with the given id.
+ *
+ * @param memberId the member node name; must not be blank
+ * @return this group's path joined with {@code memberId}
+ */
+ public String getMemberPath(String memberId) {
+ return path + "/" + MorePreconditions.checkNotBlank(memberId);
+ }
+
+ /**
+ * Returns the normalized persistent ZooKeeper path of this group.
+ */
+ public String getPath() {
+ return path;
+ }
+
+ /**
+ * Extracts the member id (the last path component) from a member node path.
+ *
+ * @param nodePath the full path of a member node of this group
+ * @return the member id
+ * @throws IllegalArgumentException if {@code nodePath} does not lie under this group's path or
+ * its last component is not a member name under this group's {@link NodeScheme}
+ */
+ public String getMemberId(String nodePath) {
+ MorePreconditions.checkNotBlank(nodePath);
+ Preconditions.checkArgument(nodePath.startsWith(path + "/"),
+ "Not a member of this group[%s]: %s", path, nodePath);
+
+ String memberId = StringUtils.substringAfterLast(nodePath, "/");
+ Preconditions.checkArgument(nodeScheme.isMember(memberId),
+ "Not a group member: %s", memberId);
+ return memberId;
+ }
+
+ /**
+ * Returns the current list of group member ids by querying ZooKeeper synchronously.
+ *
+ * @return the ids of all the present members of this group
+ * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+ * @throws KeeperException if there was a problem reading this group's member ids
+ * @throws InterruptedException if this thread is interrupted listing the group members
+ */
+ public Iterable<String> getMemberIds()
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+ return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter);
+ }
+
+ /**
+ * Gets the data for one of this groups members by querying ZooKeeper synchronously.
+ *
+ * @param memberId the id of the member whose data to retrieve
+ * @return the data associated with the {@code memberId}
+ * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
+ * @throws KeeperException if there was a problem reading this member's data
+ * @throws InterruptedException if this thread is interrupted retrieving the member data
+ */
+ public byte[] getMemberData(String memberId)
+ throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+ return zkClient.get().getData(getMemberPath(memberId), false, null);
+ }
+
+ /**
+ * Represents membership in a distributed group.
+ */
+ public interface Membership {
+
+ /**
+ * Returns the persistent ZooKeeper path that represents this group.
+ */
+ String getGroupPath();
+
+ /**
+ * Returns the id (ZooKeeper node name) of this group member. May change over time if the
+ * ZooKeeper session expires.
+ */
+ String getMemberId();
+
+ /**
+ * Returns the full ZooKeeper path to this group member. May change over time if the
+ * ZooKeeper session expires.
+ */
+ String getMemberPath();
+
+ /**
+ * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
+ * {@link Group#join(Supplier, Command)}.
+ *
+ * @return the new membership data
+ * @throws UpdateException if there was a problem updating the membership data
+ */
+ byte[] updateMemberData() throws UpdateException;
+
+ /**
+ * Cancels group membership by deleting the associated ZooKeeper member node.
+ *
+ * @throws JoinException if there is a problem deleting the node
+ */
+ void cancel() throws JoinException;
+ }
+
+ /**
+ * Indicates an error joining a group.
+ */
+ public static class JoinException extends Exception {
+ public JoinException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Indicates an error updating a group member's data.
+ */
+ public static class UpdateException extends Exception {
+ public UpdateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Equivalent to calling {@link #join(Supplier, Command)} with a supplier of {@code null} member
+ * data and no {@code onLoseMembership} callback.
+ */
+ public final Membership join() throws JoinException, InterruptedException {
+ return join(NO_MEMBER_DATA, null);
+ }
+
+ /**
+ * Equivalent to calling {@code join(memberData, null)}.
+ */
+ public final Membership join(Supplier<byte[]> memberData)
+ throws JoinException, InterruptedException {
+
+ return join(memberData, null);
+ }
+
+ /**
+ * Equivalent to calling {@link #join(Supplier, Command)} with a supplier of {@code null} member
+ * data and the given {@code onLoseMembership} callback.
+ */
+ public final Membership join(@Nullable final Command onLoseMembership)
+ throws JoinException, InterruptedException {
+
+ return join(NO_MEMBER_DATA, onLoseMembership);
+ }
+
+ /**
+ * Joins this group and returns the resulting Membership when successful. Membership will be
+ * automatically cancelled when the current jvm process dies; however the returned Membership
+ * object can be used to cancel membership earlier. Unless
+ * {@link Group.Membership#cancel()} is called the membership will
+ * be maintained by re-establishing it silently in the background.
+ *
+ * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an
+ * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
+ * membership in the group.
+ *
+ * @param memberData a supplier of the data to store in the member node
+ * @param onLoseMembership a callback to notify when membership is lost
+ * @return a Membership object with the member details
+ * @throws JoinException if there was a problem joining the group
+ * @throws InterruptedException if this thread is interrupted awaiting completion of the join
+ */
+ public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
+ throws JoinException, InterruptedException {
+
+ Preconditions.checkNotNull(memberData);
+ ensurePersistentGroupPath();
+
+ final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
+ return backoffHelper.doUntilResult(() -> {
+ try {
+ return groupJoiner.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to join group at path: " + path, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Temporary error trying to join group at path: " + path, e);
+ return null;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error trying to join group at path: " + path, e);
+ return null;
+ } else {
+ throw new JoinException("Problem joining partition group at path: " + path, e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Creates this group's persistent path (with {@link #acl}) if needed, retrying with backoff
+ * on connection problems and on errors the client reports as retryable.
+ */
+ private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
+ backoffHelper.doUntilSuccess(() -> {
+ try {
+ ZooKeeperUtils.ensurePath(zkClient, acl, path);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error ensuring path: " + path, e);
+ return false;
+ } else {
+ throw new JoinException("Problem ensuring group at path: " + path, e);
+ }
+ }
+ });
+ }
+
+ /**
+ * A membership backed by an ephemeral member node. The node is re-created when it is deleted
+ * or the ZooKeeper session expires, until {@link #cancel()} is called.
+ */
+ private class ActiveMembership implements Membership {
+ private final Supplier<byte[]> memberData;
+ private final Command onLoseMembership;
+ private String nodePath;
+ private String memberId;
+ private volatile boolean cancelled;
+ private byte[] membershipData;
+ // Whether the session-expiration re-join handler has been registered (guarded by this).
+ private boolean expirationHandlerRegistered;
+
+ public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
+ this.memberData = memberData;
+ this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
+ }
+
+ @Override
+ public String getGroupPath() {
+ return path;
+ }
+
+ @Override
+ public synchronized String getMemberId() {
+ return memberId;
+ }
+
+ @Override
+ public synchronized String getMemberPath() {
+ return nodePath;
+ }
+
+ @Override
+ public synchronized byte[] updateMemberData() throws UpdateException {
+ byte[] membershipData = memberData.get();
+ if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
+ try {
+ zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
+ this.membershipData = membershipData;
+ } catch (KeeperException e) {
+ throw new UpdateException("Problem updating membership data.", e);
+ } catch (InterruptedException e) {
+ // Preserve the interrupt for callers, as every other interrupt handler here does.
+ Thread.currentThread().interrupt();
+ throw new UpdateException("Interrupted attempting to update membership data.", e);
+ } catch (ZooKeeperConnectionException e) {
+ throw new UpdateException(
+ "Could not connect to the ZooKeeper cluster to update membership data.", e);
+ }
+ }
+ return membershipData;
+ }
+
+ @Override
+ public synchronized void cancel() throws JoinException {
+ if (!cancelled) {
+ try {
+ backoffHelper.doUntilSuccess(() -> {
+ try {
+ zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (NoNodeException e) {
+ LOG.info("Membership already cancelled, node at path: " + nodePath +
+ " has been deleted");
+ return true;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error cancelling membership: " + nodePath, e);
+ return false;
+ } else {
+ throw new JoinException("Problem cancelling membership: " + nodePath, e);
+ }
+ }
+ });
+ cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JoinException("Problem cancelling membership: " + nodePath, e);
+ }
+ }
+ }
+
+ private class CancelledException extends IllegalStateException { /* marker */ }
+
+ /**
+ * Creates this member's ephemeral node and arms the watches that re-create it.
+ *
+ * @throws CancelledException if {@link #cancel()} has already completed
+ */
+ synchronized Membership join()
+ throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+ if (cancelled) {
+ throw new CancelledException();
+ }
+
+ if (!expirationHandlerRegistered) {
+ // Re-join if our ephemeral node goes away due to session expiry - only needs to be
+ // registered once. An explicit flag is used rather than nodePath == null: a join attempt
+ // that fails below is retried by the backoff loop, and must not register a second handler
+ // (which would re-join, and create a member node, once per handler on expiry).
+ zkClient.registerExpirationHandler(this::tryJoin);
+ expirationHandlerRegistered = true;
+ }
+
+ byte[] membershipData = memberData.get();
+ String nodeName = nodeScheme.createName(membershipData);
+ CreateMode createMode = nodeScheme.isSequential()
+ ? CreateMode.EPHEMERAL_SEQUENTIAL
+ : CreateMode.EPHEMERAL;
+ nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
+ memberId = Group.this.getMemberId(nodePath);
+ LOG.info("Set group member ID to " + memberId);
+ this.membershipData = membershipData;
+
+ // Re-join if our ephemeral node goes away due to maliciousness.
+ zkClient.get().exists(nodePath, event -> {
+ if (event.getType() == EventType.NodeDeleted) {
+ tryJoin();
+ }
+ });
+
+ return this;
+ }
+
+ // One re-join attempt for the backoff loop: true when done (joined, or cancelled meanwhile),
+ // false to retry after a transient error.
+ private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
+ () -> {
+ try {
+ join();
+ return true;
+ } catch (CancelledException e) {
+ // Lost a cancel race - that's ok.
+ return true;
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error re-joining group: " + path, e);
+ return false;
+ } else {
+ throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
+ }
+ }
+ };
+
+ // Notifies onLoseMembership, then re-joins with backoff.
+ // NOTE(review): the NodeDeleted watcher also fires after our own cancel() deletes the node, so
+ // onLoseMembership runs then too before join() reports the cancel - confirm this is intended.
+ private synchronized void tryJoin() {
+ onLoseMembership.execute();
+ try {
+ backoffHelper.doUntilSuccess(tryJoin);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
+ }
+ }
+ }
+
+ /**
+ * An interface to an object that listens for changes to a group's membership.
+ */
+ public interface GroupChangeListener {
+
+ /**
+ * Called whenever group membership changes with the new list of member ids.
+ *
+ * @param memberIds the current member ids
+ */
+ void onGroupChange(Iterable<String> memberIds);
+ }
+
+ /**
+ * An interface that dictates the scheme to use for storing and filtering nodes that represent
+ * members of a distributed group.
+ */
+ public interface NodeScheme {
+ /**
+ * Determines if a child node is a member of a group by examining the node's name.
+ *
+ * @param nodeName the name of a child node found in a group
+ * @return {@code true} if {@code nodeName} identifies a group member in this scheme
+ */
+ boolean isMember(String nodeName);
+
+ /**
+ * Generates a node name for the node representing this process in the distributed group.
+ *
+ * @param membershipData the data that will be stored in this node
+ * @return the name for the node that will represent this process in the group
+ */
+ String createName(byte[] membershipData);
+
+ /**
+ * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
+ *
+ * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
+ */
+ boolean isSequential();
+ }
+
+ /**
+ * Indicates an error watching a group.
+ */
+ public static class WatchException extends Exception {
+ public WatchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Watches this group for the lifetime of this jvm process. This method will block until the
+ * current group members are available, notify the {@code groupChangeListener} and then return.
+ * All further changes to the group membership will cause notifications on a background thread.
+ *
+ * @param groupChangeListener the listener to notify of group membership change events
+ * @return A command which, when executed, will stop watching the group.
+ * @throws WatchException if there is a problem generating the 1st group membership list
+ * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
+ */
+ public final Command watch(final GroupChangeListener groupChangeListener)
+ throws WatchException, InterruptedException {
+ Preconditions.checkNotNull(groupChangeListener);
+
+ try {
+ ensurePersistentGroupPath();
+ } catch (JoinException e) {
+ throw new WatchException("Failed to create group path: " + path, e);
+ }
+
+ final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
+ backoffHelper.doUntilSuccess(() -> {
+ try {
+ groupMonitor.watchGroup();
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new WatchException("Interrupted trying to watch group at path: " + path, e);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Temporary error trying to watch group at path: " + path, e);
+ return false;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error trying to watch group at path: " + path, e);
+ return false;
+ } else {
+ throw new WatchException("Problem trying to watch group at path: " + path, e);
+ }
+ }
+ });
+ return groupMonitor::stopWatching;
+ }
+
+ /**
+ * Helps continuously monitor a group for membership changes.
+ */
+ private class GroupMonitor {
+ private final GroupChangeListener groupChangeListener;
+ private volatile boolean stopped = false;
+ private Set<String> members;
+ // Whether the session-expiration re-watch handler has been registered (guarded by this).
+ private boolean expirationHandlerRegistered;
+
+ GroupMonitor(GroupChangeListener groupChangeListener) {
+ this.groupChangeListener = groupChangeListener;
+ }
+
+ // ZooKeeper child watches fire once, so re-read (and re-arm) on every children change.
+ private final Watcher groupWatcher = event -> {
+ if (event.getType() == EventType.NodeChildrenChanged) {
+ tryWatchGroup();
+ }
+ };
+
+ // One re-watch attempt for the backoff loop: true when done, false to retry.
+ private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
+ () -> {
+ try {
+ watchGroup();
+ return true;
+ } catch (ZooKeeperConnectionException e) {
+ LOG.warn("Problem connecting to ZooKeeper, retrying", e);
+ return false;
+ } catch (KeeperException e) {
+ if (zkClient.shouldRetry(e)) {
+ LOG.warn("Temporary error re-watching group: " + path, e);
+ return false;
+ } else {
+ throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
+ }
+ }
+ };
+
+ private void tryWatchGroup() {
+ if (stopped) {
+ return;
+ }
+
+ try {
+ backoffHelper.doUntilSuccess(tryWatchGroup);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
+ }
+ }
+
+ private void watchGroup()
+ throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+ if (stopped) {
+ return;
+ }
+
+ List<String> children = zkClient.get().getChildren(path, groupWatcher);
+ setMembers(Iterables.filter(children, nodeNameFilter));
+ }
+
+ private void stopWatching() {
+ // TODO(William Farner): Cancel the watch when
+ // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
+ LOG.info("Stopping watch on " + this);
+ stopped = true;
+ }
+
+ synchronized void setMembers(Iterable<String> members) {
+ if (stopped) {
+ LOG.info("Suppressing membership update, no longer watching " + this);
+ return;
+ }
+
+ if (!expirationHandlerRegistered) {
+ // Reset our watch on the group if session expires - only needs to be registered once.
+ // Tracked explicitly so a listener failure below cannot cause a second registration.
+ zkClient.registerExpirationHandler(this::tryWatchGroup);
+ expirationHandlerRegistered = true;
+ }
+
+ Set<String> membership = ImmutableSet.copyOf(members);
+ if (!membership.equals(this.members)) {
+ groupChangeListener.onGroupChange(members);
+ this.members = membership;
+ }
+ }
+ }
+
+ /**
+ * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
+ * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
+ * prefix is "member_", the node's full path will look something like
+ * {@code /discovery/servicename/member_0000000007}.
+ */
+ public static class DefaultScheme implements NodeScheme {
+ private final String namePrefix;
+ private final Pattern namePattern;
+
+ /**
+ * Creates a sequential node scheme based on the given node name prefix.
+ *
+ * @param namePrefix the prefix for the names of the member nodes
+ */
+ public DefaultScheme(String namePrefix) {
+ this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
+ namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
+ }
+
+ @Override
+ public boolean isMember(String nodeName) {
+ return namePattern.matcher(nodeName).matches();
+ }
+
+ @Override
+ public String createName(byte[] membershipData) {
+ return namePrefix;
+ }
+
+ @Override
+ public boolean isSequential() {
+ return true;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Group " + path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
new file mode 100644
index 0000000..9d31608
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonParseException;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link Codec} that reads and writes {@link ServiceInstance}s as JSON text encoded in UTF-8.
+ * Neither method closes the stream it is given.
+ */
+class JsonCodec implements Codec<ServiceInstance> {
+
+ /** Throws a {@link JsonParseException} naming {@code fieldName} if {@code fieldValue} is null. */
+ private static void assertRequiredField(String fieldName, Object fieldValue) {
+ if (fieldValue == null) {
+ throw new JsonParseException(String.format("Field %s is required", fieldName));
+ }
+ }
+
+ /** JSON shape of an {@link Endpoint}; populated reflectively by Gson when deserializing. */
+ private static class EndpointSchema {
+ private final String host;
+ private final Integer port;
+
+ EndpointSchema(Endpoint endpoint) {
+ host = endpoint.getHost();
+ port = endpoint.getPort();
+ }
+
+ Endpoint asEndpoint() {
+ assertRequiredField("host", host);
+ assertRequiredField("port", port);
+
+ return new Endpoint(host, port);
+ }
+ }
+
+ /** JSON shape of a {@link ServiceInstance}; populated reflectively by Gson when deserializing. */
+ private static class ServiceInstanceSchema {
+ private final EndpointSchema serviceEndpoint;
+ private final Map<String, EndpointSchema> additionalEndpoints;
+ private final Status status;
+ private final @Nullable Integer shard;
+
+ ServiceInstanceSchema(ServiceInstance instance) {
+ serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+ if (instance.isSetAdditionalEndpoints()) {
+ additionalEndpoints =
+ Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
+ } else {
+ additionalEndpoints = ImmutableMap.of();
+ }
+ status = instance.getStatus();
+ shard = instance.isSetShard() ? instance.getShard() : null;
+ }
+
+ ServiceInstance asServiceInstance() {
+ assertRequiredField("serviceEndpoint", serviceEndpoint);
+ assertRequiredField("status", status);
+
+ Map<String, EndpointSchema> extraEndpoints =
+ additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints;
+
+ // Convert eagerly rather than handing the instance a lazy Maps.transformValues view: a view
+ // would postpone the required-field checks in asEndpoint() until the map is first read, so a
+ // malformed endpoint would escape deserialize() as an unchecked JsonParseException instead of
+ // an IOException, and every read would rebuild the Endpoint objects.
+ Map<String, Endpoint> endpoints = Maps.newHashMap();
+ for (Map.Entry<String, EndpointSchema> entry : extraEndpoints.entrySet()) {
+ assertRequiredField("additionalEndpoints." + entry.getKey(), entry.getValue());
+ endpoints.put(entry.getKey(), entry.getValue().asEndpoint());
+ }
+
+ ServiceInstance instance =
+ new ServiceInstance(
+ serviceEndpoint.asEndpoint(),
+ endpoints,
+ status);
+ if (shard != null) {
+ instance.setShard(shard);
+ }
+ return instance;
+ }
+ }
+
+ private static final Charset ENCODING = Charsets.UTF_8;
+
+ private final Gson gson;
+
+ JsonCodec() {
+ this(new Gson());
+ }
+
+ JsonCodec(Gson gson) {
+ this.gson = requireNonNull(gson);
+ }
+
+ /**
+ * Writes {@code instance} to {@code sink} as JSON and flushes; {@code sink} is left open.
+ *
+ * @throws IOException if Gson reports an I/O problem writing the JSON, or flushing fails
+ */
+ @Override
+ public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
+ Writer writer = new OutputStreamWriter(sink, ENCODING);
+ try {
+ gson.toJson(new ServiceInstanceSchema(instance), writer);
+ } catch (JsonIOException e) {
+ throw new IOException(String.format("Problem serializing %s to JSON", instance), e);
+ }
+ writer.flush();
+ }
+
+ /**
+ * Reads one JSON-encoded instance from {@code source}; {@code source} is left open.
+ *
+ * @throws IOException if the input is empty, is not valid JSON, or lacks a required field
+ */
+ @Override
+ public ServiceInstance deserialize(InputStream source) throws IOException {
+ InputStreamReader reader = new InputStreamReader(source, ENCODING);
+ try {
+ @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class);
+ if (schema == null) {
+ throw new IOException("JSON did not include a ServiceInstance object");
+ }
+ return schema.asServiceInstance();
+ } catch (JsonParseException e) {
+ throw new IOException("Problem parsing JSON ServiceInstance.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
new file mode 100644
index 0000000..aeea02d
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+
+/**
+ * A logical set of servers registered in ZooKeeper. Intended to be used by servers in a
+ * common service to advertise their presence to server-set protocol-aware clients.
+ *
+ * <p>Standard implementations should use the {@link #JSON_CODEC} to serialize the service
+ * instance rendezvous data to ZooKeeper so that standard clients can interoperate.
+ */
+public interface ServerSet {
+
+  /**
+   * Encodes a {@link ServiceInstance} as a JSON object.
+   *
+   * <p>This is the default encoding for service instance data in ZooKeeper.
+   */
+  Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints any additional endpoints, keyed by their logical name
+   * @return an {@link EndpointStatus} handle that can later be used to leave the server set
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the server set
+   */
+  EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws JoinException, InterruptedException;
+
+  /**
+   * A handle to an endpoint's membership in the server set, as returned by
+   * {@link ServerSet#join}.
+   */
+  @FunctionalInterface
+  interface EndpointStatus {
+
+    /**
+     * Removes the endpoint from the server set.
+     *
+     * @throws UpdateException if there was a problem leaving the ServerSet.
+     */
+    void leave() throws UpdateException;
+  }
+
+  /**
+   * Indicates an error updating a service's status information.
+   */
+  class UpdateException extends Exception {
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
new file mode 100644
index 0000000..ace4980
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.util.BackoffHelper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}.
+ *
+ * <p>Joining publishes this member's {@link ServiceInstance}, encoded with the configured
+ * {@link Codec}, as a {@link Group} member. Watching decodes the data of every group member and
+ * reports the resulting set of instances to a {@link HostChangeMonitor}.
+ */
+public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> {
+  private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class);
+
+  private final ZooKeeperClient zkClient;
+  private final Group group;
+  private final Codec<ServiceInstance> codec;
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a new ServerSet using open ZooKeeper node ACLs.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
+    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
+  }
+
+  /**
+   * Creates a new ServerSet for the given service {@code path}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it does not already exist
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
+    this(zkClient, group, JSON_CODEC);
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
+   *     from a byte array
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
+    this.zkClient = checkNotNull(zkClient);
+    this.group = checkNotNull(group);
+    this.codec = checkNotNull(codec);
+
+    // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
+    backoffHelper = new BackoffHelper();
+  }
+
+  @VisibleForTesting
+  ZooKeeperClient getZkClient() {
+    return zkClient;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The member data handed to the group is produced by a supplier that encodes the endpoints
+   * with {@link Status#ALIVE}.
+   */
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws Group.JoinException, InterruptedException {
+
+    checkNotNull(endpoint);
+    checkNotNull(additionalEndpoints);
+
+    MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints);
+    Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance;
+    Group.Membership membership = group.join(serviceInstanceSupplier);
+
+    return () -> memberStatus.leave(membership);
+  }
+
+  @Override
+  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
+    try {
+      return serverSetWatcher.watch();
+    } catch (Group.WatchException e) {
+      throw new MonitorException("ZooKeeper watch failed.", e);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for callers, since it is being translated into another exception.
+      Thread.currentThread().interrupt();
+      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
+    }
+  }
+
+  /** Holds the endpoints published by a single {@link #join} call. */
+  private class MemberStatus {
+    private final InetSocketAddress endpoint;
+    private final Map<String, InetSocketAddress> additionalEndpoints;
+
+    private MemberStatus(
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints) {
+
+      this.endpoint = endpoint;
+      this.additionalEndpoints = additionalEndpoints;
+    }
+
+    synchronized void leave(Group.Membership membership) throws UpdateException {
+      try {
+        membership.cancel();
+      } catch (Group.JoinException e) {
+        throw new UpdateException(
+            "Failed to auto-cancel group membership on transition to DEAD status", e);
+      }
+    }
+
+    byte[] serializeServiceInstance() {
+      ServiceInstance serviceInstance = new ServiceInstance(
+          ServerSets.toEndpoint(endpoint),
+          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
+          Status.ALIVE);
+
+      LOG.debug("updating endpoint data to:\n\t{}", serviceInstance);
+      try {
+        return ServerSets.serializeServiceInstance(serviceInstance, codec);
+      } catch (IOException e) {
+        throw new IllegalStateException("Unexpected problem serializing thrift struct "
+            + serviceInstance + " to a byte[]", e);
+      }
+    }
+  }
+
+  private static class ServiceInstanceFetchException extends RuntimeException {
+    ServiceInstanceFetchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  private static class ServiceInstanceDeletedException extends RuntimeException {
+    ServiceInstanceDeletedException(String path) {
+      super(path);
+    }
+  }
+
+  /**
+   * Tracks group membership for one monitor. Member data is cached per member id and the
+   * resulting instance set is pushed to the monitor whenever it changes.
+   */
+  private class ServerSetWatcher {
+    private final ZooKeeperClient zkClient;
+    private final HostChangeMonitor<ServiceInstance> monitor;
+    @Nullable private ImmutableSet<ServiceInstance> serverSet;
+
+    ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
+      this.zkClient = zkClient;
+      this.monitor = monitor;
+    }
+
+    public Command watch() throws Group.WatchException, InterruptedException {
+      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet);
+
+      try {
+        return group.watch(this::notifyGroupChange);
+      } catch (Group.WatchException | InterruptedException e) {
+        // Don't leave the expiration handler registered if the group watch could not be set up.
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      }
+    }
+
+    private ServiceInstance getServiceInstance(final String nodePath) {
+      try {
+        // Returning null below presumably makes BackoffHelper.doUntilResult back off and retry.
+        return backoffHelper.doUntilResult(() -> {
+          try {
+            byte[] data = zkClient.get().getData(nodePath, false, null);
+            return ServerSets.deserializeServiceInstance(data, codec);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ServiceInstanceFetchException(
+                "Interrupted updating service data for: " + nodePath, e);
+          } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+            LOG.warn("Temporary error trying to update service data for: {}", nodePath, e);
+            return null;
+          } catch (NoNodeException e) {
+            invalidateNodePath(nodePath);
+            throw new ServiceInstanceDeletedException(nodePath);
+          } catch (KeeperException e) {
+            if (zkClient.shouldRetry(e)) {
+              LOG.warn("Temporary error trying to update service data for: {}", nodePath, e);
+              return null;
+            } else {
+              throw new ServiceInstanceFetchException(
+                  "Failed to update service data for: " + nodePath, e);
+            }
+          } catch (IOException e) {
+            throw new ServiceInstanceFetchException(
+                "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
+          }
+        });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ServiceInstanceFetchException(
+            "Interrupted trying to update service data for: " + nodePath, e);
+      }
+    }
+
+    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
+        CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
+          @Override public ServiceInstance load(String memberId) {
+            return getServiceInstance(group.getMemberPath(memberId));
+          }
+        });
+
+    // Synchronized (like notifyGroupChange) so the snapshot, invalidate and refresh steps cannot
+    // interleave with a concurrent group change and publish a stale member list.
+    private synchronized void rebuildServerSet() {
+      Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
+      servicesByMemberId.invalidateAll();
+      notifyGroupChange(memberIds);
+    }
+
+    private String invalidateNodePath(String deletedPath) {
+      String memberId = group.getMemberId(deletedPath);
+      servicesByMemberId.invalidate(memberId);
+      return memberId;
+    }
+
+    // Fetches (via the cache) a member's instance; yields null if the member node was deleted.
+    private final Function<String, ServiceInstance> maybeFetchNode =
+        memberId -> {
+          // This get will trigger a fetch
+          try {
+            return servicesByMemberId.getUnchecked(memberId);
+          } catch (UncheckedExecutionException e) {
+            Throwable cause = e.getCause();
+            if (!(cause instanceof ServiceInstanceDeletedException)) {
+              Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
+              throw new IllegalStateException(
+                  "Unexpected error fetching member data for: " + memberId, e);
+            }
+            return null;
+          }
+        };
+
+    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
+      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
+      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
+
+      // Ignore no-op state changes except for the 1st when we've seen no group yet.
+      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
+        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
+        // Implicit removal from servicesByMemberId.
+        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
+
+        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
+            Iterables.transform(newMemberIds, maybeFetchNode), Predicates.notNull());
+
+        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
+      }
+    }
+
+    private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
+      // ZK nodes may have changed if there was a session expiry for a server in the server set, but
+      // if the server's status has not changed, we can skip any onChange updates.
+      if (!currentServerSet.equals(serverSet)) {
+        if (currentServerSet.isEmpty()) {
+          LOG.warn("server set empty for path {}", group.getPath());
+        } else {
+          if (serverSet == null) {
+            LOG.info("received initial membership {}", currentServerSet);
+          } else {
+            logChange(currentServerSet);
+          }
+        }
+        serverSet = currentServerSet;
+        monitor.onChange(serverSet);
+      }
+    }
+
+    private void logChange(ImmutableSet<ServiceInstance> newServerSet) {
+      StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
+      if (serverSet.size() != newServerSet.size()) {
+        message.append("from ").append(serverSet.size())
+            .append(" members to ").append(newServerSet.size());
+      }
+
+      Joiner joiner = Joiner.on("\n\t\t");
+
+      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
+      if (!left.isEmpty()) {
+        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
+      }
+
+      SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
+      if (!joined.isEmpty()) {
+        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
+      }
+
+      LOG.info(message.toString());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
new file mode 100644
index 0000000..01a54a5
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Common {@link ServerSet} related functions.
+ */
+public final class ServerSets {
+
+  private ServerSets() {
+    // Utility class.
+  }
+
+  /**
+   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
+   */
+  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
+      ServerSets::toEndpoint;
+
+  /**
+   * Creates a server set that registers at a single path applying the given ACL to all nodes
+   * created in the path.
+   *
+   * @param zkClient ZooKeeper client to register with; must not be null.
+   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates; validated with
+   *     {@code MorePreconditions.checkNotBlank}.
+   * @param zkPath Path to register at; validated with {@code MorePreconditions.checkNotBlank}.
+   * @return A server set that registers at {@code zkPath}.
+   */
+  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
+    Preconditions.checkNotNull(zkClient);
+    MorePreconditions.checkNotBlank(acl);
+    MorePreconditions.checkNotBlank(zkPath);
+
+    return new ServerSetImpl(zkClient, acl, zkPath);
+  }
+
+  /**
+   * Returns a serialized Thrift service instance object, encoded with the given codec.
+   *
+   * @param serviceInstance the Thrift service instance object to be serialized
+   * @param codec the codec to use to serialize a Thrift service instance object
+   * @return byte array that contains a serialized Thrift service instance
+   * @throws IOException if the codec fails to serialize the instance
+   */
+  public static byte[] serializeServiceInstance(
+      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    codec.serialize(serviceInstance, output);
+    return output.toByteArray();
+  }
+
+  /**
+   * Serializes a service instance built from the given endpoints and status.
+   *
+   * @param address the target address of the service instance
+   * @param additionalEndpoints additional endpoints of the service instance
+   * @param status service status
+   * @param codec the codec to use to serialize the service instance
+   * @return byte array that contains the serialized service instance
+   * @throws IOException if the codec fails to serialize the instance
+   * @see #serializeServiceInstance(ServiceInstance, Codec)
+   */
+  public static byte[] serializeServiceInstance(
+      InetSocketAddress address,
+      Map<String, Endpoint> additionalEndpoints,
+      Status status,
+      Codec<ServiceInstance> codec) throws IOException {
+
+    ServiceInstance serviceInstance =
+        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
+    return serializeServiceInstance(serviceInstance, codec);
+  }
+
+  /**
+   * Creates a service instance object deserialized from a byte array.
+   *
+   * @param data the byte array that contains a serialized Thrift service instance
+   * @param codec the codec to use to deserialize the byte array
+   * @return the deserialized service instance
+   * @throws IOException if the codec fails to deserialize {@code data}
+   */
+  public static ServiceInstance deserializeServiceInstance(
+      byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+    return codec.deserialize(new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Creates an endpoint for the given InetSocketAddress.
+   *
+   * <p>Note that {@link InetSocketAddress#getHostName()} may trigger a reverse name-service
+   * lookup when the address was created from a literal IP address.
+   *
+   * @param address the target address to create the endpoint for
+   * @return an endpoint holding the address's host name and port
+   */
+  public static Endpoint toEndpoint(InetSocketAddress address) {
+    return new Endpoint(address.getHostName(), address.getPort());
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
new file mode 100644
index 0000000..7f962eb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * A service that uses master election to only allow a single service instance to be active amongst
+ * a set of potential servers at a time.
+ */
+public interface SingletonService {
+
+  /**
+   * Indicates an error attempting to lead a group of servers.
+   */
+  class LeadException extends Exception {
+    public LeadException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error attempting to advertise leadership of a group of servers.
+   */
+  class AdvertiseException extends Exception {
+    public AdvertiseException(String message) {
+      super(message);
+    }
+
+    public AdvertiseException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
+   */
+  class LeaveException extends Exception {
+    public LeaveException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Attempts to lead the singleton service.
+   *
+   * @param endpoint The primary endpoint to register as a leader candidate in the service.
+   * @param additionalEndpoints Additional endpoints that are available on the host.
+   * @param listener Handler to call when the candidate is elected or defeated.
+   * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
+   * @throws InterruptedException If the thread watching/joining the group was interrupted.
+   */
+  void lead(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      LeadershipListener listener)
+      throws LeadException, InterruptedException;
+
+  /**
+   * A listener to be notified of changes in the leadership status.
+   * Implementers should be careful to avoid blocking operations in these callbacks.
+   */
+  interface LeadershipListener {
+
+    /**
+     * Notifies the listener that it is the current leader.
+     *
+     * @param control A controller handle to advertise and/or leave advertised presence.
+     */
+    void onLeading(LeaderControl control);
+
+    /**
+     * Notifies the listener that it is no longer leader.
+     */
+    void onDefeated();
+  }
+
+  /**
+   * A controller for the state of the leader. This will be provided to the leader upon election,
+   * which allows the leader to decide when to advertise as leader of the server set and terminate
+   * leadership at will.
+   */
+  interface LeaderControl {
+
+    /**
+     * Advertises the leader's server presence to clients.
+     *
+     * @throws AdvertiseException If there was an error advertising the singleton leader to clients
+     *     of the server set.
+     * @throws InterruptedException If interrupted while advertising.
+     */
+    void advertise() throws AdvertiseException, InterruptedException;
+
+    /**
+     * Leaves candidacy for leadership, removing advertised server presence if applicable.
+     *
+     * @throws LeaveException If the leader's status could not be updated or there was an error
+     *     abdicating server set leadership.
+     */
+    void leave() throws LeaveException;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
new file mode 100644
index 0000000..d9978a9
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * A {@link SingletonService} that competes for leadership through a {@link Candidate} and, once
+ * elected, lets the leader advertise its endpoints in a {@link ServerSet}.
+ */
+public class SingletonServiceImpl implements SingletonService {
+  @VisibleForTesting
+  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+  /**
+   * Creates a candidate that can be combined with an existing server set to form a singleton
+   * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
+   * @return A candidate that can be housed with a standard server set under a single zk path.
+   */
+  public static Candidate createSingletonCandidate(
+      ZooKeeperClient zkClient,
+      String servicePath,
+      Iterable<ACL> acl) {
+
+    // Candidate nodes use a dedicated name prefix, presumably so they can be told apart from
+    // server set member nodes living under the same path.
+    return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
+  }
+
+  private final ServerSet serverSet;
+  private final Candidate candidate;
+
+  /**
+   * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
+   * advertises itself in the given server set once elected.
+   *
+   * @param serverSet The server set to advertise in on election.
+   * @param candidate The candidacy to use to vie for election.
+   */
+  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
+    this.serverSet = Preconditions.checkNotNull(serverSet);
+    this.candidate = Preconditions.checkNotNull(candidate);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>On election the listener receives a {@link LeaderControl} whose {@code advertise()} joins
+   * the server set and whose {@code leave()} removes that advertisement (if any) and abdicates.
+   */
+  @Override
+  public void lead(final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints,
+      final LeadershipListener listener)
+      throws LeadException, InterruptedException {
+
+    Preconditions.checkNotNull(listener);
+
+    try {
+      candidate.offerLeadership(new Leader() {
+        @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
+          listener.onLeading(new LeaderControl() {
+            ServerSet.EndpointStatus endpointStatus = null;
+            final AtomicBoolean left = new AtomicBoolean(false);
+
+            // Methods are synchronized to prevent simultaneous invocations.
+            @Override public synchronized void advertise()
+                throws AdvertiseException, InterruptedException {
+
+              Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
+              Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
+              try {
+                endpointStatus = serverSet.join(endpoint, additionalEndpoints);
+              } catch (JoinException e) {
+                throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
+              }
+            }
+
+            @Override public synchronized void leave() throws LeaveException {
+              Preconditions.checkState(left.compareAndSet(false, true),
+                  "Cannot leave more than once.");
+
+              // leave() may only be called once, so abdication must be attempted even when
+              // removing the advertised endpoint fails; otherwise leadership could never be
+              // given up. The first failure is thrown and a later one is attached as suppressed.
+              LeaveException failure = null;
+              if (endpointStatus != null) {
+                try {
+                  endpointStatus.leave();
+                } catch (ServerSet.UpdateException e) {
+                  failure = new LeaveException("Problem updating endpoint status for abdicating "
+                      + "leader at endpoint " + endpoint, e);
+                }
+              }
+              try {
+                abdicate.execute();
+              } catch (JoinException e) {
+                LeaveException abdicateFailure = new LeaveException(
+                    "Problem abdicating leadership for endpoint " + endpoint, e);
+                if (failure == null) {
+                  failure = abdicateFailure;
+                } else {
+                  failure.addSuppressed(abdicateFailure);
+                }
+              }
+              if (failure != null) {
+                throw failure;
+              }
+            }
+          });
+        }
+
+        @Override public void onDefeated() {
+          listener.onDefeated();
+        }
+      });
+    } catch (JoinException e) {
+      throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
+    } catch (Group.WatchException e) {
+      throw new LeadException("Problem getting initial membership list for leadership group.", e);
+    }
+  }
+}