You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2016/12/01 17:02:32 UTC
[3/4] aurora git commit: Revert removal of twitter/commons/zk based
leadership code
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
new file mode 100644
index 0000000..ce243fb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.InetSocketAddressHelper;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages a connection to a ZooKeeper cluster.
+ */
+public class ZooKeeperClient {
+
+ /**
+ * Indicates an error connecting to a zookeeper cluster.
+ */
+ public class ZooKeeperConnectionException extends Exception {
+ ZooKeeperConnectionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ private final class SessionState {
+ private final long sessionId;
+ private final byte[] sessionPasswd;
+
+ private SessionState(long sessionId, byte[] sessionPasswd) {
+ this.sessionId = sessionId;
+ this.sessionPasswd = sessionPasswd;
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class);
+
+ private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS);
+
+ private final int sessionTimeoutMs;
+ private final Optional<Credentials> credentials;
+ private final String zooKeeperServers;
+ // GuardedBy "this", but still volatile for tests, where we want to be able to see writes
+ // made from within long synchronized blocks.
+ private volatile ZooKeeper zooKeeper;
+ private SessionState sessionState;
+
+ private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
+ private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>();
+
+ private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
+ InetSocketAddress... addresses) {
+ return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build();
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get()}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param zooKeeperServer the first, required ZK server
+ * @param zooKeeperServers any additional servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer,
+ InetSocketAddress... zooKeeperServers) {
+ this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param zooKeeperServers the set of servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
+ Iterable<InetSocketAddress> zooKeeperServers) {
+ this(sessionTimeout, Optional.absent(), Optional.absent(), zooKeeperServers);
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get()}. All successful connections will be authenticated with the given
+ * {@code credentials}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param credentials the credentials to authenticate with
+ * @param zooKeeperServer the first, required ZK server
+ * @param zooKeeperServers any additional servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+ InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) {
+ this(sessionTimeout,
+ Optional.of(credentials),
+ Optional.absent(),
+ combine(zooKeeperServer, zooKeeperServers));
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get}. All successful connections will be authenticated with the given
+ * {@code credentials}.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param credentials the credentials to authenticate with
+ * @param zooKeeperServers the set of servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+ Iterable<InetSocketAddress> zooKeeperServers) {
+ this(sessionTimeout,
+ Optional.of(credentials),
+ Optional.absent(),
+ zooKeeperServers);
+ }
+
+ /**
+ * Creates an unconnected client that will lazily attempt to connect on the first call to
+ * {@link #get}. Successful connections will be authenticated with the given
+ * {@code credentials} when they are present.
+ *
+ * @param sessionTimeout the ZK session timeout
+ * @param credentials the optional credentials to authenticate with
+ * @param chrootPath an optional chroot path
+ * @param zooKeeperServers the set of servers forming the ZK cluster
+ */
+ public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials,
+ Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
+ this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
+ this.credentials = Preconditions.checkNotNull(credentials);
+
+ if (chrootPath.isPresent()) {
+ PathUtils.validatePath(chrootPath.get());
+ }
+
+ Preconditions.checkNotNull(zooKeeperServers);
+ Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
+ "Must present at least 1 ZK server");
+
+ Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ WatchedEvent event = eventQueue.take();
+ for (Watcher watcher : watchers) {
+ watcher.process(event);
+ }
+ } catch (InterruptedException e) { /* ignore */ }
+ }
+ }
+ };
+ watcherProcessor.setDaemon(true);
+ watcherProcessor.start();
+
+ Iterable<String> servers =
+ Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
+ InetSocketAddressHelper::toString);
+ this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
+ }
+
+ /**
+ * Returns the current active ZK connection or establishes a new one if none has yet been
+ * established or a previous connection was disconnected or had its session time out. This method
+ * will attempt to re-use sessions when possible. Equivalent to:
+ * <pre>get(Amount.of(0L, ...))</pre>.
+ *
+ * @return a connected ZooKeeper client
+ * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+ * @throws InterruptedException if interrupted while waiting for a connection to be established
+ */
+ public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
+ try {
+ return get(WAIT_FOREVER);
+ } catch (TimeoutException e) {
+ InterruptedException interruptedException =
+ new InterruptedException("Got an unexpected TimeoutException for 0 wait");
+ interruptedException.initCause(e);
+ throw interruptedException;
+ }
+ }
+
+ /**
+ * Returns the current active ZK connection or establishes a new one if none has yet been
+ * established or a previous connection was disconnected or had its session time out. This
+ * method will attempt to re-use sessions when possible.
+ *
+ * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
+ * cluster to be established; 0 to wait forever
+ * @return a connected ZooKeeper client
+ * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+ * @throws InterruptedException if interrupted while waiting for a connection to be established
+ * @throws TimeoutException if a connection could not be established within the given
+ * {@code connectionTimeout}
+ */
+ public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
+ throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
+
+ if (zooKeeper == null) {
+ final CountDownLatch connected = new CountDownLatch(1);
+ Watcher watcher = event -> {
+ switch (event.getType()) {
+ // Guard the None type since this watch may be used as the default watch on calls by
+ // the client outside our control.
+ case None:
+ switch (event.getState()) {
+ case Expired:
+ LOG.info("Zookeeper session expired. Event: " + event);
+ close();
+ break;
+ case SyncConnected:
+ connected.countDown();
+ break;
+ }
+ }
+
+ eventQueue.offer(event);
+ };
+
+ try {
+ zooKeeper = (sessionState != null)
+ ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
+ sessionState.sessionPasswd)
+ : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
+ } catch (IOException e) {
+ throw new ZooKeeperConnectionException(
+ "Problem connecting to servers: " + zooKeeperServers, e);
+ }
+
+ if (connectionTimeout.getValue() > 0) {
+ if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
+ close();
+ throw new TimeoutException("Timed out waiting for a ZK connection after "
+ + connectionTimeout);
+ }
+ } else {
+ try {
+ connected.await();
+ } catch (InterruptedException ex) {
+ LOG.info("Interrupted while waiting to connect to zooKeeper");
+ close();
+ throw ex;
+ }
+ }
+ if (credentials.isPresent()) {
+ Credentials zkCredentials = credentials.get();
+ zooKeeper.addAuthInfo(zkCredentials.scheme(), zkCredentials.authToken());
+ }
+
+ sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+ }
+ return zooKeeper;
+ }
+
+ /**
+ * Clients that need to re-establish state after session expiration can register an
+ * {@code onExpired} command to execute.
+ *
+ * @param onExpired the {@code Command} to register
+ * @return the new {@link Watcher} which can later be passed to {@link #unregister} for
+ * removal.
+ */
+ public Watcher registerExpirationHandler(final Command onExpired) {
+ Watcher watcher = event -> {
+ if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+ onExpired.execute();
+ }
+ };
+ register(watcher);
+ return watcher;
+ }
+
+ /**
+ * Clients that need to register a top-level {@code Watcher} should do so using this method. The
+ * registered {@code watcher} will remain registered across re-connects and session expiration
+ * events.
+ *
+ * @param watcher the {@code Watcher} to register
+ */
+ public void register(Watcher watcher) {
+ watchers.add(watcher);
+ }
+
+ /**
+ * Clients can attempt to unregister a top-level {@code Watcher} that has previously been
+ * registered.
+ *
+ * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
+ * @return whether the given {@code Watcher} was found and removed from the active set
+ */
+ public boolean unregister(Watcher watcher) {
+ return watchers.remove(watcher);
+ }
+
+ /**
+ * Checks to see if the client might reasonably re-try an operation given the exception thrown
+ * while attempting it. If the ZooKeeper session should be expired to enable the re-try to
+ * succeed this method will expire it as a side-effect.
+ *
+ * @param e the exception to test
+ * @return true if a retry can be attempted
+ */
+ public boolean shouldRetry(KeeperException e) {
+ if (e instanceof SessionExpiredException) {
+ close();
+ }
+ return ZooKeeperUtils.isRetryable(e);
+ }
+
+ /**
+ * Closes the current connection, if any, expiring the current ZooKeeper session. Any subsequent
+ * calls to this method will no-op until the next successful {@link #get}.
+ */
+ public synchronized void close() {
+ if (zooKeeper != null) {
+ try {
+ zooKeeper.close();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted trying to close zooKeeper");
+ } finally {
+ zooKeeper = null;
+ sessionState = null;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ synchronized boolean isClosed() {
+ return zooKeeper == null;
+ }
+
+ @VisibleForTesting
+ ZooKeeper getZooKeeperClientForTests() {
+ return zooKeeper;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
new file mode 100644
index 0000000..2ada264
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for dealing with ZooKeeper.
+ */
+public final class ZooKeeperUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
+
+ /**
+ * An appropriate default session timeout for Twitter ZooKeeper clusters.
+ */
+ public static final Amount<Integer,Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS);
+
+ /**
+ * The magic version number that allows any mutation to always succeed regardless of actual
+ * version number.
+ */
+ public static final int ANY_VERSION = -1;
+
+ /**
+ * An ACL that gives all permissions to any user, authenticated or not.
+ */
+ public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
+ ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
+
+ /**
+ * An ACL that gives all permissions to node creators and read permissions only to everyone else.
+ */
+ public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
+ ImmutableList.<ACL>builder()
+ .addAll(Ids.CREATOR_ALL_ACL)
+ .addAll(Ids.READ_ACL_UNSAFE)
+ .build();
+
+ /**
+ * Returns true if the given exception indicates an error that can be resolved by retrying the
+ * operation without modification.
+ *
+ * @param e the exception to check
+ * @return true if the causing operation is strictly retryable
+ */
+ public static boolean isRetryable(KeeperException e) {
+ Preconditions.checkNotNull(e);
+
+ switch (e.code()) {
+ case CONNECTIONLOSS:
+ case SESSIONEXPIRED:
+ case SESSIONMOVED:
+ case OPERATIONTIMEOUT:
+ return true;
+
+ case RUNTIMEINCONSISTENCY:
+ case DATAINCONSISTENCY:
+ case MARSHALLINGERROR:
+ case BADARGUMENTS:
+ case NONODE:
+ case NOAUTH:
+ case BADVERSION:
+ case NOCHILDRENFOREPHEMERALS:
+ case NODEEXISTS:
+ case NOTEMPTY:
+ case INVALIDCALLBACK:
+ case INVALIDACL:
+ case AUTHFAILED:
+ case UNIMPLEMENTED:
+
+ // These two should not be encountered - they are used internally by ZK to specify ranges
+ case SYSTEMERROR:
+ case APIERROR:
+
+ case OK: // This is actually an invalid ZK exception code
+
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}. If the
+ * path already exists, nothing is done; however if any portion of the path is missing, it will be
+ * created with the given {@code acl} as a persistent zookeeper node. The given {@code path} must
+ * be a valid zookeeper absolute path.
+ *
+ * @param zkClient the client to use to access the ZK cluster
+ * @param acl the acl to use if creating path nodes
+ * @param path the path to ensure exists
+ * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
+ * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
+ * @throws KeeperException if there was a problem in ZK
+ */
+ public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
+ throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+ Preconditions.checkNotNull(zkClient);
+ Preconditions.checkNotNull(path);
+ Preconditions.checkArgument(path.startsWith("/"));
+
+ ensurePathInternal(zkClient, acl, path);
+ }
+
+ private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
+ throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+ if (zkClient.get().exists(path, false) == null) {
+ // The current path does not exist; so back up a level and ensure the parent path exists
+ // unless we're already a root-level path.
+ int lastPathIndex = path.lastIndexOf('/');
+ if (lastPathIndex > 0) {
+ ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex));
+ }
+
+ // We've ensured our parent path (if any) exists so we can proceed to create our path.
+ try {
+ zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException e) {
+ // This ensures we don't die if a race condition was met between checking existence and
+ // trying to create the node.
+ LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
+ }
+ }
+ }
+
+ /**
+ * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and
+ * never ends with a slash (except for root path).
+ *
+ * @param path the path to be normalized
+ * @return normalized path string
+ */
+ public static String normalizePath(String path) {
+ String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1");
+ PathUtils.validatePath(normalizedPath);
+ return normalizedPath;
+ }
+
+ private ZooKeeperUtils() {
+ // utility
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
new file mode 100644
index 0000000..ba09279
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+
+/**
+ * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient.
+ */
+public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest {
+
+ private final Amount<Integer, Time> defaultSessionTimeout;
+
+ /**
+ * Creates a test case where the test server uses its
+ * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit
+ * session timeout.
+ */
+ public BaseZooKeeperClientTest() {
+ this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
+ }
+
+ /**
+ * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for
+ * clients created without an explicit session timeout.
+ */
+ public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) {
+ this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout);
+ }
+
+
+ /**
+ * Starts zookeeper back up on the last used port.
+ */
+ protected final void restartNetwork() throws IOException, InterruptedException {
+ getServer().restartNetwork();
+ }
+
+ /**
+ * Shuts down the in-process zookeeper network server.
+ */
+ protected final void shutdownNetwork() {
+ getServer().shutdownNetwork();
+ }
+
+ /**
+ * Expires the active session for the given client. The client should be one returned from
+ * {@link #createZkClient}.
+ *
+ * @param zkClient the client to expire
+ * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to
+ * the local zk server while trying to expire the session
+ * @throws InterruptedException if interrupted while requesting expiration
+ */
+ protected final void expireSession(ZooKeeperClient zkClient)
+ throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException {
+ getServer().expireClientSession(zkClient.get().getSessionId());
+ }
+
+ /**
+ * Returns the current port to connect to the in-process zookeeper instance.
+ */
+ protected final int getPort() {
+ return getServer().getPort();
+ }
+
+ /**
+ * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+ * with the default session timeout.
+ */
+ protected final ZooKeeperClient createZkClient() {
+ return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent());
+ }
+
+ /**
+ * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+ * the default session timeout.
+ */
+ protected final ZooKeeperClient createZkClient(Credentials credentials) {
+ return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent());
+ }
+
+ /**
+ * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
+ * the default session timeout. The client is authenticated in the digest authentication scheme
+ * with the given {@code username} and {@code password}.
+ */
+ protected final ZooKeeperClient createZkClient(String username, String password) {
+ return createZkClient(Credentials.digestCredentials(username, password));
+ }
+
+ /**
+ * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+ * with a custom {@code sessionTimeout}.
+ */
+ protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) {
+ return createZkClient(sessionTimeout, Optional.absent(), Optional.absent());
+ }
+
+ /**
+ * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+ * with the default session timeout and the custom chroot path.
+ */
+ protected final ZooKeeperClient createZkClient(String chrootPath) {
+ return createZkClient(defaultSessionTimeout, Optional.absent(),
+ Optional.of(chrootPath));
+ }
+
+ private ZooKeeperClient createZkClient(
+ Amount<Integer, Time> sessionTimeout,
+ Optional<Credentials> credentials,
+ Optional<String> chrootPath) {
+
+ ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath,
+ ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort())));
+ addTearDown(client::close);
+ return client;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
new file mode 100644
index 0000000..0e68987
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * A base-class for in-process zookeeper tests.
+ */
+public abstract class BaseZooKeeperTest extends TearDownTestCase {
+
+ private ZooKeeperTestServer zkTestServer;
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Before
+ public final void setUp() throws Exception {
+ zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder());
+ addTearDown(zkTestServer::stop);
+ zkTestServer.startNetwork();
+ }
+
+ /**
+ * Returns the running in-process ZooKeeper server.
+ *
+ * @return The in-process ZooKeeper server.
+ */
+ protected final ZooKeeperTestServer getServer() {
+ return zkTestServer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
new file mode 100644
index 0000000..50acaeb
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * A helper class for starting in-process ZooKeeper server and clients.
+ *
+ * <p>This is ONLY meant to be used for testing.
+ */
+public class ZooKeeperTestServer {
+
+ static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS);
+
+ private final File dataDir;
+ private final File snapDir;
+
+ private ZooKeeperServer zooKeeperServer;
+ private ServerCnxnFactory connectionFactory;
+ private int port;
+
+ public ZooKeeperTestServer(File dataDir, File snapDir) {
+ this.dataDir = Preconditions.checkNotNull(dataDir);
+ this.snapDir = Preconditions.checkNotNull(snapDir);
+ }
+
+ /**
+ * Starts zookeeper up on an ephemeral port.
+ */
+ public void startNetwork() throws IOException, InterruptedException {
+ zooKeeperServer =
+ new ZooKeeperServer(
+ new FileTxnSnapLog(dataDir, snapDir),
+ new BasicDataTreeBuilder()) {
+
+ // TODO(John Sirois): Introduce a builder to configure the in-process server if and when
+ // some folks need JMX for in-process tests.
+ @Override protected void registerJMX() {
+ // noop
+ }
+ };
+
+ connectionFactory = new NIOServerCnxnFactory();
+ connectionFactory.configure(
+ new InetSocketAddress(port),
+ 60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
+ connectionFactory.startup(zooKeeperServer);
+ port = zooKeeperServer.getClientPort();
+ }
+
+ /**
+ * Stops the zookeeper server.
+ */
+ public void stop() {
+ shutdownNetwork();
+ }
+
+ /**
+ * Starts zookeeper back up on the last used port.
+ */
+ final void restartNetwork() throws IOException, InterruptedException {
+ checkEphemeralPortAssigned();
+ Preconditions.checkState(connectionFactory == null);
+ startNetwork();
+ }
+
+ /**
+ * Shuts down the in-process zookeeper network server.
+ */
+ final void shutdownNetwork() {
+ if (connectionFactory != null) {
+ connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
+ connectionFactory = null;
+ }
+ }
+
+ /**
+ * Expires the client session with the given {@code sessionId}.
+ *
+ * @param sessionId The id of the client session to expire.
+ */
+ public final void expireClientSession(long sessionId) {
+ zooKeeperServer.closeSession(sessionId);
+ }
+
+ /**
+ * Returns the current port to connect to the in-process zookeeper instance.
+ */
+ public final int getPort() {
+ checkEphemeralPortAssigned();
+ return port;
+ }
+
+ private void checkEphemeralPortAssigned() {
+ Preconditions.checkState(port > 0, "startNetwork must be called first");
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
new file mode 100644
index 0000000..9c0cebe
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class CandidateImplTest extends BaseZooKeeperClientTest {
+ private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
+ private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
+
+ private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
+
+ @Before
+ public void mySetUp() throws IOException {
+ candidateBuffer = new LinkedBlockingDeque<>();
+ }
+
+ private Group createGroup(ZooKeeperClient zkClient) throws IOException {
+ return new Group(zkClient, ACL, SERVICE);
+ }
+
+ /**
+ * A {@link Candidate.Leader} that records each election of its candidate in
+ * {@code candidateBuffer} and lets the test abdicate or wait for defeat.
+ */
+ private class Reign implements Candidate.Leader {
+ // Written in onElected and read in abdicate(), which may run on different threads.
+ private volatile ExceptionalCommand<Group.JoinException> abdicate;
+ private final CandidateImpl candidate;
+ private final String id;
+ private final CountDownLatch defeated = new CountDownLatch(1);
+
+ Reign(String id, CandidateImpl candidate) {
+ this.id = id;
+ this.candidate = candidate;
+ }
+
+ @Override
+ public void onElected(ExceptionalCommand<Group.JoinException> abdicate) {
+ // Store the command before publishing the candidate: a test thread that has taken the
+ // candidate from the buffer must never observe a missing abdicate command.
+ this.abdicate = abdicate;
+ candidateBuffer.offerFirst(candidate);
+ }
+
+ @Override
+ public void onDefeated() {
+ defeated.countDown();
+ }
+
+ /**
+ * Runs the abdicate command handed over at election time.
+ *
+ * @throws IllegalStateException if this reign has not been elected yet
+ * @throws Group.JoinException if the abdicate command fails
+ */
+ public void abdicate() throws Group.JoinException {
+ ExceptionalCommand<Group.JoinException> command = abdicate;
+ Preconditions.checkState(command != null, "onElected has not been called yet");
+ command.execute();
+ }
+
+ /** Blocks until {@link #onDefeated()} has been called. */
+ public void expectDefeated() throws InterruptedException {
+ defeated.await();
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+ }
+
+ @Test
+ public void testOfferLeadership() throws Exception {
+ ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+ final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
+ @Override public String toString() {
+ return "Leader1";
+ }
+ };
+ ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
+ final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
+ @Override public String toString() {
+ return "Leader2";
+ }
+ };
+ ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
+ final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
+ @Override public String toString() {
+ return "Leader3";
+ }
+ };
+
+ Reign candidate1Reign = new Reign("1", candidate1);
+ Reign candidate2Reign = new Reign("2", candidate2);
+ Reign candidate3Reign = new Reign("3", candidate3);
+
+ Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
+ Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
+ Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
+
+ assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
+ candidate1Leader.get());
+
+ shutdownNetwork();
+ restartNetwork();
+
+ assertTrue("A re-connect without a session expiration should leave the leader elected",
+ candidate1Leader.get());
+
+ candidate1Reign.abdicate();
+ assertSame(candidate1, candidateBuffer.takeLast());
+ assertFalse(candidate1Leader.get());
+ // Active abdication should trigger defeat.
+ candidate1Reign.expectDefeated();
+
+ CandidateImpl secondCandidate = candidateBuffer.takeLast();
+ assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
+ + candidateBuffer,
+ candidate2Leader.get() ^ candidate3Leader.get());
+
+ if (secondCandidate == candidate2) {
+ expireSession(zkClient2);
+ assertSame(candidate3, candidateBuffer.takeLast());
+ assertTrue(candidate3Leader.get());
+ // Passive expiration should trigger defeat.
+ candidate2Reign.expectDefeated();
+ } else {
+ expireSession(zkClient3);
+ assertSame(candidate2, candidateBuffer.takeLast());
+ assertTrue(candidate2Leader.get());
+ // Passive expiration should trigger defeat.
+ candidate3Reign.expectDefeated();
+ }
+ }
+
+ @Test
+ public void testEmptyMembership() throws Exception {
+ ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+ final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
+ Reign candidate1Reign = new Reign("1", candidate1);
+
+ candidate1.offerLeadership(candidate1Reign);
+ assertSame(candidate1, candidateBuffer.takeLast());
+ candidate1Reign.abdicate();
+ assertFalse(candidate1.getLeaderData().isPresent());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
new file mode 100644
index 0000000..97a42d1
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.NodeScheme;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+public class GroupTest extends BaseZooKeeperClientTest {
+
+ private ZooKeeperClient zkClient;
+ private Group joinGroup;
+ private Group watchGroup;
+ private Command stopWatching;
+ private Command onLoseMembership;
+
+ private RecordingListener listener;
+
+ public GroupTest() {
+ super(Amount.of(1, Time.DAYS));
+ }
+
+ @Before
+ public void mySetUp() throws Exception {
+ onLoseMembership = createMock(Command.class);
+
+ zkClient = createZkClient("group", "test");
+ joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+ watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+
+ listener = new RecordingListener();
+ stopWatching = watchGroup.watch(listener);
+ }
+
+ /**
+ * A {@link GroupChangeListener} that queues every membership notification so tests can
+ * consume them in order.
+ */
+ private static class RecordingListener implements GroupChangeListener {
+ private final LinkedBlockingQueue<Iterable<String>> changes = new LinkedBlockingQueue<>();
+
+ @Override
+ public void onGroupChange(Iterable<String> memberIds) {
+ changes.add(memberIds);
+ }
+
+ /** Blocks until the next recorded membership change is available and returns it. */
+ public Iterable<String> take() throws InterruptedException {
+ return changes.take();
+ }
+
+ /** Asserts that no recorded membership changes are waiting to be consumed. */
+ public void assertEmpty() {
+ ImmutableList<Iterable<String>> pending = ImmutableList.copyOf(changes);
+ assertEquals(ImmutableList.<Iterable<String>>of(), pending);
+ }
+
+ @Override
+ public String toString() {
+ return changes.toString();
+ }
+ }
+
+ private static class CustomScheme implements NodeScheme {
+ static final String NODE_NAME = "custom_name";
+
+ @Override
+ public boolean isMember(String nodeName) {
+ return NODE_NAME.equals(nodeName);
+ }
+
+ @Override
+ public String createName(byte[] membershipData) {
+ return NODE_NAME;
+ }
+
+ @Override
+ public boolean isSequential() {
+ return false;
+ }
+ }
+
+ @Test
+ public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
+ final CountDownLatch lostMembership = new CountDownLatch(1);
+ Command onLoseMembership = lostMembership::countDown;
+ assertEmptyMembershipObserved();
+
+ Membership membership = joinGroup.join(onLoseMembership);
+ assertMembershipObserved(membership.getMemberId());
+ expireSession(zkClient);
+
+ lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+ }
+
+ @Test
+ public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
+ final CountDownLatch lostMembership = new CountDownLatch(1);
+ Command onLoseMembership = lostMembership::countDown;
+ assertEmptyMembershipObserved();
+
+ Membership membership = joinGroup.join(onLoseMembership);
+ assertMembershipObserved(membership.getMemberId());
+ membership.cancel();
+
+ lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+ }
+
+ @Test
+ public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
+ replay(onLoseMembership);
+
+ assertEmptyMembershipObserved();
+
+ Membership membership = joinGroup.join();
+ String originalMemberId = membership.getMemberId();
+ assertMembershipObserved(originalMemberId);
+
+ shutdownNetwork();
+ restartNetwork();
+
+ // The member should still be present under existing ephemeral node since session did not
+ // expire.
+ watchGroup.watch(listener);
+ assertMembershipObserved(originalMemberId);
+
+ membership.cancel();
+
+ assertEmptyMembershipObserved();
+ assertEmptyMembershipObserved(); // and again for 2nd listener
+
+ listener.assertEmpty();
+
+ verify(onLoseMembership);
+ reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+ }
+
+ /**
+ * Verifies that expiring the session invokes the {@code onLoseMembership} mock and that the
+ * watcher then observes the member re-joined under a different member id.
+ */
+ @Test
+ public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
+ // Record phase: expect onLoseMembership to be executed before switching the mock to replay.
+ onLoseMembership.execute();
+ replay(onLoseMembership);
+
+ assertEmptyMembershipObserved();
+
+ Membership membership = joinGroup.join(onLoseMembership);
+ String originalMemberId = membership.getMemberId();
+ assertMembershipObserved(originalMemberId);
+
+ expireSession(zkClient);
+
+ // We should have lost our group membership and then re-gained it with a new ephemeral node.
+ // We may or may not see the intermediate (empty) state change, but we must see the final
+ // state.
+ Iterable<String> members = listener.take();
+ if (Iterables.isEmpty(members)) {
+ members = listener.take();
+ }
+ assertEquals(1, Iterables.size(members));
+ assertNotEquals(originalMemberId, Iterables.getOnlyElement(members));
+ assertNotEquals(originalMemberId, membership.getMemberId());
+
+ listener.assertEmpty();
+
+ verify(onLoseMembership);
+ reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+ }
+
+ @Test
+ public void testJoinCustomNamingScheme() throws Exception {
+ Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
+ new CustomScheme());
+
+ listener = new RecordingListener();
+ group.watch(listener);
+ assertEmptyMembershipObserved();
+
+ Membership membership = group.join();
+ String memberId = membership.getMemberId();
+
+ assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
+ assertMembershipObserved(memberId);
+
+ expireSession(zkClient);
+ }
+
+ @Test
+ public void testUpdateMembershipData() throws Exception {
+ Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
+
+ byte[] initial = "start".getBytes();
+ expect(dataSupplier.get()).andReturn(initial);
+
+ byte[] second = "update".getBytes();
+ expect(dataSupplier.get()).andReturn(second);
+
+ replay(dataSupplier);
+
+ Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
+ assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
+ .getData(membership.getMemberPath(), false, null));
+
+ assertArrayEquals("Updating supplier should not change membership data",
+ initial, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+ membership.updateMemberData();
+ assertArrayEquals("Updating membership should change data",
+ second, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+ verify(dataSupplier);
+ }
+
+ @Test
+ public void testAcls() throws Exception {
+ Group securedMembership =
+ new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
+ "/secured/group/membership");
+
+ String memberId = securedMembership.join().getMemberId();
+
+ Group unauthenticatedObserver =
+ new Group(createZkClient(),
+ Ids.READ_ACL_UNSAFE,
+ "/secured/group/membership");
+ RecordingListener unauthenticatedListener = new RecordingListener();
+ unauthenticatedObserver.watch(unauthenticatedListener);
+
+ assertMembershipObserved(unauthenticatedListener, memberId);
+
+ try {
+ unauthenticatedObserver.join();
+ fail("Expected join exception for unauthenticated observer");
+ } catch (JoinException e) {
+ // expected
+ }
+
+ Group unauthorizedObserver =
+ new Group(createZkClient("joe", "schmoe"),
+ Ids.READ_ACL_UNSAFE,
+ "/secured/group/membership");
+ RecordingListener unauthorizedListener = new RecordingListener();
+ unauthorizedObserver.watch(unauthorizedListener);
+
+ assertMembershipObserved(unauthorizedListener, memberId);
+
+ try {
+ unauthorizedObserver.join();
+ fail("Expected join exception for unauthorized observer");
+ } catch (JoinException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testStopWatching() throws Exception {
+ replay(onLoseMembership);
+
+ assertEmptyMembershipObserved();
+
+ Membership member1 = joinGroup.join();
+ String memberId1 = member1.getMemberId();
+ assertMembershipObserved(memberId1);
+
+ Membership member2 = joinGroup.join();
+ String memberId2 = member2.getMemberId();
+ assertMembershipObserved(memberId1, memberId2);
+
+ stopWatching.execute();
+
+ member1.cancel();
+ Membership member3 = joinGroup.join();
+ member2.cancel();
+ member3.cancel();
+
+ listener.assertEmpty();
+ }
+
+ private void assertEmptyMembershipObserved() throws InterruptedException {
+ assertMembershipObserved();
+ }
+
+ private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
+ assertMembershipObserved(listener, expectedMemberIds);
+ }
+
+ private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
+ throws InterruptedException {
+
+ assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
new file mode 100644
index 0000000..2166123
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Gson.class)
+public class JsonCodecTest {
+
+ private static final Codec<ServiceInstance> STANDARD_JSON_CODEC = new JsonCodec();
+
+ @Test
+ public void testJsonCodecRoundtrip() throws Exception {
+ Codec<ServiceInstance> codec = STANDARD_JSON_CODEC;
+ ServiceInstance instance1 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http", new Endpoint("foo", 8080)),
+ Status.ALIVE)
+ .setShard(0);
+ byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+ ServiceInstance instance2 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+ Status.ALIVE);
+ data = ServerSets.serializeServiceInstance(instance2, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+ ServiceInstance instance3 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.<String, Endpoint>of(),
+ Status.ALIVE);
+ data = ServerSets.serializeServiceInstance(instance3, codec);
+ assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+ }
+
+ /**
+ * Pins the exact JSON text produced by the standard codec for a fully populated instance.
+ */
+ @Test
+ public void testJsonCompatibility() throws IOException {
+ ServiceInstance instance = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http", new Endpoint("foo", 8080)),
+ Status.ALIVE).setShard(42);
+
+ ByteArrayOutputStream results = new ByteArrayOutputStream();
+ STANDARD_JSON_CODEC.serialize(instance, results);
+ assertEquals(
+ "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
+ + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
+ + "\"status\":\"ALIVE\","
+ + "\"shard\":42}",
+ // Decode with an explicit charset; the no-arg toString() uses the platform default.
+ results.toString(Charsets.UTF_8.name()));
+ }
+
+ @Test
+ public void testInvalidSerialize() {
+ // Gson is final so we need to call on PowerMock here.
+ Gson gson = PowerMock.createMock(Gson.class);
+ gson.toJson(EasyMock.isA(Object.class), EasyMock.isA(Appendable.class));
+ EasyMock.expectLastCall().andThrow(new JsonIOException("error"));
+ PowerMock.replay(gson);
+
+ ServiceInstance instance =
+ new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
+
+ try {
+ new JsonCodec(gson).serialize(instance, new ByteArrayOutputStream());
+ fail();
+ } catch (IOException e) {
+ // Expected.
+ }
+
+ PowerMock.verify(gson);
+ }
+
+ @Test
+ public void testDeserializeMinimal() throws IOException {
+ String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}";
+ ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8));
+ ServiceInstance actual = STANDARD_JSON_CODEC.deserialize(source);
+ ServiceInstance expected =
+ new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testInvalidDeserialize() {
+ // Not JSON.
+ assertInvalidDeserialize(new byte[] {0xC, 0xA, 0xF, 0xE});
+
+ // No JSON object.
+ assertInvalidDeserialize("");
+ assertInvalidDeserialize("[]");
+
+ // Missing required fields.
+ assertInvalidDeserialize("{}");
+ assertInvalidDeserialize("{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}}");
+ assertInvalidDeserialize("{\"status\":\"ALIVE\"}");
+ }
+
+ private void assertInvalidDeserialize(String data) {
+ assertInvalidDeserialize(data.getBytes(Charsets.UTF_8));
+ }
+
+ private void assertInvalidDeserialize(byte[] data) {
+ try {
+ STANDARD_JSON_CODEC.deserialize(new ByteArrayInputStream(data));
+ fail();
+ } catch (IOException e) {
+ // Expected.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
new file mode 100644
index 0000000..f0c0cb4
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ *
+ * TODO(William Farner): Change this to remove thrift dependency.
+ */
+public class ServerSetImplTest extends BaseZooKeeperClientTest {
+ private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ private static final String SERVICE = "/twitter/services/puffin_hosebird";
+
+ private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer;
+ private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor;
+
+ @Before
+ public void mySetUp() throws IOException {
+ serverSetBuffer = new LinkedBlockingQueue<>();
+ serverSetMonitor = serverSetBuffer::offer;
+ }
+
+ private ServerSetImpl createServerSet() throws IOException {
+ return new ServerSetImpl(createZkClient(), ACL, SERVICE);
+ }
+
+ @Test
+ public void testLifecycle() throws Exception {
+ ServerSetImpl client = createServerSet();
+ client.watch(serverSetMonitor);
+ assertChangeFiredEmpty();
+
+ ServerSetImpl server = createServerSet();
+ ServerSet.EndpointStatus status = server.join(
+ InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080));
+
+ ServiceInstance serviceInstance = new ServiceInstance(
+ new Endpoint("foo", 1234),
+ ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
+ Status.ALIVE);
+
+ assertChangeFired(serviceInstance);
+
+ status.leave();
+ assertChangeFiredEmpty();
+ assertTrue(serverSetBuffer.isEmpty());
+ }
+
+ /**
+ * Walks a watching client through a series of joins and leaves, including one expiration of
+ * the watcher's own session, and checks each membership notification in order.
+ */
+ @Test
+ public void testMembershipChanges() throws Exception {
+ ServerSetImpl client = createServerSet();
+ client.watch(serverSetMonitor);
+ assertChangeFiredEmpty();
+
+ ServerSetImpl server = createServerSet();
+
+ ServerSet.EndpointStatus foo = join(server, "foo");
+ assertChangeFired("foo");
+
+ expireSession(client.getZkClient());
+
+ ServerSet.EndpointStatus bar = join(server, "bar");
+
+ // We should've auto re-monitored membership after the expiration. We should not have been
+ // notified of "foo" alone, since that was not a change; only of "foo", "bar", since this was
+ // an addition.
+ assertChangeFired("foo", "bar");
+
+ foo.leave();
+ assertChangeFired("bar");
+
+ ServerSet.EndpointStatus baz = join(server, "baz");
+ assertChangeFired("bar", "baz");
+
+ baz.leave();
+ assertChangeFired("bar");
+
+ bar.leave();
+ assertChangeFiredEmpty();
+
+ assertTrue(serverSetBuffer.isEmpty());
+ }
+
+ /**
+ * Checks that no further membership notifications are queued once the command returned by
+ * {@code watch} has been executed.
+ */
+ @Test
+ public void testStopMonitoring() throws Exception {
+ ServerSetImpl client = createServerSet();
+ Command stopMonitoring = client.watch(serverSetMonitor);
+ assertChangeFiredEmpty();
+
+ ServerSetImpl server = createServerSet();
+
+ ServerSet.EndpointStatus foo = join(server, "foo");
+ assertChangeFired("foo");
+ ServerSet.EndpointStatus bar = join(server, "bar");
+ assertChangeFired("foo", "bar");
+
+ stopMonitoring.execute();
+
+ // No new updates should be received since monitoring has stopped.
+ foo.leave();
+ assertTrue(serverSetBuffer.isEmpty());
+
+ // Expiration event.
+ // NOTE(review): nothing here actually expires a session, so this assertion only repeats the
+ // check above; confirm whether an expireSession(...) call was intended.
+ assertTrue(serverSetBuffer.isEmpty());
+ }
+
+ @Test
+ public void testOrdering() throws Exception {
+ ServerSetImpl client = createServerSet();
+ client.watch(serverSetMonitor);
+ assertChangeFiredEmpty();
+
+ Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080);
+ Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081);
+ Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082);
+
+ ServerSetImpl server1 = createServerSet();
+ ServerSetImpl server2 = createServerSet();
+ ServerSetImpl server3 = createServerSet();
+
+ ServiceInstance instance1 = new ServiceInstance(
+ new Endpoint("foo", 1000),
+ ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+ Status.ALIVE);
+ ServiceInstance instance2 = new ServiceInstance(
+ new Endpoint("foo", 1001),
+ ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
+ Status.ALIVE);
+ ServiceInstance instance3 = new ServiceInstance(
+ new Endpoint("foo", 1002),
+ ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
+ Status.ALIVE);
+
+ server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports);
+ assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
+
+ ServerSet.EndpointStatus status2 = server2.join(
+ InetSocketAddress.createUnresolved("foo", 1001),
+ server2Ports);
+ assertEquals(ImmutableList.of(instance1, instance2),
+ ImmutableList.copyOf(serverSetBuffer.take()));
+
+ server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports);
+ assertEquals(ImmutableList.of(instance1, instance2, instance3),
+ ImmutableList.copyOf(serverSetBuffer.take()));
+
+ status2.leave();
+ assertEquals(ImmutableList.of(instance1, instance3),
+ ImmutableList.copyOf(serverSetBuffer.take()));
+ }
+
+ /**
+ * Checks that when {@code ZooKeeperClient.get()} fails while setting up a watch, the
+ * expiration handler that was registered is unregistered again and a
+ * {@code MonitorException} is thrown.
+ */
+ @Test
+ public void testUnwatchOnException() throws Exception {
+ IMocksControl control = createControl();
+
+ ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class);
+ Watcher onExpirationWatcher = control.createMock(Watcher.class);
+
+ expect(zkClient.registerExpirationHandler(anyObject(Command.class)))
+ .andReturn(onExpirationWatcher);
+
+ expect(zkClient.get()).andThrow(new InterruptedException()); // See interrupted() note below.
+ expect(zkClient.unregister(onExpirationWatcher)).andReturn(true);
+ control.replay();
+
+ Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla");
+ ServerSetImpl serverset = new ServerSetImpl(zkClient, group);
+
+ try {
+ serverset.watch(hostSet -> {});
+ fail("Expected MonitorException");
+ } catch (DynamicHostSet.MonitorException e) {
+ // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is.
+ // That call both returns the current interrupted status as well as clearing it. The clearing
+ // is crucial depending on the order tests are run in this class. If this test runs before
+ // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail
+ // executing `ZooKeeperClient.get` which internally blocks on a sync-point that takes part in
+ // the interruption mechanism and so immediately throws `InterruptedException` based on the
+ // un-cleared interrupted bit.
+ assertTrue(Thread.interrupted());
+ }
+ control.verify();
+ }
+
+ private static Map<String, InetSocketAddress> makePortMap(String name, int port) {
+ return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port));
+ }
+
+ private ServerSet.EndpointStatus join(ServerSet serverSet, String host)
+ throws JoinException, InterruptedException {
+
+ return serverSet.join(
+ InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of());
+ }
+
+ private void assertChangeFired(String... serviceHosts)
+ throws InterruptedException {
+
+ assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts),
+ serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42),
+ ImmutableMap.<String, Endpoint>of(), Status.ALIVE))));
+ }
+
+ protected void assertChangeFiredEmpty() throws InterruptedException {
+ assertChangeFired(ImmutableSet.<ServiceInstance>of());
+ }
+
+ protected void assertChangeFired(ServiceInstance... serviceInstances)
+ throws InterruptedException {
+ assertChangeFired(ImmutableSet.copyOf(serviceInstances));
+ }
+
+ protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances)
+ throws InterruptedException {
+ assertEquals(serviceInstances, serverSetBuffer.take());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
new file mode 100644
index 0000000..0e67191
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Checks that a service instance survives a serialize/deserialize round trip. */
+public class ServerSetsTest {
+  @Test
+  public void testSimpleSerialization() throws Exception {
+    InetSocketAddress address = new InetSocketAddress(12345);
+    Map<String, Endpoint> noExtraEndpoints = ImmutableMap.of();
+    Status aliveStatus = Status.ALIVE;
+
+    // Encode with the JSON codec, then decode the resulting bytes with the same codec.
+    byte[] serialized = ServerSets.serializeServiceInstance(
+        address, noExtraEndpoints, aliveStatus, ServerSet.JSON_CODEC);
+    ServiceInstance decoded =
+        ServerSets.deserializeServiceInstance(serialized, ServerSet.JSON_CODEC);
+
+    assertEquals(address.getPort(), decoded.getServiceEndpoint().getPort());
+    assertEquals(noExtraEndpoints, decoded.getAdditionalEndpoints());
+    assertEquals(Status.ALIVE, decoded.getStatus());
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
new file mode 100644
index 0000000..5f6cdd8
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.fail;
+
+/**
+ * Exercises {@link SingletonServiceImpl} against mocked {@link ServerSet} and {@link Candidate}
+ * collaborators, driving leadership through captured {@link Leader} and {@link LeaderControl}
+ * callbacks.
+ */
+public class SingletonServiceImplTest extends BaseZooKeeperClientTest {
+  private static final int PORT_A = 1234;
+  private static final int PORT_B = 8080;
+  private static final InetSocketAddress PRIMARY_ENDPOINT =
+      InetSocketAddress.createUnresolved("foo", PORT_A);
+  private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
+      ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
+
+  private IMocksControl control;
+  private SingletonServiceImpl.LeadershipListener listener;
+  private ServerSet serverSet;
+  private ServerSet.EndpointStatus endpointStatus;
+  private Candidate candidate;
+  private ExceptionalCommand<Group.JoinException> abdicate;
+
+  private SingletonService service;
+
+  /** Creates fresh mocks for every test and registers mock verification as a tear-down step. */
+  @Before
+  @SuppressWarnings("unchecked")
+  public void mySetUp() throws IOException {
+    control = createControl();
+    addTearDown(control::verify);
+    listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
+    serverSet = control.createMock(ServerSet.class);
+    candidate = control.createMock(Candidate.class);
+    endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
+    // Unchecked: the mock is created from the raw ExceptionalCommand class token.
+    abdicate = control.createMock(ExceptionalCommand.class);
+
+    service = new SingletonServiceImpl(serverSet, candidate);
+  }
+
+  /**
+   * Offers leadership for {@code hostName} (primary on {@code PORT_A}, "http-admin" on
+   * {@code PORT_B}) and then elects the captured leader with the mocked abdicate command.
+   *
+   * @param hostName host used for both the primary and the auxiliary endpoint
+   * @param leader capture expected to hold the leader passed to the mocked candidate
+   * @param listener leadership listener handed to the service
+   */
+  private void newLeader(
+      final String hostName,
+      Capture<Leader> leader,
+      LeadershipListener listener) throws Exception {
+
+    service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
+        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
+        listener);
+
+    // This actually elects the leader.
+    leader.getValue().onElected(abdicate);
+  }
+
+  /** Same as the three-argument overload, using the shared mock {@code listener}. */
+  private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
+    newLeader(hostName, leader, listener);
+  }
+
+  /** Starts an expectation for a server set join with the default primary and aux endpoints. */
+  private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
+    return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
+  }
+
+  /** Advertise followed by leave: expects a join, then a leave of the endpoint and abdication. */
+  @Test
+  public void testLeadAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+  }
+
+  /** Leave without a prior advertise: only abdication is expected, no server set join. */
+  @Test
+  public void testLeadLeaveNoAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    abdicate.execute();
+
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+  }
+
+  /** A failing join must surface as an AdvertiseException; leave() still abdicates afterwards. */
+  @Test
+  public void testLeadJoinFailure() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+
+    try {
+      controlCapture.getValue().advertise();
+      fail("Join should have failed.");
+    } catch (SingletonService.AdvertiseException e) {
+      // Expected.
+    }
+
+    controlCapture.getValue().leave();
+  }
+
+  /** A second advertise() on the same control is expected to throw IllegalStateException. */
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().advertise();
+  }
+
+  /** A second leave() on the same control is expected to throw IllegalStateException. */
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+    controlCapture.getValue().leave();
+  }
+
+  /** advertise() after leave() is expected to throw IllegalStateException. */
+  @Test(expected = IllegalStateException.class)
+  public void testAdvertiseAfterLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+    controlCapture.getValue().advertise();
+  }
+
+  /** Runs five consecutive lead/advertise/leave cycles, each with its own host name. */
+  @Test
+  public void testLeadMulti() throws Exception {
+    List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
+    List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
+
+    // Record one full set of expectations per leadership cycle before replaying.
+    for (int i = 0; i < 5; i++) {
+      Capture<Leader> leaderCapture = createCapture();
+      leaderCaptures.add(leaderCapture);
+      Capture<LeaderControl> controlCapture = createCapture();
+      leaderControlCaptures.add(controlCapture);
+
+      expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+      listener.onLeading(capture(controlCapture));
+      InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
+      Map<String, InetSocketAddress> aux =
+          ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
+      expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
+      endpointStatus.leave();
+      abdicate.execute();
+    }
+
+    control.replay();
+
+    for (int i = 0; i < 5; i++) {
+      final String leaderName = "foo" + i;
+      newLeader(leaderName, leaderCaptures.get(i));
+      leaderControlCaptures.get(i).getValue().advertise();
+      leaderControlCaptures.get(i).getValue().leave();
+    }
+  }
+
+  /**
+   * NOTE(review): this test only replays the (empty) expectations and shuts the network down;
+   * it never calls into the service, so it only verifies that no mock is touched.
+   */
+  @Test
+  public void testLeaderLeaves() throws Exception {
+    control.replay();
+    shutdownNetwork();
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
new file mode 100644
index 0000000..5eee235
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link ZooKeeperClient}: timed and blocking {@code get()} calls across network outages,
+ * interruption of a blocked {@code get()}, {@code close()}, credentials and chroot paths.
+ *
+ * @author John Sirois
+ */
+public class ZooKeeperClientTest extends BaseZooKeeperClientTest {
+
+  public ZooKeeperClientTest() {
+    super(Amount.of(1, Time.DAYS));
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    final ZooKeeperClient zkClient = createZkClient();
+    shutdownNetwork();
+    try {
+      zkClient.get(Amount.of(50L, Time.MILLISECONDS));
+      fail("Expected client connection to timeout while network down");
+    } catch (TimeoutException e) {
+      assertTrue(zkClient.isClosed());
+    }
+    assertNull(zkClient.getZooKeeperClientForTests());
+
+    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+    final AtomicReference<ZooKeeper> client = new AtomicReference<>();
+    new Thread(() -> {
+      try {
+        client.set(zkClient.get());
+      } catch (ZooKeeperConnectionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        // Always release the test thread, whether or not the connection succeeded.
+        blockingGetComplete.countDown();
+      }
+    }).start();
+
+    restartNetwork();
+
+    // Hung blocking connects should succeed when server connection comes up
+    blockingGetComplete.await();
+    assertNotNull(client.get());
+
+    // New connections should succeed now that network is back up
+    long sessionId = zkClient.get().getSessionId();
+
+    // While connected the same client should be reused (no new connections while healthy)
+    assertSame(client.get(), zkClient.get());
+
+    shutdownNetwork();
+    // Our client doesn't know the network is down yet so we should be able to get()
+    ZooKeeper zooKeeper = zkClient.get();
+    try {
+      zooKeeper.exists("/", false);
+      fail("Expected client operation to fail while network down");
+    } catch (ConnectionLossException e) {
+      // expected
+    }
+
+    restartNetwork();
+    assertEquals("Expected connection to be re-established with existing session",
+        sessionId, zkClient.get().getSessionId());
+  }
+
+  /**
+   * Test that if a blocking get() call gets interrupted, after a connection has been created
+   * but before it's connected, the zk connection gets closed.
+   */
+  @Test
+  public void testGetInterrupted() throws Exception {
+    final ZooKeeperClient zkClient = createZkClient();
+    shutdownNetwork();
+
+    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+    final AtomicBoolean interrupted = new AtomicBoolean();
+    final AtomicReference<ZooKeeper> client = new AtomicReference<>();
+    Thread getThread = new Thread(() -> {
+      try {
+        client.set(zkClient.get());
+      } catch (ZooKeeperConnectionException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        // Record the interruption before propagating so the test thread can assert on it.
+        interrupted.set(true);
+        throw new RuntimeException(e);
+      } finally {
+        blockingGetComplete.countDown();
+      }
+    });
+    getThread.start();
+
+    // Poll until the blocked get() has created its underlying ZooKeeper handle.
+    while (zkClient.getZooKeeperClientForTests() == null) {
+      Thread.sleep(100);
+    }
+
+    getThread.interrupt();
+    blockingGetComplete.await();
+
+    assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests());
+    assertTrue("The waiter thread should have been interrupted", interrupted.get());
+    assertTrue(zkClient.isClosed());
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    ZooKeeperClient zkClient = createZkClient();
+    zkClient.close();
+
+    // Close should be idempotent
+    zkClient.close();
+
+    long firstSessionId = zkClient.get().getSessionId();
+
+    // Close on an open client should force session re-establishment
+    zkClient.close();
+
+    assertNotEquals(firstSessionId, zkClient.get().getSessionId());
+  }
+
+  @Test
+  public void testCredentials() throws Exception {
+    String path = "/test";
+    ZooKeeperClient authenticatedClient = createZkClient("creator", "creator");
+    assertEquals(path,
+        authenticatedClient.get().create(path, "42".getBytes(StandardCharsets.UTF_8),
+            ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT));
+
+    ZooKeeperClient unauthenticatedClient = createZkClient();
+    assertEquals("42", getData(unauthenticatedClient, path));
+    try {
+      setData(unauthenticatedClient, path, "37");
+      fail("Expected unauthenticated write attempt to fail");
+    } catch (NoAuthException e) {
+      assertEquals("42", getData(unauthenticatedClient, path));
+    }
+
+    ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner");
+    assertEquals("42", getData(nonOwnerClient, path));
+    try {
+      setData(nonOwnerClient, path, "37");
+      fail("Expected non owner write attempt to fail");
+    } catch (NoAuthException e) {
+      assertEquals("42", getData(nonOwnerClient, path));
+    }
+
+    ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator");
+    setData(authenticatedClient2, path, "37");
+    assertEquals("37", getData(authenticatedClient2, path));
+  }
+
+  @Test
+  public void testChrootPath() throws Exception {
+    ZooKeeperClient rootClient = createZkClient();
+    String rootPath = "/test";
+    String subPath = "/test/subtest";
+    assertEquals(rootPath,
+        rootClient.get().create(rootPath, "42".getBytes(StandardCharsets.UTF_8),
+            ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+    assertEquals(subPath,
+        rootClient.get().create(subPath, "37".getBytes(StandardCharsets.UTF_8),
+            ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+    // A client created with rootPath sees the sub node at the relative path "/subtest".
+    ZooKeeperClient chrootedClient = createZkClient(rootPath);
+    assertArrayEquals("37".getBytes(StandardCharsets.UTF_8),
+        chrootedClient.get().getData("/subtest", false, null));
+  }
+
+  /** Writes {@code data} (UTF-8 encoded) to {@code path}, regardless of the node's version. */
+  private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception {
+    zkClient.get().setData(path, data.getBytes(StandardCharsets.UTF_8), ZooKeeperUtils.ANY_VERSION);
+  }
+
+  /** Reads the bytes stored at {@code path} and decodes them as UTF-8. */
+  private String getData(ZooKeeperClient zkClient, String path) throws Exception {
+    return new String(zkClient.get().getData(path, false, null), StandardCharsets.UTF_8);
+  }
+}