You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/10/19 00:34:05 UTC
[2/3] aurora git commit: Remove legacy commons ZK code
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
index 93ddd89..f091384 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
@@ -13,29 +13,19 @@
*/
package org.apache.aurora.common.zookeeper;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Utilities for dealing with zoo keeper.
+ * Utilities for dealing with ZooKeeper.
*/
public final class ZooKeeperUtils {
- private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
-
/**
* An appropriate default session timeout for Twitter ZooKeeper clusters.
*/
@@ -44,12 +34,6 @@ public final class ZooKeeperUtils {
public static final Amount<Integer,Time> DEFAULT_ZK_CONNECTION_TIMEOUT = Amount.of(10, Time.SECONDS);
/**
- * The magic version number that allows any mutation to always succeed regardless of actual
- * version number.
- */
- public static final int ANY_VERSION = -1;
-
- /**
* An ACL that gives all permissions to any user, authenticated or not.
*/
public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
@@ -65,99 +49,13 @@ public final class ZooKeeperUtils {
.build();
/**
- * Returns true if the given exception indicates an error that can be resolved by retrying the
- * operation without modification.
- *
- * @param e the exception to check
- * @return true if the causing operation is strictly retryable
- */
- public static boolean isRetryable(KeeperException e) {
- Preconditions.checkNotNull(e);
-
- switch (e.code()) {
- case CONNECTIONLOSS:
- case SESSIONEXPIRED:
- case SESSIONMOVED:
- case OPERATIONTIMEOUT:
- return true;
-
- case RUNTIMEINCONSISTENCY:
- case DATAINCONSISTENCY:
- case MARSHALLINGERROR:
- case BADARGUMENTS:
- case NONODE:
- case NOAUTH:
- case BADVERSION:
- case NOCHILDRENFOREPHEMERALS:
- case NODEEXISTS:
- case NOTEMPTY:
- case INVALIDCALLBACK:
- case INVALIDACL:
- case AUTHFAILED:
- case UNIMPLEMENTED:
-
- // These two should not be encountered - they are used internally by ZK to specify ranges
- case SYSTEMERROR:
- case APIERROR:
-
- case OK: // This is actually an invalid ZK exception code
-
- default:
- return false;
- }
- }
-
- /**
- * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}. If the
- * path already exists, nothing is done; however if any portion of the path is missing, it will be
- * created with the given {@code acl} as a persistent zookeeper node. The given {@code path} must
- * be a valid zookeeper absolute path.
- *
- * @param zkClient the client to use to access the ZK cluster
- * @param acl the acl to use if creating path nodes
- * @param path the path to ensure exists
- * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
- * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
- * @throws KeeperException if there was a problem in ZK
- */
- public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
- throws ZooKeeperConnectionException, InterruptedException, KeeperException {
- Preconditions.checkNotNull(zkClient);
- Preconditions.checkNotNull(path);
- Preconditions.checkArgument(path.startsWith("/"));
-
- ensurePathInternal(zkClient, acl, path);
- }
-
- private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
- throws ZooKeeperConnectionException, InterruptedException, KeeperException {
- if (zkClient.get().exists(path, false) == null) {
- // The current path does not exist; so back up a level and ensure the parent path exists
- // unless we're already a root-level path.
- int lastPathIndex = path.lastIndexOf('/');
- if (lastPathIndex > 0) {
- ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex));
- }
-
- // We've ensured our parent path (if any) exists so we can proceed to create our path.
- try {
- zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- // This ensures we don't die if a race condition was met between checking existence and
- // trying to create the node.
- LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
- }
- }
- }
-
- /**
* Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and
* never ends with a slash (except for root path).
*
* @param path the path to be normalized
* @return normalized path string
*/
- public static String normalizePath(String path) {
+ static String normalizePath(String path) {
String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1");
PathUtils.validatePath(normalizedPath);
return normalizedPath;
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
deleted file mode 100644
index ba09279..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper.testing;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-
-/**
- * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient.
- */
-public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest {
-
- private final Amount<Integer, Time> defaultSessionTimeout;
-
- /**
- * Creates a test case where the test server uses its
- * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit
- * session timeout.
- */
- public BaseZooKeeperClientTest() {
- this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
- }
-
- /**
- * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for
- * clients created without an explicit session timeout.
- */
- public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) {
- this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout);
- }
-
-
- /**
- * Starts zookeeper back up on the last used port.
- */
- protected final void restartNetwork() throws IOException, InterruptedException {
- getServer().restartNetwork();
- }
-
- /**
- * Shuts down the in-process zookeeper network server.
- */
- protected final void shutdownNetwork() {
- getServer().shutdownNetwork();
- }
-
- /**
- * Expires the active session for the given client. The client should be one returned from
- * {@link #createZkClient}.
- *
- * @param zkClient the client to expire
- * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to
- * the local zk server while trying to expire the session
- * @throws InterruptedException if interrupted while requesting expiration
- */
- protected final void expireSession(ZooKeeperClient zkClient)
- throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException {
- getServer().expireClientSession(zkClient.get().getSessionId());
- }
-
- /**
- * Returns the current port to connect to the in-process zookeeper instance.
- */
- protected final int getPort() {
- return getServer().getPort();
- }
-
- /**
- * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
- * with the default session timeout.
- */
- protected final ZooKeeperClient createZkClient() {
- return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent());
- }
-
- /**
- * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
- * the default session timeout.
- */
- protected final ZooKeeperClient createZkClient(Credentials credentials) {
- return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent());
- }
-
- /**
- * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
- * the default session timeout. The client is authenticated in the digest authentication scheme
- * with the given {@code username} and {@code password}.
- */
- protected final ZooKeeperClient createZkClient(String username, String password) {
- return createZkClient(Credentials.digestCredentials(username, password));
- }
-
- /**
- * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
- * with a custom {@code sessionTimeout}.
- */
- protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) {
- return createZkClient(sessionTimeout, Optional.absent(), Optional.absent());
- }
-
- /**
- * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
- * the default session timeout and the custom chroot path.
- */
- protected final ZooKeeperClient createZkClient(String chrootPath) {
- return createZkClient(defaultSessionTimeout, Optional.absent(),
- Optional.of(chrootPath));
- }
-
- private ZooKeeperClient createZkClient(
- Amount<Integer, Time> sessionTimeout,
- Optional<Credentials> credentials,
- Optional<String> chrootPath) {
-
- ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath,
- ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort())));
- addTearDown(client::close);
- return client;
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
index 29204cd..a4504b8 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -19,8 +19,6 @@ import java.net.InetSocketAddress;
import com.google.common.base.Preconditions;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -34,8 +32,6 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
*/
public class ZooKeeperTestServer {
- static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS);
-
private final File dataDir;
private final File snapDir;
@@ -91,7 +87,7 @@ public class ZooKeeperTestServer {
/**
* Shuts down the in-process zookeeper network server.
*/
- final void shutdownNetwork() {
+ private void shutdownNetwork() {
if (connectionFactory != null) {
connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
connectionFactory = null;
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
deleted file mode 100644
index 9c0cebe..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-public class CandidateImplTest extends BaseZooKeeperClientTest {
- private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
- private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
-
- private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
-
- @Before
- public void mySetUp() throws IOException {
- candidateBuffer = new LinkedBlockingDeque<>();
- }
-
- private Group createGroup(ZooKeeperClient zkClient) throws IOException {
- return new Group(zkClient, ACL, SERVICE);
- }
-
- private class Reign implements Candidate.Leader {
- private ExceptionalCommand<Group.JoinException> abdicate;
- private final CandidateImpl candidate;
- private final String id;
- private CountDownLatch defeated = new CountDownLatch(1);
-
- Reign(String id, CandidateImpl candidate) {
- this.id = id;
- this.candidate = candidate;
- }
-
- @Override
- public void onElected(ExceptionalCommand<Group.JoinException> abdicate) {
- candidateBuffer.offerFirst(candidate);
- this.abdicate = abdicate;
- }
-
- @Override
- public void onDefeated() {
- defeated.countDown();
- }
-
- public void abdicate() throws Group.JoinException {
- Preconditions.checkState(abdicate != null);
- abdicate.execute();
- }
-
- public void expectDefeated() throws InterruptedException {
- defeated.await();
- }
-
- @Override
- public String toString() {
- return id;
- }
- }
-
- @Test
- public void testOfferLeadership() throws Exception {
- ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
- final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
- @Override public String toString() {
- return "Leader1";
- }
- };
- ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
- final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
- @Override public String toString() {
- return "Leader2";
- }
- };
- ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
- final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
- @Override public String toString() {
- return "Leader3";
- }
- };
-
- Reign candidate1Reign = new Reign("1", candidate1);
- Reign candidate2Reign = new Reign("2", candidate2);
- Reign candidate3Reign = new Reign("3", candidate3);
-
- Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
- Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
- Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
-
- assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
- candidate1Leader.get());
-
- shutdownNetwork();
- restartNetwork();
-
- assertTrue("A re-connect without a session expiration should leave the leader elected",
- candidate1Leader.get());
-
- candidate1Reign.abdicate();
- assertSame(candidate1, candidateBuffer.takeLast());
- assertFalse(candidate1Leader.get());
- // Active abdication should trigger defeat.
- candidate1Reign.expectDefeated();
-
- CandidateImpl secondCandidate = candidateBuffer.takeLast();
- assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
- + candidateBuffer,
- candidate2Leader.get() ^ candidate3Leader.get());
-
- if (secondCandidate == candidate2) {
- expireSession(zkClient2);
- assertSame(candidate3, candidateBuffer.takeLast());
- assertTrue(candidate3Leader.get());
- // Passive expiration should trigger defeat.
- candidate2Reign.expectDefeated();
- } else {
- expireSession(zkClient3);
- assertSame(candidate2, candidateBuffer.takeLast());
- assertTrue(candidate2Leader.get());
- // Passive expiration should trigger defeat.
- candidate3Reign.expectDefeated();
- }
- }
-
- @Test
- public void testEmptyMembership() throws Exception {
- ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
- final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
- Reign candidate1Reign = new Reign("1", candidate1);
-
- candidate1.offerLeadership(candidate1Reign);
- assertSame(candidate1, candidateBuffer.takeLast());
- candidate1Reign.abdicate();
- assertFalse(candidate1.getLeaderData().isPresent());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
new file mode 100644
index 0000000..16c0171
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EncodingTest {
+ @Test
+ public void testSimpleSerialization() throws Exception {
+ InetSocketAddress endpoint = new InetSocketAddress(12345);
+ Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
+ Status status = Status.ALIVE;
+
+ byte[] data = Encoding.serializeServiceInstance(
+ endpoint, additionalEndpoints, status, Encoding.JSON_CODEC);
+
+ ServiceInstance instance = Encoding.deserializeServiceInstance(data, Encoding.JSON_CODEC);
+
+ assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
+ assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
+ assertEquals(Status.ALIVE, instance.getStatus());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
deleted file mode 100644
index 97a42d1..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.NodeScheme;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
-
-public class GroupTest extends BaseZooKeeperClientTest {
-
- private ZooKeeperClient zkClient;
- private Group joinGroup;
- private Group watchGroup;
- private Command stopWatching;
- private Command onLoseMembership;
-
- private RecordingListener listener;
-
- public GroupTest() {
- super(Amount.of(1, Time.DAYS));
- }
-
- @Before
- public void mySetUp() throws Exception {
- onLoseMembership = createMock(Command.class);
-
- zkClient = createZkClient("group", "test");
- joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
- watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
-
- listener = new RecordingListener();
- stopWatching = watchGroup.watch(listener);
- }
-
- private static class RecordingListener implements GroupChangeListener {
- private final LinkedBlockingQueue<Iterable<String>> membershipChanges =
- new LinkedBlockingQueue<Iterable<String>>();
-
- @Override
- public void onGroupChange(Iterable<String> memberIds) {
- membershipChanges.add(memberIds);
- }
-
- public Iterable<String> take() throws InterruptedException {
- return membershipChanges.take();
- }
-
- public void assertEmpty() {
- assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges));
- }
-
- @Override
- public String toString() {
- return membershipChanges.toString();
- }
- }
-
- private static class CustomScheme implements NodeScheme {
- static final String NODE_NAME = "custom_name";
-
- @Override
- public boolean isMember(String nodeName) {
- return NODE_NAME.equals(nodeName);
- }
-
- @Override
- public String createName(byte[] membershipData) {
- return NODE_NAME;
- }
-
- @Override
- public boolean isSequential() {
- return false;
- }
- }
-
- @Test
- public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
- final CountDownLatch lostMembership = new CountDownLatch(1);
- Command onLoseMembership = lostMembership::countDown;
- assertEmptyMembershipObserved();
-
- Membership membership = joinGroup.join(onLoseMembership);
- assertMembershipObserved(membership.getMemberId());
- expireSession(zkClient);
-
- lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
- }
-
- @Test
- public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
- final CountDownLatch lostMembership = new CountDownLatch(1);
- Command onLoseMembership = lostMembership::countDown;
- assertEmptyMembershipObserved();
-
- Membership membership = joinGroup.join(onLoseMembership);
- assertMembershipObserved(membership.getMemberId());
- membership.cancel();
-
- lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
- }
-
- @Test
- public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
- replay(onLoseMembership);
-
- assertEmptyMembershipObserved();
-
- Membership membership = joinGroup.join();
- String originalMemberId = membership.getMemberId();
- assertMembershipObserved(originalMemberId);
-
- shutdownNetwork();
- restartNetwork();
-
- // The member should still be present under existing ephemeral node since session did not
- // expire.
- watchGroup.watch(listener);
- assertMembershipObserved(originalMemberId);
-
- membership.cancel();
-
- assertEmptyMembershipObserved();
- assertEmptyMembershipObserved(); // and again for 2nd listener
-
- listener.assertEmpty();
-
- verify(onLoseMembership);
- reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
- }
-
- @Test
- public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
- onLoseMembership.execute();
- replay(onLoseMembership);
-
- assertEmptyMembershipObserved();
-
- Membership membership = joinGroup.join(onLoseMembership);
- String originalMemberId = membership.getMemberId();
- assertMembershipObserved(originalMemberId);
-
- expireSession(zkClient);
-
- // We should have lost our group membership and then re-gained it with a new ephemeral node.
- // We may or may-not see the intermediate state change but we must see the final state
- Iterable<String> members = listener.take();
- if (Iterables.isEmpty(members)) {
- members = listener.take();
- }
- assertEquals(1, Iterables.size(members));
- assertNotEquals(originalMemberId, Iterables.getOnlyElement(members));
- assertNotEquals(originalMemberId, membership.getMemberId());
-
- listener.assertEmpty();
-
- verify(onLoseMembership);
- reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
- }
-
- @Test
- public void testJoinCustomNamingScheme() throws Exception {
- Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
- new CustomScheme());
-
- listener = new RecordingListener();
- group.watch(listener);
- assertEmptyMembershipObserved();
-
- Membership membership = group.join();
- String memberId = membership.getMemberId();
-
- assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
- assertMembershipObserved(memberId);
-
- expireSession(zkClient);
- }
-
- @Test
- public void testUpdateMembershipData() throws Exception {
- Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
-
- byte[] initial = "start".getBytes();
- expect(dataSupplier.get()).andReturn(initial);
-
- byte[] second = "update".getBytes();
- expect(dataSupplier.get()).andReturn(second);
-
- replay(dataSupplier);
-
- Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
- assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
- .getData(membership.getMemberPath(), false, null));
-
- assertArrayEquals("Updating supplier should not change membership data",
- initial, zkClient.get().getData(membership.getMemberPath(), false, null));
-
- membership.updateMemberData();
- assertArrayEquals("Updating membership should change data",
- second, zkClient.get().getData(membership.getMemberPath(), false, null));
-
- verify(dataSupplier);
- }
-
- @Test
- public void testAcls() throws Exception {
- Group securedMembership =
- new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
- "/secured/group/membership");
-
- String memberId = securedMembership.join().getMemberId();
-
- Group unauthenticatedObserver =
- new Group(createZkClient(),
- Ids.READ_ACL_UNSAFE,
- "/secured/group/membership");
- RecordingListener unauthenticatedListener = new RecordingListener();
- unauthenticatedObserver.watch(unauthenticatedListener);
-
- assertMembershipObserved(unauthenticatedListener, memberId);
-
- try {
- unauthenticatedObserver.join();
- fail("Expected join exception for unauthenticated observer");
- } catch (JoinException e) {
- // expected
- }
-
- Group unauthorizedObserver =
- new Group(createZkClient("joe", "schmoe"),
- Ids.READ_ACL_UNSAFE,
- "/secured/group/membership");
- RecordingListener unauthorizedListener = new RecordingListener();
- unauthorizedObserver.watch(unauthorizedListener);
-
- assertMembershipObserved(unauthorizedListener, memberId);
-
- try {
- unauthorizedObserver.join();
- fail("Expected join exception for unauthorized observer");
- } catch (JoinException e) {
- // expected
- }
- }
-
- @Test
- public void testStopWatching() throws Exception {
- replay(onLoseMembership);
-
- assertEmptyMembershipObserved();
-
- Membership member1 = joinGroup.join();
- String memberId1 = member1.getMemberId();
- assertMembershipObserved(memberId1);
-
- Membership member2 = joinGroup.join();
- String memberId2 = member2.getMemberId();
- assertMembershipObserved(memberId1, memberId2);
-
- stopWatching.execute();
-
- member1.cancel();
- Membership member3 = joinGroup.join();
- member2.cancel();
- member3.cancel();
-
- listener.assertEmpty();
- }
-
- private void assertEmptyMembershipObserved() throws InterruptedException {
- assertMembershipObserved();
- }
-
- private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
- assertMembershipObserved(listener, expectedMemberIds);
- }
-
- private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
- throws InterruptedException {
-
- assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take()));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
index 2166123..6cf335d 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
@@ -52,25 +52,25 @@ public class JsonCodecTest {
ImmutableMap.of("http", new Endpoint("foo", 8080)),
Status.ALIVE)
.setShard(0);
- byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
- assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
- assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+ byte[] data = Encoding.serializeServiceInstance(instance1, codec);
+ assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertTrue(Encoding.deserializeServiceInstance(data, codec).isSetShard());
ServiceInstance instance2 = new ServiceInstance(
new Endpoint("foo", 1000),
ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
Status.ALIVE);
- data = ServerSets.serializeServiceInstance(instance2, codec);
- assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
- assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+ data = Encoding.serializeServiceInstance(instance2, codec);
+ assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertFalse(Encoding.deserializeServiceInstance(data, codec).isSetShard());
ServiceInstance instance3 = new ServiceInstance(
new Endpoint("foo", 1000),
ImmutableMap.<String, Endpoint>of(),
Status.ALIVE);
- data = ServerSets.serializeServiceInstance(instance3, codec);
- assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
- assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+ data = Encoding.serializeServiceInstance(instance3, codec);
+ assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+ assertFalse(Encoding.deserializeServiceInstance(data, codec).isSetShard());
}
@Test
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
deleted file mode 100644
index f0c0cb4..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- *
- * TODO(William Farner): Change this to remove thrift dependency.
- */
-public class ServerSetImplTest extends BaseZooKeeperClientTest {
- private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- private static final String SERVICE = "/twitter/services/puffin_hosebird";
-
- private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer;
- private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor;
-
- @Before
- public void mySetUp() throws IOException {
- serverSetBuffer = new LinkedBlockingQueue<>();
- serverSetMonitor = serverSetBuffer::offer;
- }
-
- private ServerSetImpl createServerSet() throws IOException {
- return new ServerSetImpl(createZkClient(), ACL, SERVICE);
- }
-
- @Test
- public void testLifecycle() throws Exception {
- ServerSetImpl client = createServerSet();
- client.watch(serverSetMonitor);
- assertChangeFiredEmpty();
-
- ServerSetImpl server = createServerSet();
- ServerSet.EndpointStatus status = server.join(
- InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080));
-
- ServiceInstance serviceInstance = new ServiceInstance(
- new Endpoint("foo", 1234),
- ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
- Status.ALIVE);
-
- assertChangeFired(serviceInstance);
-
- status.leave();
- assertChangeFiredEmpty();
- assertTrue(serverSetBuffer.isEmpty());
- }
-
- @Test
- public void testMembershipChanges() throws Exception {
- ServerSetImpl client = createServerSet();
- client.watch(serverSetMonitor);
- assertChangeFiredEmpty();
-
- ServerSetImpl server = createServerSet();
-
- ServerSet.EndpointStatus foo = join(server, "foo");
- assertChangeFired("foo");
-
- expireSession(client.getZkClient());
-
- ServerSet.EndpointStatus bar = join(server, "bar");
-
- // We should've auto re-monitored membership, but not been notifed of "foo" since this was not a
- // change, just "foo", "bar" since this was an addition.
- assertChangeFired("foo", "bar");
-
- foo.leave();
- assertChangeFired("bar");
-
- ServerSet.EndpointStatus baz = join(server, "baz");
- assertChangeFired("bar", "baz");
-
- baz.leave();
- assertChangeFired("bar");
-
- bar.leave();
- assertChangeFiredEmpty();
-
- assertTrue(serverSetBuffer.isEmpty());
- }
-
- @Test
- public void testStopMonitoring() throws Exception {
- ServerSetImpl client = createServerSet();
- Command stopMonitoring = client.watch(serverSetMonitor);
- assertChangeFiredEmpty();
-
- ServerSetImpl server = createServerSet();
-
- ServerSet.EndpointStatus foo = join(server, "foo");
- assertChangeFired("foo");
- ServerSet.EndpointStatus bar = join(server, "bar");
- assertChangeFired("foo", "bar");
-
- stopMonitoring.execute();
-
- // No new updates should be received since monitoring has stopped.
- foo.leave();
- assertTrue(serverSetBuffer.isEmpty());
-
- // Expiration event.
- assertTrue(serverSetBuffer.isEmpty());
- }
-
- @Test
- public void testOrdering() throws Exception {
- ServerSetImpl client = createServerSet();
- client.watch(serverSetMonitor);
- assertChangeFiredEmpty();
-
- Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080);
- Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081);
- Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082);
-
- ServerSetImpl server1 = createServerSet();
- ServerSetImpl server2 = createServerSet();
- ServerSetImpl server3 = createServerSet();
-
- ServiceInstance instance1 = new ServiceInstance(
- new Endpoint("foo", 1000),
- ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
- Status.ALIVE);
- ServiceInstance instance2 = new ServiceInstance(
- new Endpoint("foo", 1001),
- ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
- Status.ALIVE);
- ServiceInstance instance3 = new ServiceInstance(
- new Endpoint("foo", 1002),
- ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
- Status.ALIVE);
-
- server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports);
- assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
-
- ServerSet.EndpointStatus status2 = server2.join(
- InetSocketAddress.createUnresolved("foo", 1001),
- server2Ports);
- assertEquals(ImmutableList.of(instance1, instance2),
- ImmutableList.copyOf(serverSetBuffer.take()));
-
- server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports);
- assertEquals(ImmutableList.of(instance1, instance2, instance3),
- ImmutableList.copyOf(serverSetBuffer.take()));
-
- status2.leave();
- assertEquals(ImmutableList.of(instance1, instance3),
- ImmutableList.copyOf(serverSetBuffer.take()));
- }
-
- @Test
- public void testUnwatchOnException() throws Exception {
- IMocksControl control = createControl();
-
- ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class);
- Watcher onExpirationWatcher = control.createMock(Watcher.class);
-
- expect(zkClient.registerExpirationHandler(anyObject(Command.class)))
- .andReturn(onExpirationWatcher);
-
- expect(zkClient.get()).andThrow(new InterruptedException()); // See interrupted() note below.
- expect(zkClient.unregister(onExpirationWatcher)).andReturn(true);
- control.replay();
-
- Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla");
- ServerSetImpl serverset = new ServerSetImpl(zkClient, group);
-
- try {
- serverset.watch(hostSet -> {});
- fail("Expected MonitorException");
- } catch (DynamicHostSet.MonitorException e) {
- // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is.
- // That call both returns the current interrupted status as well as clearing it. The clearing
- // is crucial depending on the order tests are run in this class. If this test runs before
- // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail
- // executing `ZooKeeperClient.get` which internally blocks on s sync-point that takes part in
- // the interruption mechanism and so immediately throws `InterruptedException` based on the
- // un-cleared interrupted bit.
- assertTrue(Thread.interrupted());
- }
- control.verify();
- }
-
- private static Map<String, InetSocketAddress> makePortMap(String name, int port) {
- return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port));
- }
-
- private ServerSet.EndpointStatus join(ServerSet serverSet, String host)
- throws JoinException, InterruptedException {
-
- return serverSet.join(
- InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of());
- }
-
- private void assertChangeFired(String... serviceHosts)
- throws InterruptedException {
-
- assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts),
- serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42),
- ImmutableMap.<String, Endpoint>of(), Status.ALIVE))));
- }
-
- protected void assertChangeFiredEmpty() throws InterruptedException {
- assertChangeFired(ImmutableSet.<ServiceInstance>of());
- }
-
- protected void assertChangeFired(ServiceInstance... serviceInstances)
- throws InterruptedException {
- assertChangeFired(ImmutableSet.copyOf(serviceInstances));
- }
-
- protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances)
- throws InterruptedException {
- assertEquals(serviceInstances, serverSetBuffer.take());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
deleted file mode 100644
index 0e67191..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class ServerSetsTest {
- @Test
- public void testSimpleSerialization() throws Exception {
- InetSocketAddress endpoint = new InetSocketAddress(12345);
- Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
- Status status = Status.ALIVE;
-
- byte[] data = ServerSets.serializeServiceInstance(
- endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC);
-
- ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC);
-
- assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
- assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
- assertEquals(Status.ALIVE, instance.getStatus());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
deleted file mode 100644
index 5f6cdd8..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
-import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.easymock.Capture;
-import org.easymock.IExpectationSetters;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.fail;
-
-public class SingletonServiceImplTest extends BaseZooKeeperClientTest {
- private static final int PORT_A = 1234;
- private static final int PORT_B = 8080;
- private static final InetSocketAddress PRIMARY_ENDPOINT =
- InetSocketAddress.createUnresolved("foo", PORT_A);
- private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
- ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
-
- private IMocksControl control;
- private SingletonServiceImpl.LeadershipListener listener;
- private ServerSet serverSet;
- private ServerSet.EndpointStatus endpointStatus;
- private Candidate candidate;
- private ExceptionalCommand<Group.JoinException> abdicate;
-
- private SingletonService service;
-
- @Before
- @SuppressWarnings("unchecked")
- public void mySetUp() throws IOException {
- control = createControl();
- addTearDown(control::verify);
- listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
- serverSet = control.createMock(ServerSet.class);
- candidate = control.createMock(Candidate.class);
- endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
- abdicate = control.createMock(ExceptionalCommand.class);
-
- service = new SingletonServiceImpl(serverSet, candidate);
- }
-
- private void newLeader(
- final String hostName,
- Capture<Leader> leader,
- LeadershipListener listener) throws Exception {
-
- service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
- ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
- listener);
-
- // This actually elects the leader.
- leader.getValue().onElected(abdicate);
- }
-
- private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
- newLeader(hostName, leader, listener);
- }
-
- private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
- return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
- }
-
- @Test
- public void testLeadAdvertise() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- expectJoin().andReturn(endpointStatus);
- endpointStatus.leave();
- abdicate.execute();
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().advertise();
- controlCapture.getValue().leave();
- }
-
- @Test
- public void teatLeadLeaveNoAdvertise() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- abdicate.execute();
-
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().leave();
- }
-
- @Test
- public void testLeadJoinFailure() throws Exception {
- Capture<Leader> leaderCapture = new Capture<Leader>();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
- abdicate.execute();
-
- control.replay();
-
- newLeader("foo", leaderCapture);
-
- try {
- controlCapture.getValue().advertise();
- fail("Join should have failed.");
- } catch (SingletonService.AdvertiseException e) {
- // Expected.
- }
-
- controlCapture.getValue().leave();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testMultipleAdvertise() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- expectJoin().andReturn(endpointStatus);
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().advertise();
- controlCapture.getValue().advertise();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testMultipleLeave() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- expectJoin().andReturn(endpointStatus);
- endpointStatus.leave();
- abdicate.execute();
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().advertise();
- controlCapture.getValue().leave();
- controlCapture.getValue().leave();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testAdvertiseAfterLeave() throws Exception {
- Capture<Leader> leaderCapture = createCapture();
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- Capture<LeaderControl> controlCapture = createCapture();
- listener.onLeading(capture(controlCapture));
-
- abdicate.execute();
-
- control.replay();
-
- newLeader("foo", leaderCapture);
- controlCapture.getValue().leave();
- controlCapture.getValue().advertise();
- }
-
- @Test
- public void testLeadMulti() throws Exception {
- List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
- List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
-
- for (int i = 0; i < 5; i++) {
- Capture<Leader> leaderCapture = new Capture<Leader>();
- leaderCaptures.add(leaderCapture);
- Capture<LeaderControl> controlCapture = createCapture();
- leaderControlCaptures.add(controlCapture);
-
- expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
- listener.onLeading(capture(controlCapture));
- InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
- Map<String, InetSocketAddress> aux =
- ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
- expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
- endpointStatus.leave();
- abdicate.execute();
- }
-
- control.replay();
-
- for (int i = 0; i < 5; i++) {
- final String leaderName = "foo" + i;
- newLeader(leaderName, leaderCaptures.get(i));
- leaderControlCaptures.get(i).getValue().advertise();
- leaderControlCaptures.get(i).getValue().leave();
- }
- }
-
- @Test
- public void testLeaderLeaves() throws Exception {
- control.replay();
- shutdownNetwork();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
deleted file mode 100644
index 5eee235..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.NoAuthException;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * @author John Sirois
- */
-public class ZooKeeperClientTest extends BaseZooKeeperClientTest {
-
- public ZooKeeperClientTest() {
- super(Amount.of(1, Time.DAYS));
- }
-
- @Test
- public void testGet() throws Exception {
- final ZooKeeperClient zkClient = createZkClient();
- shutdownNetwork();
- try {
- zkClient.get(Amount.of(50L, Time.MILLISECONDS));
- fail("Expected client connection to timeout while network down");
- } catch (TimeoutException e) {
- assertTrue(zkClient.isClosed());
- }
- assertNull(zkClient.getZooKeeperClientForTests());
-
- final CountDownLatch blockingGetComplete = new CountDownLatch(1);
- final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
- new Thread(() -> {
- try {
- client.set(zkClient.get());
- } catch (ZooKeeperConnectionException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- blockingGetComplete.countDown();
- }
- }).start();
-
- restartNetwork();
-
- // Hung blocking connects should succeed when server connection comes up
- blockingGetComplete.await();
- assertNotNull(client.get());
-
- // New connections should succeed now that network is back up
- long sessionId = zkClient.get().getSessionId();
-
- // While connected the same client should be reused (no new connections while healthy)
- assertSame(client.get(), zkClient.get());
-
- shutdownNetwork();
- // Our client doesn't know the network is down yet so we should be able to get()
- ZooKeeper zooKeeper = zkClient.get();
- try {
- zooKeeper.exists("/", false);
- fail("Expected client operation to fail while network down");
- } catch (ConnectionLossException e) {
- // expected
- }
-
- restartNetwork();
- assertEquals("Expected connection to be re-established with existing session",
- sessionId, zkClient.get().getSessionId());
- }
-
- /**
- * Test that if a blocking get() call gets interrupted, after a connection has been created
- * but before it's connected, the zk connection gets closed.
- */
- @Test
- public void testGetInterrupted() throws Exception {
- final ZooKeeperClient zkClient = createZkClient();
- shutdownNetwork();
-
- final CountDownLatch blockingGetComplete = new CountDownLatch(1);
- final AtomicBoolean interrupted = new AtomicBoolean();
- final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
- Thread getThread = new Thread(() -> {
- try {
- client.set(zkClient.get());
- } catch (ZooKeeperConnectionException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- interrupted.set(true);
- throw new RuntimeException(e);
- } finally {
- blockingGetComplete.countDown();
- }
- });
- getThread.start();
-
- while (zkClient.getZooKeeperClientForTests() == null) {
- Thread.sleep(100);
- }
-
- getThread.interrupt();
- blockingGetComplete.await();
-
- assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests());
- assertTrue("The waiter thread should have been interrupted", interrupted.get());
- assertTrue(zkClient.isClosed());
- }
-
- @Test
- public void testClose() throws Exception {
- ZooKeeperClient zkClient = createZkClient();
- zkClient.close();
-
- // Close should be idempotent
- zkClient.close();
-
- long firstSessionId = zkClient.get().getSessionId();
-
- // Close on an open client should force session re-establishment
- zkClient.close();
-
- assertNotEquals(firstSessionId, zkClient.get().getSessionId());
- }
-
- @Test
- public void testCredentials() throws Exception {
- String path = "/test";
- ZooKeeperClient authenticatedClient = createZkClient("creator", "creator");
- assertEquals(path,
- authenticatedClient.get().create(path, "42".getBytes(),
- ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT));
-
- ZooKeeperClient unauthenticatedClient = createZkClient();
- assertEquals("42", getData(unauthenticatedClient, path));
- try {
- setData(unauthenticatedClient, path, "37");
- fail("Expected unauthenticated write attempt to fail");
- } catch (NoAuthException e) {
- assertEquals("42", getData(unauthenticatedClient, path));
- }
-
- ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner");
- assertEquals("42", getData(nonOwnerClient, path));
- try {
- setData(nonOwnerClient, path, "37");
- fail("Expected non owner write attempt to fail");
- } catch (NoAuthException e) {
- assertEquals("42", getData(nonOwnerClient, path));
- }
-
- ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator");
- setData(authenticatedClient2, path, "37");
- assertEquals("37", getData(authenticatedClient2, path));
- }
-
- @Test
- public void testChrootPath() throws Exception {
- ZooKeeperClient rootClient = createZkClient();
- String rootPath = "/test";
- String subPath = "/test/subtest";
- assertEquals(rootPath,
- rootClient.get().create(rootPath, "42".getBytes(),
- ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
- assertEquals(subPath,
- rootClient.get().create(subPath, "37".getBytes(),
- ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-
- ZooKeeperClient chrootedClient = createZkClient(rootPath);
- assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null));
- }
-
- private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception {
- zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION);
- }
-
- private String getData(ZooKeeperClient zkClient, String path) throws Exception {
- return new String(zkClient.get().getData(path, false, null));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
index 9e482a6..5eb3c5e 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
@@ -13,87 +13,16 @@
*/
package org.apache.aurora.common.zookeeper;
-import com.google.common.base.Charsets;
-
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.BadVersionException;
-import org.apache.zookeeper.KeeperException.NoAuthException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* @author John Sirois
*/
-public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
- @Test
- public void testEnsurePath() throws Exception {
- ZooKeeperClient zkClient = createZkClient();
- zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8));
-
- assertNull(zkClient.get().exists("/foo", false));
- ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz");
-
- zkClient = createZkClient();
- zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8));
-
- // Anyone can check for existence in ZK
- assertNotNull(zkClient.get().exists("/foo", false));
- assertNotNull(zkClient.get().exists("/foo/bar", false));
- assertNotNull(zkClient.get().exists("/foo/bar/baz", false));
-
- try {
- zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */);
- fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be "
- + "rejected");
- } catch (NoAuthException e) {
- // expected
- }
- }
-
- @Test
- public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception {
- String nodePath = "/foo";
- ZooKeeperClient zkClient = createZkClient();
-
- zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- Stat initStat = new Stat();
- byte[] initialData = zkClient.get().getData(nodePath, false, initStat);
- assertArrayEquals("init".getBytes(), initialData);
-
- // bump the version
- Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion());
-
- try {
- zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion());
- fail("expected correct version to be required");
- } catch (BadVersionException e) {
- // expected
- }
-
- // expect using the correct version to work
- Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion());
- assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion());
-
- zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION);
- Stat forceWriteStat = new Stat();
- byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat);
- assertArrayEquals("force-write".getBytes(), forceWriteData);
-
- assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion());
- assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion());
- }
+public class ZooKeeperUtilsTest extends BaseZooKeeperTest {
@Test
public void testNormalizingPath() throws Exception {
@@ -135,5 +64,4 @@ public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
// expected
}
}
-
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
deleted file mode 100644
index 339f63b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-
-import javax.inject.Singleton;
-
-import com.google.inject.Exposed;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.zookeeper.ServerSetImpl;
-import org.apache.aurora.common.zookeeper.SingletonService;
-import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-import org.apache.zookeeper.data.ACL;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Binding module for utilities to advertise the network presence of the scheduler.
- *
- * Uses a fork of Twitter commons/zookeeper.
- */
-class CommonsServiceDiscoveryModule extends PrivateModule {
-
- private final String discoveryPath;
- private final ZooKeeperConfig zooKeeperConfig;
-
- CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
- this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath);
- this.zooKeeperConfig = requireNonNull(zooKeeperConfig);
- }
-
- @Override
- protected void configure() {
- requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
- requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
-
- bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class);
- expose(ServiceGroupMonitor.class);
- }
-
- @Provides
- @Singleton
- ZooKeeperClient provideZooKeeperClient(
- @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) {
-
- return new ZooKeeperClient(
- zooKeeperConfig.getSessionTimeout(),
- zooKeeperConfig.getCredentials(),
- zooKeeperConfig.getChrootPath(),
- zooKeeperCluster);
- }
-
- @Provides
- @Singleton
- ServerSetImpl provideServerSet(
- ZooKeeperClient client,
- @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) {
-
- return new ServerSetImpl(client, zooKeeperAcls, discoveryPath);
- }
-
- @Provides
- @Singleton
- DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) {
- // Used for a type re-binding of the server set.
- return serverSet;
- }
-
- // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding.
- @Provides
- @Singleton
- @Exposed
- SingletonService provideSingletonService(
- ZooKeeperClient client,
- ServerSetImpl serverSet,
- @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) {
-
- return new SingletonServiceImpl(
- serverSet,
- SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
deleted file mode 100644
index 9161455..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-
-import static java.util.Objects.requireNonNull;
-
-class CommonsServiceGroupMonitor implements ServiceGroupMonitor {
- private Optional<Command> closeCommand = Optional.empty();
- private final DynamicHostSet<ServiceInstance> serverSet;
- private final AtomicReference<ImmutableSet<ServiceInstance>> services =
- new AtomicReference<>(ImmutableSet.of());
-
- @Inject
- CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) {
- this.serverSet = requireNonNull(serverSet);
- }
-
- @Override
- public void start() throws MonitorException {
- try {
- closeCommand = Optional.of(serverSet.watch(services::set));
- } catch (DynamicHostSet.MonitorException e) {
- throw new MonitorException("Unable to watch scheduler host set.", e);
- }
- }
-
- @Override
- public void close() {
- closeCommand.ifPresent(Command::execute);
- }
-
- @Override
- public ImmutableSet<ServiceInstance> get() {
- return services.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
index 40cda8c..77f90ee 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
@@ -37,7 +37,7 @@ import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.Encoding;
import org.apache.aurora.common.zookeeper.SingletonService;
import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.curator.RetryPolicy;
@@ -76,7 +76,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
- bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC);
+ bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(Encoding.JSON_CODEC);
}
@Provides
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
index 1e7b9ce..48c7bfd 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
@@ -40,13 +40,6 @@ public final class FlaggedZooKeeperConfig {
@Parameters(separators = "=")
public static class Options {
- @Parameter(names = "-zk_use_curator",
- description =
- "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter "
- + "commons/zookeeper (the legacy library) is used.",
- arity = 1)
- public boolean useCurator = true;
-
@Parameter(names = "-zk_in_proc",
description =
"Launches an embedded zookeeper server for local testing causing -zk_endpoints "
@@ -87,7 +80,6 @@ public final class FlaggedZooKeeperConfig {
*/
public static ZooKeeperConfig create(Options opts) {
return new ZooKeeperConfig(
- opts.useCurator,
opts.zkEndpoints,
Optional.fromNullable(opts.chrootPath),
opts.inProcess,
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
index 917a567..7e3b6c4 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
@@ -28,7 +28,6 @@ import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
-import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.binder.LinkedBindingBuilder;
@@ -85,15 +84,7 @@ public class ServiceDiscoveryModule extends AbstractModule {
clusterBinder.toInstance(zooKeeperConfig.getServers());
}
- install(discoveryModule());
- }
-
- private Module discoveryModule() {
- if (zooKeeperConfig.isUseCurator()) {
- return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
- } else {
- return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
- }
+ install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig));
}
@Provides
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
index 433ed31..1a7e8cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
@@ -44,13 +44,11 @@ public class ZooKeeperConfig {
/**
* Creates a new client configuration with defaults for the session timeout and credentials.
*
- * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used.
* @param servers ZooKeeper server addresses.
* @return A new configuration.
*/
- public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) {
+ public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) {
return new ZooKeeperConfig(
- useCurator,
servers,
Optional.absent(), // chrootPath
false,
@@ -59,7 +57,6 @@ public class ZooKeeperConfig {
Optional.absent()); // credentials
}
- private final boolean useCurator;
private final Iterable<InetSocketAddress> servers;
private final boolean inProcess;
private final Amount<Integer, Time> sessionTimeout;
@@ -77,7 +74,6 @@ public class ZooKeeperConfig {
* @param credentials ZooKeeper authentication credentials.
*/
ZooKeeperConfig(
- boolean useCurator,
Iterable<InetSocketAddress> servers,
Optional<String> chrootPath,
boolean inProcess,
@@ -85,7 +81,6 @@ public class ZooKeeperConfig {
Amount<Integer, Time> connectionTimeout,
Optional<Credentials> credentials) {
- this.useCurator = useCurator;
this.servers = MorePreconditions.checkNotBlank(servers);
this.chrootPath = requireNonNull(chrootPath);
this.inProcess = inProcess;
@@ -103,7 +98,6 @@ public class ZooKeeperConfig {
*/
public ZooKeeperConfig withCredentials(Credentials newCredentials) {
return new ZooKeeperConfig(
- useCurator,
servers,
chrootPath,
inProcess,
@@ -112,10 +106,6 @@ public class ZooKeeperConfig {
Optional.of(newCredentials));
}
- boolean isUseCurator() {
- return useCurator;
- }
-
Iterable<InetSocketAddress> getServers() {
return servers;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index a363e70..8e3c1de 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -14,6 +14,7 @@
package org.apache.aurora.scheduler.app;
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Set;
@@ -47,9 +48,7 @@ import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
import org.apache.aurora.common.stats.Stats;
import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ServerSetImpl;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
@@ -110,10 +109,9 @@ import static org.easymock.EasyMock.createControl;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class SchedulerIT extends BaseZooKeeperClientTest {
+public class SchedulerIT extends BaseZooKeeperTest {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class);
@@ -153,7 +151,6 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
private Stream logStream;
private StreamMatcher streamMatcher;
private EntrySerializer entrySerializer;
- private ZooKeeperClient zkClient;
private File backupDir;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -181,11 +178,9 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
entrySerializer = new EntrySerializer.EntrySerializerImpl(
Amount.of(512, Data.KB),
Hashing.md5());
-
- zkClient = createZkClient();
}
- private void startScheduler() throws Exception {
+ private Injector startScheduler() throws Exception {
// TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using
// AppLauncher.
Module testModule = new AbstractModule() {
@@ -215,8 +210,8 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
};
ZooKeeperConfig zkClientConfig =
ZooKeeperConfig.create(
- true, // useCurator
- ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort())))
+ ImmutableList.of(
+ InetSocketAddress.createUnresolved("localhost", getServer().getPort())))
.withCredentials(Credentials.digestCredentials("mesos", "mesos"));
SchedulerMain main = SchedulerMain.class.newInstance();
Injector injector = Guice.createInjector(
@@ -245,21 +240,35 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
});
injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class))
.awaitHealthy();
+ return injector;
}
- private void awaitSchedulerReady() throws Exception {
+ private void awaitSchedulerReady(Injector injector) throws Exception {
executor.submit(() -> {
- ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
- final CountDownLatch schedulerReady = new CountDownLatch(1);
- schedulerService.watch(hostSet -> {
- if (!hostSet.isEmpty()) {
- schedulerReady.countDown();
+ ServiceGroupMonitor groupMonitor = injector.getInstance(ServiceGroupMonitor.class);
+ try {
+ // A timeout is used because certain types of assertion errors (mocks) will not surface
+ // until the main test thread exits this body of code.
+ long waited = 0;
+ while (waited < 5000) {
+ if (groupMonitor.get().isEmpty()) {
+ try {
+ Thread.sleep(100);
+ waited += 100;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ break;
+ }
}
- });
- // A timeout is used because certain types of assertion errors (mocks) will not surface
- // until the main test thread exits this body of code.
- assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
- return null;
+ } finally {
+ try {
+ groupMonitor.close();
+ } catch (IOException e) {
+ LOG.info("Failed to close:" + e, e);
+ }
+ }
}).get();
}
@@ -345,7 +354,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
expect(driver.stop(true)).andReturn(Protos.Status.DRIVER_STOPPED).anyTimes();
control.replay();
- startScheduler();
+ Injector injector = startScheduler();
driverStarted.await();
scheduler.getValue().registered(
@@ -353,7 +362,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
MASTER);
- awaitSchedulerReady();
+ awaitSchedulerReady(injector);
assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());