You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2016/12/01 17:02:31 UTC
[2/4] aurora git commit: Revert removal of twitter/commons/zk based
leadership code
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
new file mode 100644
index 0000000..9e482a6
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Charsets;
+
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
+ @Test
+ public void testEnsurePath() throws Exception {
+ ZooKeeperClient zkClient = createZkClient();
+ zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8));
+
+ assertNull(zkClient.get().exists("/foo", false));
+ ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz");
+
+ zkClient = createZkClient();
+ zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8));
+
+ // Anyone can check for existence in ZK
+ assertNotNull(zkClient.get().exists("/foo", false));
+ assertNotNull(zkClient.get().exists("/foo/bar", false));
+ assertNotNull(zkClient.get().exists("/foo/bar/baz", false));
+
+ try {
+ zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */);
+ fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be "
+ + "rejected");
+ } catch (NoAuthException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception {
+ String nodePath = "/foo";
+ ZooKeeperClient zkClient = createZkClient();
+
+ zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ Stat initStat = new Stat();
+ byte[] initialData = zkClient.get().getData(nodePath, false, initStat);
+ assertArrayEquals("init".getBytes(), initialData);
+
+ // bump the version
+ Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion());
+
+ try {
+ zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion());
+ fail("expected correct version to be required");
+ } catch (BadVersionException e) {
+ // expected
+ }
+
+ // expect using the correct version to work
+ Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion());
+ assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion());
+
+ zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION);
+ Stat forceWriteStat = new Stat();
+ byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat);
+ assertArrayEquals("force-write".getBytes(), forceWriteData);
+
+ assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion());
+ assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion());
+ }
+
+ @Test
+ public void testNormalizingPath() throws Exception {
+ assertEquals("/", ZooKeeperUtils.normalizePath("/"));
+ assertEquals("/foo", ZooKeeperUtils.normalizePath("/foo/"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo//bar"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("//foo/bar"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar/"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar//"));
+ assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar"));
+ }
+
+ @Test
+ public void testLenientPaths() {
+ assertEquals("/", ZooKeeperUtils.normalizePath("///"));
+ assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group"));
+ assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group/"));
+ assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group"));
+ assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group//"));
+
+ try {
+ ZooKeeperUtils.normalizePath("a/group");
+ fail("Relative paths should not be allowed.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ ZooKeeperUtils.normalizePath("/a/./group");
+ fail("Relative paths should not be allowed.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ ZooKeeperUtils.normalizePath("/a/../group");
+ fail("Relative paths should not be allowed.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/config/findbugs/excludeFilter.xml
----------------------------------------------------------------------
diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml
index 5eaa11a..f7d5ae0 100644
--- a/config/findbugs/excludeFilter.xml
+++ b/config/findbugs/excludeFilter.xml
@@ -74,14 +74,6 @@ limitations under the License.
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS" />
</Match>
- <!-- We're forced to use the platform default encoding to generate a byte array from digest
- credentials because the underlying ZooKeeper API dictates this - also noted in the
- offending code. -->
- <Match>
- <Class name="org.apache.aurora.scheduler.discovery.Credentials" />
- <Bug pattern="DM_DEFAULT_ENCODING" />
- </Match>
-
<!-- Technical debt. -->
<Match>
<Class name="org.apache.aurora.scheduler.log.mesos.MesosLog$LogStream" />
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/features/service-discovery.md
----------------------------------------------------------------------
diff --git a/docs/features/service-discovery.md b/docs/features/service-discovery.md
index 511c96d..36823c8 100644
--- a/docs/features/service-discovery.md
+++ b/docs/features/service-discovery.md
@@ -6,7 +6,7 @@ the purpose of service discovery. ServerSets use the Zookeeper [group membershi
of which there are several reference implementations:
- [C++](https://github.com/apache/mesos/blob/master/src/zookeeper/group.cpp)
- - [Java](http://curator.apache.org/curator-recipes/group-member.html)
+ - [Java](https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/zookeeper/ServerSetImpl.java#L221)
- [Python](https://github.com/twitter/commons/blob/master/src/python/twitter/common/zookeeper/serverset/serverset.py#L51)
These can also be used natively in Finagle using the [ZookeeperServerSetCluster](https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala).
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 8955653..d4e0a9a 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -252,5 +252,7 @@ Optional flags:
Launches an embedded zookeeper server for local testing causing -zk_endpoints to be ignored if specified.
-zk_session_timeout (default (4, secs))
The ZooKeeper session timeout.
+-zk_use_curator (default true)
+ DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter commons/zookeeper (the legacy library) is used.
-------------------------------------------------------------------------
```
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index 76209b1..4e354b6 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -46,8 +46,8 @@ import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.StateMachine;
import org.apache.aurora.common.util.StateMachine.Transition;
-import org.apache.aurora.scheduler.discovery.SingletonService;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.mesos.Driver;
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
+import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
/**
* The central driver of the scheduler runtime lifecycle. Handles the transitions from startup and
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
index e0d32de..43cc5b4 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java
@@ -42,6 +42,8 @@ import org.apache.aurora.common.args.constraints.NotEmpty;
import org.apache.aurora.common.args.constraints.NotNull;
import org.apache.aurora.common.inject.Bindings;
import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.SchedulerLifecycle;
@@ -50,8 +52,6 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorModule;
import org.apache.aurora.scheduler.cron.quartz.CronModule;
import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig;
import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
-import org.apache.aurora.scheduler.discovery.SingletonService;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
import org.apache.aurora.scheduler.events.WebhookModule;
import org.apache.aurora.scheduler.http.HttpService;
import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
new file mode 100644
index 0000000..a1329fd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.app;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.thrift.ServiceInstance;
+
+/**
+ * Monitors a service group's membership and supplies a live view of the most recent set.
+ */
+public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable {
+
+ /**
+ * Indicates a problem initiating monitoring of a service group.
+ */
+ class MonitorException extends Exception {
+ public MonitorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Starts monitoring the service group.
+ *
+ * When the service group membership no longer needs to be maintained, this monitor should be
+ * {@link #close() closed}.
+ *
+ * @throws MonitorException if there is a problem initiating monitoring of the service group.
+ */
+ void start() throws MonitorException;
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
new file mode 100644
index 0000000..339f63b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import javax.inject.Singleton;
+
+import com.google.inject.Exposed;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.ServerSetImpl;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.zookeeper.data.ACL;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Binding module for utilities to advertise the network presence of the scheduler.
+ *
+ * Uses a fork of Twitter commons/zookeeper.
+ */
+class CommonsServiceDiscoveryModule extends PrivateModule {
+
+ private final String discoveryPath;
+ private final ZooKeeperConfig zooKeeperConfig;
+
+ CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+ this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath);
+ this.zooKeeperConfig = requireNonNull(zooKeeperConfig);
+ }
+
+ @Override
+ protected void configure() {
+ requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
+ requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
+
+ bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class);
+ expose(ServiceGroupMonitor.class);
+ }
+
+ @Provides
+ @Singleton
+ ZooKeeperClient provideZooKeeperClient(
+ @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) {
+
+ return new ZooKeeperClient(
+ zooKeeperConfig.getSessionTimeout(),
+ zooKeeperConfig.getCredentials(),
+ zooKeeperConfig.getChrootPath(),
+ zooKeeperCluster);
+ }
+
+ @Provides
+ @Singleton
+ ServerSetImpl provideServerSet(
+ ZooKeeperClient client,
+ @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) {
+
+ return new ServerSetImpl(client, zooKeeperAcls, discoveryPath);
+ }
+
+ @Provides
+ @Singleton
+ DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) {
+ // Used for a type re-binding of the server set.
+ return serverSet;
+ }
+
+ // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding.
+ @Provides
+ @Singleton
+ @Exposed
+ SingletonService provideSingletonService(
+ ZooKeeperClient client,
+ ServerSetImpl serverSet,
+ @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) {
+
+ return new SingletonServiceImpl(
+ serverSet,
+ SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
new file mode 100644
index 0000000..9161455
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.inject.Inject;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+
+import static java.util.Objects.requireNonNull;
+
+class CommonsServiceGroupMonitor implements ServiceGroupMonitor {
+ private Optional<Command> closeCommand = Optional.empty();
+ private final DynamicHostSet<ServiceInstance> serverSet;
+ private final AtomicReference<ImmutableSet<ServiceInstance>> services =
+ new AtomicReference<>(ImmutableSet.of());
+
+ @Inject
+ CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) {
+ this.serverSet = requireNonNull(serverSet);
+ }
+
+ @Override
+ public void start() throws MonitorException {
+ try {
+ closeCommand = Optional.of(serverSet.watch(services::set));
+ } catch (DynamicHostSet.MonitorException e) {
+ throw new MonitorException("Unable to watch scheduler host set.", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ closeCommand.ifPresent(Command::execute);
+ }
+
+ @Override
+ public ImmutableSet<ServiceInstance> get() {
+ return services.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java b/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
deleted file mode 100644
index 75d58e7..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.util.Arrays;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.commons.lang.builder.EqualsBuilder;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Encapsulates a user's ZooKeeper credentials.
- */
-public final class Credentials {
-
- /**
- * Creates a set of credentials for the ZooKeeper digest authentication mechanism.
- *
- * @param username the username to authenticate with
- * @param password the password to authenticate with
- * @return a set of credentials that can be used to authenticate the zoo keeper client
- */
- public static Credentials digestCredentials(String username, String password) {
- MorePreconditions.checkNotBlank(username);
- Preconditions.checkNotNull(password);
-
- // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset
- // (on server) and so we just have to hope here that clients are deployed in compatible jvms.
- // Consider writing and installing a version of DigestAuthenticationProvider that controls its
- // Charset explicitly.
- return new Credentials("digest", (username + ":" + password).getBytes());
- }
-
- private final String authScheme;
- private final byte[] authToken;
-
- /**
- * Creates a new set of credentials for the given ZooKeeper authentication scheme.
- *
- * @param scheme The name of the authentication scheme the {@code token} is valid in.
- * @param token The authentication token for the given {@code scheme}.
- */
- public Credentials(String scheme, byte[] token) {
- authScheme = MorePreconditions.checkNotBlank(scheme);
- authToken = requireNonNull(token);
- }
-
- /**
- * Returns the authentication scheme these credentials are for.
- *
- * @return the scheme these credentials are for.
- */
- public String scheme() {
- return authScheme;
- }
-
- /**
- * Returns the authentication token.
- *
- * @return the authentication token.
- */
- public byte[] token() {
- return Arrays.copyOf(authToken, authToken.length);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Credentials)) {
- return false;
- }
-
- Credentials other = (Credentials) o;
- return new EqualsBuilder()
- .append(authScheme, other.scheme())
- .append(authToken, other.token())
- .isEquals();
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(authScheme, authToken);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
index e690d14..999a542 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
@@ -33,6 +33,10 @@ import org.apache.aurora.common.net.InetSocketAddressHelper;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -64,7 +68,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
- bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(JsonCodec.INSTANCE);
+ bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC);
}
@Provides
@@ -102,7 +106,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
if (zooKeeperConfig.getCredentials().isPresent()) {
Credentials credentials = zooKeeperConfig.getCredentials().get();
- builder.authorization(credentials.scheme(), credentials.token());
+ builder.authorization(credentials.scheme(), credentials.authToken());
}
CuratorFramework curatorFramework = builder.build();
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
index db886df..0b86fb6 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java
@@ -24,6 +24,7 @@ import org.apache.aurora.GuavaUtils;
import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.scheduler.app.SchedulerMain;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ZKPaths;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
index 321bbb3..c378172 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
@@ -27,6 +27,7 @@ import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.SingletonService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
index d5019bf..e8aafe4 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
@@ -15,10 +15,10 @@ package org.apache.aurora.scheduler.discovery;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.Optional;
import javax.annotation.Nullable;
+import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
@@ -27,12 +27,23 @@ import org.apache.aurora.common.args.CmdLine;
import org.apache.aurora.common.args.constraints.NotEmpty;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A factory that creates a {@link ZooKeeperConfig} instance based on command line argument
* values.
*/
public final class FlaggedZooKeeperConfig {
+ private static final Logger LOG = LoggerFactory.getLogger(FlaggedZooKeeperConfig.class);
+
+ @CmdLine(name = "zk_use_curator",
+ help = "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter "
+ + "commons/zookeeper (the legacy library) is used.")
+ private static final Arg<Boolean> USE_CURATOR = Arg.create(true);
+
@CmdLine(name = "zk_in_proc",
help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints "
+ "to be ignored if specified.")
@@ -63,9 +74,13 @@ public final class FlaggedZooKeeperConfig {
* @return Configuration instance.
*/
public static ZooKeeperConfig create() {
+ if (USE_CURATOR.hasAppliedValue()) {
+ LOG.warn("The -zk_use_curator flag is deprecated and will be removed in a future release.");
+ }
return new ZooKeeperConfig(
+ USE_CURATOR.get(),
ZK_ENDPOINTS.get(),
- Optional.ofNullable(CHROOT_PATH.get()),
+ Optional.fromNullable(CHROOT_PATH.get()),
IN_PROCESS.get(),
SESSION_TIMEOUT.get(),
getCredentials(DIGEST_CREDENTIALS.get()));
@@ -73,7 +88,7 @@ public final class FlaggedZooKeeperConfig {
private static Optional<Credentials> getCredentials(@Nullable String userAndPass) {
if (userAndPass == null) {
- return Optional.empty();
+ return Optional.absent();
}
List<String> parts = ImmutableList.copyOf(Splitter.on(":").split(userAndPass));
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java b/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
deleted file mode 100644
index 9d22b76..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonParseException;
-
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Encodes a {@link ServiceInstance} as a JSON object.
- */
-class JsonCodec implements Codec<ServiceInstance> {
-
- private static void assertRequiredField(String fieldName, Object fieldValue) {
- if (fieldValue == null) {
- throw new JsonParseException(String.format("Field %s is required", fieldName));
- }
- }
-
- private static class EndpointSchema {
- private final String host;
- private final Integer port;
-
- EndpointSchema(Endpoint endpoint) {
- host = endpoint.getHost();
- port = endpoint.getPort();
- }
-
- Endpoint asEndpoint() {
- assertRequiredField("host", host);
- assertRequiredField("port", port);
-
- return new Endpoint(host, port);
- }
- }
-
- private static class ServiceInstanceSchema {
- private final EndpointSchema serviceEndpoint;
- private final Map<String, EndpointSchema> additionalEndpoints;
- private final Status status;
- @Nullable private final Integer shard;
-
- ServiceInstanceSchema(ServiceInstance instance) {
- serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
- if (instance.isSetAdditionalEndpoints()) {
- additionalEndpoints =
- Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new);
- } else {
- additionalEndpoints = ImmutableMap.of();
- }
- status = instance.getStatus();
- shard = instance.isSetShard() ? instance.getShard() : null;
- }
-
- ServiceInstance asServiceInstance() {
- assertRequiredField("serviceEndpoint", serviceEndpoint);
- assertRequiredField("status", status);
-
- Map<String, EndpointSchema> extraEndpoints =
- additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints;
-
- ServiceInstance instance =
- new ServiceInstance(
- serviceEndpoint.asEndpoint(),
- Maps.transformValues(extraEndpoints, EndpointSchema::asEndpoint),
- status);
- if (shard != null) {
- instance.setShard(shard);
- }
- return instance;
- }
- }
-
- /**
- * The encoding for service instance data in ZooKeeper expected by Aurora clients.
- */
- static final Codec<ServiceInstance> INSTANCE = new JsonCodec();
-
- private static final Charset ENCODING = Charsets.UTF_8;
-
- private final Gson gson;
-
- private JsonCodec() {
- this(new Gson());
- }
-
- JsonCodec(Gson gson) {
- this.gson = requireNonNull(gson);
- }
-
- @Override
- public void serialize(ServiceInstance instance, OutputStream sink) throws IOException {
- Writer writer = new OutputStreamWriter(sink, ENCODING);
- try {
- gson.toJson(new ServiceInstanceSchema(instance), writer);
- } catch (JsonIOException e) {
- throw new IOException(String.format("Problem serializing %s to JSON", instance), e);
- }
- writer.flush();
- }
-
- @Override
- public ServiceInstance deserialize(InputStream source) throws IOException {
- InputStreamReader reader = new InputStreamReader(source, ENCODING);
- try {
- @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class);
- if (schema == null) {
- throw new IOException("JSON did not include a ServiceInstance object");
- }
- return schema.asServiceInstance();
- } catch (JsonParseException e) {
- throw new IOException("Problem parsing JSON ServiceInstance.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
index b7ca62c..07a6302 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
@@ -25,14 +25,16 @@ import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
+import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.binder.LinkedBindingBuilder;
import org.apache.aurora.common.application.ShutdownRegistry;
import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer;
import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.discovery.testing.ZooKeeperTestServer;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,15 +46,15 @@ import static java.util.Objects.requireNonNull;
*/
public class ServiceDiscoveryModule extends AbstractModule {
- private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CommonsServiceDiscoveryModule.class);
private final ZooKeeperConfig zooKeeperConfig;
private final String discoveryPath;
/**
* Creates a Guice module that will bind a
- * {@link SingletonService} for scheduler leader election and a
- * {@link ServiceGroupMonitor} that can be used to find the
+ * {@link org.apache.aurora.common.zookeeper.SingletonService} for scheduler leader election and a
+ * {@link org.apache.aurora.scheduler.app.ServiceGroupMonitor} that can be used to find the
* leading scheduler.
*
* @param zooKeeperConfig The ZooKeeper client configuration to use to interact with ZooKeeper.
@@ -80,7 +82,15 @@ public class ServiceDiscoveryModule extends AbstractModule {
clusterBinder.toInstance(zooKeeperConfig.getServers());
}
- install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig));
+ install(discoveryModule());
+ }
+
+ private Module discoveryModule() {
+ if (zooKeeperConfig.isUseCurator()) {
+ return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+ } else {
+ return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+ }
}
@Provides
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
deleted file mode 100644
index fea896c..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.io.Closeable;
-import java.util.function.Supplier;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.thrift.ServiceInstance;
-
-/**
- * Monitors a service group's membership and supplies a live view of the most recent set.
- */
-public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable {
-
- /**
- * Indicates a problem initiating monitoring of a service group.
- */
- class MonitorException extends Exception {
- MonitorException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Starts monitoring the service group.
- *
- * When the service group membership no longer needs to be maintained, this monitor should be
- * {@link #close() closed}.
- *
- * @throws MonitorException if there is a problem initiating monitoring of the service group.
- */
- void start() throws MonitorException;
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
deleted file mode 100644
index adbc318..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-/**
- * A service that uses master election to only allow a single service instance to be active amongst
- * a set of potential servers at a time.
- */
-public interface SingletonService {
-
- /**
- * Indicates an error attempting to lead a group of servers.
- */
- class LeadException extends Exception {
- LeadException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Indicates an error attempting to advertise leadership of a group of servers.
- */
- class AdvertiseException extends Exception {
- AdvertiseException(String message) {
- super(message);
- }
-
- AdvertiseException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Indicates an error attempting to leave a group of servers, abdicating leadership of the group.
- */
- class LeaveException extends Exception {
- LeaveException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Attempts to lead the singleton service.
- *
- * @param endpoint The primary endpoint to register as a leader candidate in the service.
- * @param additionalEndpoints Additional endpoints that are available on the host.
- * @param listener Handler to call when the candidate is elected or defeated.
- * @throws LeadException If there was a problem joining or watching the ZooKeeper group.
- * @throws InterruptedException If the thread watching/joining the group was interrupted.
- */
- void lead(
- InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- LeadershipListener listener)
- throws LeadException, InterruptedException;
-
- /**
- * A listener to be notified of changes in the leadership status.
- * Implementers should be careful to avoid blocking operations in these callbacks.
- */
- interface LeadershipListener {
-
- /**
- * Notifies the listener that is is current leader.
- *
- * @param control A controller handle to advertise and/or leave advertised presence.
- */
- void onLeading(LeaderControl control);
-
- /**
- * Notifies the listener that it is no longer leader.
- */
- void onDefeated();
- }
-
- /**
- * A controller for the state of the leader. This will be provided to the leader upon election,
- * which allows the leader to decide when to advertise as leader of the server set and terminate
- * leadership at will.
- */
- interface LeaderControl {
-
- /**
- * Advertises the leader's server presence to clients.
- *
- * @throws AdvertiseException If there was an error advertising the singleton leader to clients
- * of the server set.
- * @throws InterruptedException If interrupted while advertising.
- */
- void advertise() throws AdvertiseException, InterruptedException;
-
- /**
- * Leaves candidacy for leadership, removing advertised server presence if applicable.
- *
- * @throws LeaveException If the leader's status could not be updated or there was an error
- * abdicating server set leadership.
- */
- void leave() throws LeaveException;
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
index acb7905..3f32a62 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
@@ -14,11 +14,14 @@
package org.apache.aurora.scheduler.discovery;
import java.net.InetSocketAddress;
-import java.util.Optional;
+
+import com.google.common.base.Optional;
import org.apache.aurora.common.base.MorePreconditions;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
import static java.util.Objects.requireNonNull;
@@ -32,18 +35,21 @@ public class ZooKeeperConfig {
/**
* Creates a new client configuration with defaults for the session timeout and credentials.
*
+ * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used.
* @param servers ZooKeeper server addresses.
* @return A new configuration.
*/
- public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) {
+ public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) {
return new ZooKeeperConfig(
+ useCurator,
servers,
- Optional.empty(), // chrootPath
+ Optional.absent(), // chrootPath
false,
ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT,
- Optional.empty()); // credentials
+ Optional.absent()); // credentials
}
+ private final boolean useCurator;
private final Iterable<InetSocketAddress> servers;
private final boolean inProcess;
private final Amount<Integer, Time> sessionTimeout;
@@ -60,12 +66,14 @@ public class ZooKeeperConfig {
* @param credentials ZooKeeper authentication credentials.
*/
ZooKeeperConfig(
+ boolean useCurator,
Iterable<InetSocketAddress> servers,
Optional<String> chrootPath,
boolean inProcess,
Amount<Integer, Time> sessionTimeout,
Optional<Credentials> credentials) {
+ this.useCurator = useCurator;
this.servers = MorePreconditions.checkNotBlank(servers);
this.chrootPath = requireNonNull(chrootPath);
this.inProcess = inProcess;
@@ -82,6 +90,7 @@ public class ZooKeeperConfig {
*/
public ZooKeeperConfig withCredentials(Credentials newCredentials) {
return new ZooKeeperConfig(
+ useCurator,
servers,
chrootPath,
inProcess,
@@ -89,6 +98,10 @@ public class ZooKeeperConfig {
Optional.of(newCredentials));
}
+ boolean isUseCurator() {
+ return useCurator;
+ }
+
public Iterable<InetSocketAddress> getServers() {
return servers;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
deleted file mode 100644
index 211aa50..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * Utilities for dealing with ZooKeeper.
- */
-final class ZooKeeperUtils {
-
- /**
- * An appropriate default session timeout for ZooKeeper clusters.
- */
- static final Amount<Integer, Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS);
-
- /**
- * An ACL that gives all permissions any user authenticated or not.
- */
- static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
- ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
-
- /**
- * An ACL that gives all permissions to node creators and read permissions only to everyone else.
- */
- static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
- ImmutableList.<ACL>builder()
- .addAll(Ids.CREATOR_ALL_ACL)
- .addAll(Ids.READ_ACL_UNSAFE)
- .build();
-
- private ZooKeeperUtils() {
- // utility
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
deleted file mode 100644
index d84037e..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery.testing;
-
-import org.apache.aurora.common.testing.TearDownTestCase;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * A base-class for in-process zookeeper tests.
- */
-public abstract class BaseZooKeeperTest extends TearDownTestCase {
-
- private ZooKeeperTestServer zkTestServer;
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Before
- public final void setUp() throws Exception {
- zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder());
- addTearDown(zkTestServer::stop);
- zkTestServer.startNetwork();
- }
-
- /**
- * Returns the running in-process ZooKeeper server.
- *
- * @return The in-process ZooKeeper server.
- */
- protected final ZooKeeperTestServer getServer() {
- return zkTestServer;
- }
-
- /**
- * Returns the current port to connect to the in-process zookeeper instance.
- */
- protected final int getPort() {
- return getServer().getPort();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
deleted file mode 100644
index a7bb48b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery.testing;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-/**
- * A helper class for starting in-process ZooKeeper server and clients.
- *
- * <p>This is ONLY meant to be used for testing.
- */
-public class ZooKeeperTestServer {
-
- private final File dataDir;
- private final File snapDir;
-
- private ZooKeeperServer zooKeeperServer;
- private ServerCnxnFactory connectionFactory;
- private int port;
-
- public ZooKeeperTestServer(File dataDir, File snapDir) {
- this.dataDir = Preconditions.checkNotNull(dataDir);
- this.snapDir = Preconditions.checkNotNull(snapDir);
- }
-
- /**
- * Starts zookeeper up on an ephemeral port.
- */
- public void startNetwork() throws IOException, InterruptedException {
- zooKeeperServer =
- new ZooKeeperServer(
- new FileTxnSnapLog(dataDir, snapDir),
- new BasicDataTreeBuilder()) {
-
- // TODO(John Sirois): Introduce a builder to configure the in-process server if and when
- // some folks need JMX for in-process tests.
- @Override protected void registerJMX() {
- // noop
- }
- };
-
- connectionFactory = new NIOServerCnxnFactory();
- connectionFactory.configure(
- new InetSocketAddress(port),
- 60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
- connectionFactory.startup(zooKeeperServer);
- port = zooKeeperServer.getClientPort();
- }
-
- /**
- * Stops the zookeeper server.
- */
- public void stop() {
- if (connectionFactory != null) {
- connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
- connectionFactory = null;
- }
- }
-
- /**
- * Expires the client session with the given {@code sessionId}.
- *
- * @param sessionId The id of the client session to expire.
- */
- public final void expireClientSession(long sessionId) {
- zooKeeperServer.closeSession(sessionId);
- }
-
- /**
- * Returns the current port to connect to the in-process zookeeper instance.
- */
- public final int getPort() {
- checkEphemeralPortAssigned();
- return port;
- }
-
- private void checkEphemeralPortAssigned() {
- Preconditions.checkState(port > 0, "startNetwork must be called first");
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
index af8567f..53ebc0b 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
@@ -64,7 +64,7 @@ import org.apache.aurora.common.net.http.handlers.TimeSeriesDataSource;
import org.apache.aurora.common.net.http.handlers.VarsHandler;
import org.apache.aurora.common.net.http.handlers.VarsJsonHandler;
import org.apache.aurora.scheduler.SchedulerServicesModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
import org.apache.aurora.scheduler.http.api.ApiModule;
import org.apache.aurora.scheduler.http.api.security.HttpSecurityModule;
import org.apache.aurora.scheduler.thrift.ThriftModule;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
index 662d6d5..bc0e2a8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
@@ -27,8 +27,8 @@ import com.google.common.net.HostAndPort;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
index c7c0387..6704a32 100644
--- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java
@@ -33,8 +33,8 @@ import org.apache.aurora.common.args.CmdLine;
import org.apache.aurora.common.net.InetSocketAddressHelper;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.Credentials;
import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.scheduler.discovery.Credentials;
import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
import org.apache.aurora.scheduler.log.mesos.LogInterface.ReaderInterface;
import org.apache.aurora.scheduler.log.mesos.LogInterface.WriterInterface;
@@ -157,7 +157,7 @@ public class MesosLogStreamModule extends PrivateModule {
zkClientConfig.getSessionTimeout().getUnit().getTimeUnit(),
zkLogGroupPath,
zkCredentials.scheme(),
- zkCredentials.token());
+ zkCredentials.authToken());
} else {
return new Log(
QUORUM_SIZE.get(),
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
index 4324ea9..051c520 100644
--- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java
@@ -21,9 +21,9 @@ import org.apache.aurora.common.application.ShutdownRegistry;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.base.ExceptionalCommand;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
+import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl;
-import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 84d7753..29a3b4a 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -17,7 +17,6 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -44,6 +43,10 @@ import org.apache.aurora.GuavaUtils;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.ServerSetImpl;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
@@ -60,11 +63,8 @@ import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
-import org.apache.aurora.scheduler.discovery.Credentials;
import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
-import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor;
import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
-import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.log.Log.Entry;
import org.apache.aurora.scheduler.log.Log.Position;
@@ -107,9 +107,10 @@ import static org.easymock.EasyMock.createControl;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class SchedulerIT extends BaseZooKeeperTest {
+public class SchedulerIT extends BaseZooKeeperClientTest {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class);
@@ -144,6 +145,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
private Stream logStream;
private StreamMatcher streamMatcher;
private EntrySerializer entrySerializer;
+ private ZooKeeperClient zkClient;
private File backupDir;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -171,9 +173,11 @@ public class SchedulerIT extends BaseZooKeeperTest {
entrySerializer = new EntrySerializer.EntrySerializerImpl(
LogStorageModule.MAX_LOG_ENTRY_SIZE.get(),
Hashing.md5());
+
+ zkClient = createZkClient();
}
- private Callable<Void> startScheduler() throws Exception {
+ private void startScheduler() throws Exception {
// TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using
// AppLauncher.
Module testModule = new AbstractModule() {
@@ -198,8 +202,10 @@ public class SchedulerIT extends BaseZooKeeperTest {
};
ZooKeeperConfig zkClientConfig =
ZooKeeperConfig.create(
+ true, // useCurator
ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort())))
.withCredentials(Credentials.digestCredentials("mesos", "mesos"));
+ SchedulerMain main = SchedulerMain.class.newInstance();
Injector injector = Guice.createInjector(
ImmutableList.<Module>builder()
.add(SchedulerMain.getUniversalModule())
@@ -209,8 +215,8 @@ public class SchedulerIT extends BaseZooKeeperTest {
.add(testModule)
.build()
);
- SchedulerMain main = new SchedulerMain();
injector.injectMembers(main);
+ Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
executor.submit(() -> {
try {
@@ -220,32 +226,28 @@ public class SchedulerIT extends BaseZooKeeperTest {
executor.shutdownNow();
}
});
-
- Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
addTearDown(() -> {
lifecycle.shutdown();
MoreExecutors.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
});
-
injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class))
.awaitHealthy();
+ }
- ServiceGroupMonitor schedulerMonitor = injector.getInstance(ServiceGroupMonitor.class);
- CountDownLatch schedulerReady = new CountDownLatch(1);
+ private void awaitSchedulerReady() throws Exception {
executor.submit(() -> {
- while (schedulerMonitor.get().isEmpty()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
+ final CountDownLatch schedulerReady = new CountDownLatch(1);
+ schedulerService.watch(hostSet -> {
+ if (!hostSet.isEmpty()) {
+ schedulerReady.countDown();
}
- }
- schedulerReady.countDown();
- });
- return () -> {
- schedulerReady.await();
+ });
+ // A timeout is used because certain types of assertion errors (mocks) will not surface
+ // until the main test thread exits this body of code.
+ assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
return null;
- };
+ }).get();
}
private final AtomicInteger curPosition = new AtomicInteger();
@@ -330,14 +332,14 @@ public class SchedulerIT extends BaseZooKeeperTest {
expect(driver.stop(true)).andReturn(Status.DRIVER_STOPPED).anyTimes();
control.replay();
- Callable<Void> awaitSchedulerReady = startScheduler();
+ startScheduler();
driverStarted.await();
scheduler.getValue().registered(driver,
FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
MasterInfo.getDefaultInstance());
- awaitSchedulerReady.call();
+ awaitSchedulerReady();
assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
new file mode 100644
index 0000000..d90192b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.Credentials;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+abstract class AbstractDiscoveryModuleTest extends TearDownTestCase {
+
+ @Test
+ public void testBindingContract() {
+ ZooKeeperConfig zooKeeperConfig =
+ new ZooKeeperConfig(
+ isCurator(),
+ ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
+ Optional.of("/chroot"),
+ false, // inProcess
+ Amount.of(1, Time.DAYS),
+ Optional.of(Credentials.digestCredentials("test", "user")));
+
+ Injector injector =
+ Guice.createInjector(
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
+ .toInstance(
+ ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
+ bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
+ .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
+
+ bindExtraRequirements(binder());
+ }
+ },
+ createModule("/discovery/path", zooKeeperConfig));
+
+ assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
+ assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
+ }
+
+ void bindExtraRequirements(Binder binder) {
+ // Noop.
+ }
+
+ abstract Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig);
+
+ abstract boolean isCurator();
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
index 9f86add..a2b4125 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
@@ -21,10 +21,13 @@ import java.util.function.Predicate;
import com.google.common.collect.ImmutableMap;
+import org.apache.aurora.common.io.Codec;
import org.apache.aurora.common.thrift.Endpoint;
import org.apache.aurora.common.thrift.ServiceInstance;
import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -35,6 +38,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
static final String GROUP_PATH = "/group/root";
static final String MEMBER_TOKEN = "member_";
+ static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC;
static final int PRIMARY_PORT = 42;
private CuratorFramework client;
@@ -51,7 +55,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
groupCache.getListenable().addListener((c, event) -> groupEvents.put(event));
Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
- groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, JsonCodec.INSTANCE);
+ groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC);
}
final CuratorFramework startNewClient() {
@@ -97,7 +101,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
final byte[] serialize(ServiceInstance serviceInstance) throws IOException {
ByteArrayOutputStream sink = new ByteArrayOutputStream();
- JsonCodec.INSTANCE.serialize(serviceInstance, sink);
+ CODEC.serialize(serviceInstance, sink);
return sink.toByteArray();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
new file mode 100644
index 0000000..7a4c4dd
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import com.google.inject.Module;
+
+public class CommonsDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+
+ @Override
+ Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+ return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+ }
+
+ @Override
+ boolean isCurator() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
new file mode 100644
index 0000000..42a2224
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CommonsServiceGroupMonitorTest extends EasyMockTest {
+
+ private DynamicHostSet<ServiceInstance> serverSet;
+ private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture;
+ private Command stopCommand;
+
+ @Before
+ public void setUp() throws Exception {
+ serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { });
+ hostChangeMonitorCapture = createCapture();
+ stopCommand = createMock(Command.class);
+ }
+
+ private void expectSuccessfulWatch() throws Exception {
+ expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand);
+ }
+
+ private void expectFailedWatch() throws Exception {
+ DynamicHostSet.MonitorException watchError =
+ new DynamicHostSet.MonitorException(
+ "Problem watching service group",
+ new RuntimeException());
+ expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError);
+ }
+
+ @Test
+ public void testNominalLifecycle() throws Exception {
+ expectSuccessfulWatch();
+
+ stopCommand.execute();
+ expectLastCall();
+
+ control.replay();
+
+ CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+ groupMonitor.start();
+ groupMonitor.close();
+ }
+
+ @Test
+ public void testExceptionalLifecycle() throws Exception {
+ expectFailedWatch();
+ control.replay();
+
+ CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+ try {
+ groupMonitor.start();
+ fail();
+ } catch (ServiceGroupMonitor.MonitorException e) {
+ // expected
+ }
+
+ // Close on a non-started monitor should be allowed.
+ groupMonitor.close();
+ }
+
+ @Test
+ public void testNoHosts() throws Exception {
+ expectSuccessfulWatch();
+ control.replay();
+
+ CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+ assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+ groupMonitor.start();
+ assertEquals(ImmutableSet.of(), groupMonitor.get());
+
+ hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of());
+ assertEquals(ImmutableSet.of(), groupMonitor.get());
+ }
+
+ @Test
+ public void testHostUpdates() throws Exception {
+ expectSuccessfulWatch();
+ control.replay();
+
+ CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet);
+ groupMonitor.start();
+
+ ImmutableSet<ServiceInstance> twoHosts =
+ ImmutableSet.of(serviceInstance("one"), serviceInstance("two"));
+ hostChangeMonitorCapture.getValue().onChange(twoHosts);
+ assertEquals(twoHosts, groupMonitor.get());
+
+ ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one"));
+ hostChangeMonitorCapture.getValue().onChange(oneHost);
+ assertEquals(oneHost, groupMonitor.get());
+
+ ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three"));
+ hostChangeMonitorCapture.getValue().onChange(anotherHost);
+ assertEquals(anotherHost, groupMonitor.get());
+
+ ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of();
+ hostChangeMonitorCapture.getValue().onChange(noHosts);
+ assertEquals(noHosts, groupMonitor.get());
+ }
+
+ private ServiceInstance serviceInstance(String hostName) {
+ return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
index 4ebda5e..f1a02e4 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java
@@ -13,27 +13,37 @@
*/
package org.apache.aurora.scheduler.discovery;
-import java.net.InetSocketAddress;
-import java.util.Optional;
-
import com.google.common.collect.ImmutableList;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
+import com.google.inject.Binder;
+import com.google.inject.Module;
import org.apache.aurora.common.application.ShutdownRegistry;
import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.zookeeper.data.ACL;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-public class CuratorDiscoveryModuleTest extends TearDownTestCase {
+public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest {
+
+ @Override
+ void bindExtraRequirements(Binder binder) {
+ ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
+ binder.bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
+ addTearDown(shutdownRegistry::execute);
+ }
+
+ @Override
+ Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
+ return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
+ }
+
+ @Override
+ boolean isCurator() {
+ return false;
+ }
@Test
public void testSingleACLProvider() {
@@ -54,36 +64,4 @@ public class CuratorDiscoveryModuleTest extends TearDownTestCase {
public void testSingleACLProviderEmpty() {
new CuratorServiceDiscoveryModule.SingleACLProvider(ImmutableList.of());
}
-
- @Test
- public void testBindingContract() {
- ZooKeeperConfig zooKeeperConfig =
- new ZooKeeperConfig(
- ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)),
- Optional.of("/chroot"),
- false, // inProcess
- Amount.of(1, Time.DAYS),
- Optional.of(Credentials.digestCredentials("test", "user")));
-
- Injector injector =
- Guice.createInjector(
- new AbstractModule() {
- @Override
- protected void configure() {
- bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY)
- .toInstance(
- ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)));
- bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY)
- .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE);
-
- ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
- binder().bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
- addTearDown(shutdownRegistry::execute);
- }
- },
- new CuratorServiceDiscoveryModule("/discovery/path", zooKeeperConfig));
-
- assertNotNull(injector.getBinding(SingletonService.class).getProvider().get());
- assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get());
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
index bb3d080..6ea49b0 100644
--- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.aurora.common.zookeeper.SingletonService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.easymock.Capture;
@@ -56,7 +57,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest {
throws Exception {
CuratorSingletonService singletonService =
- new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, JsonCodec.INSTANCE);
+ new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, CODEC);
InetSocketAddress leaderEndpoint = InetSocketAddress.createUnresolved(hostName, PRIMARY_PORT);
singletonService.lead(leaderEndpoint, ImmutableMap.of(), listener);
}