You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by as...@apache.org on 2014/01/16 23:10:29 UTC
git commit: Improve service discovery API to allow push model for
getting endpoint changes.
Updated Branches:
refs/heads/master 1380f21c0 -> b0dd8e241
Improve service discovery API to allow push model for getting endpoint changes.
The major change is to have the DiscoveryServiceClient.discover() method return
a ServiceDiscovered object, which allows the caller to set a listener to receive changes
in service endpoints. The ServiceDiscovered object itself extends Iterable to maintain
the existing pull model and backward compatibility.
Signed-off-by: Albert Shau <al...@continuuity.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b0dd8e24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b0dd8e24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b0dd8e24
Branch: refs/heads/master
Commit: b0dd8e241b55532c0a8c5d18f5bccf6395e3b2ea
Parents: 1380f21
Author: Terence Yim <te...@continuuity.com>
Authored: Thu Jan 16 12:59:43 2014 -0800
Committer: Albert Shau <al...@continuuity.com>
Committed: Thu Jan 16 14:09:46 2014 -0800
----------------------------------------------------------------------
.../org/apache/twill/api/TwillController.java | 6 +-
.../twill/internal/AbstractTwillController.java | 4 +-
.../twill/discovery/DiscoveryServiceClient.java | 6 +-
.../twill/discovery/ServiceDiscovered.java | 71 +++++
.../discovery/DefaultServiceDiscovered.java | 159 +++++++++++
.../twill/discovery/DiscoverableAdapter.java | 102 +++++++
.../discovery/InMemoryDiscoveryService.java | 63 +++--
.../twill/discovery/ZKDiscoveryService.java | 111 +-------
.../discovery/DiscoveryServiceTestBase.java | 278 +++++++++++++++++++
.../discovery/InMemoryDiscoveryServiceTest.java | 44 +--
.../twill/discovery/ZKDiscoveryServiceTest.java | 130 +--------
11 files changed, 690 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-api/src/main/java/org/apache/twill/api/TwillController.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillController.java b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
index f31d3f9..a5906f4 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillController.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
@@ -19,6 +19,7 @@ package org.apache.twill.api;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.ServiceDiscovered;
import com.google.common.util.concurrent.ListenableFuture;
/**
@@ -35,10 +36,9 @@ public interface TwillController extends ServiceController {
/**
* Discovers the set of {@link Discoverable} endpoints that provides service for the given service name.
* @param serviceName Name of the service to discovery.
- * @return An {@link Iterable} that gives set of latest {@link Discoverable} every time when
- * {@link Iterable#iterator()}} is invoked.
+ * @return A {@link org.apache.twill.discovery.ServiceDiscovered} object representing the result.
*/
- Iterable<Discoverable> discoverService(String serviceName);
+ ServiceDiscovered discoverService(String serviceName);
/**
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 5806f9d..97e0a8f 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -21,8 +21,8 @@ import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.internal.json.StackTraceElementCodec;
import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
@@ -97,7 +97,7 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
}
@Override
- public final Iterable<Discoverable> discoverService(String serviceName) {
+ public final ServiceDiscovered discoverService(String serviceName) {
return discoveryServiceClient.discover(serviceName);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
index 89cf269..a58c83d 100644
--- a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
@@ -26,9 +26,7 @@ public interface DiscoveryServiceClient {
* Retrieves a list of {@link Discoverable} for the a service with the given name.
*
* @param name Name of the service
- * @return A live {@link Iterable} that on each call to {@link Iterable#iterator()} returns
- * an {@link java.util.Iterator Iterator} that reflects the latest set of
- * available {@link Discoverable} services.
+ * @return A {@link org.apache.twill.discovery.ServiceDiscovered} object representing the result.
*/
- Iterable<Discoverable> discover(String name);
+ ServiceDiscovered discover(String name);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-api/src/main/java/org/apache/twill/discovery/ServiceDiscovered.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/ServiceDiscovered.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/ServiceDiscovered.java
new file mode 100644
index 0000000..5de15f0
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/ServiceDiscovered.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Represents the result of a service discovery. It extends
+ * {@link Iterable}, which gives the latest set of {@link Discoverable} every time
+ * {@link #iterator()} is invoked.
+ */
+public interface ServiceDiscovered extends Iterable<Discoverable> {
+
+ /**
+ * A callback for being notified about changes in the discovery list.
+ *
+ * @see #watchChanges(ServiceDiscovered.ChangeListener, java.util.concurrent.Executor)
+ */
+ interface ChangeListener {
+
+ /**
+ * Invoked whenever the discovery list has changed.
+ *
+ * @param serviceDiscovered The {@link ServiceDiscovered} that this listener is attached to.
+ */
+ void onChange(ServiceDiscovered serviceDiscovered);
+ }
+
+ /**
+ * Returns the name of the service being discovered.
+ *
+ * @return Name of the service.
+ */
+ String getName();
+
+ /**
+ * Registers a {@link ChangeListener} to watch for changes in the discovery list.
+ * The {@link ChangeListener#onChange(ServiceDiscovered)} method is triggered once when watching starts,
+ * and again on every subsequent change in the discovery list.
+ *
+ * @param listener A {@link ChangeListener} to watch for changes.
+ * @param executor An {@link Executor} used for issuing calls to the given listener.
+ * @return A {@link Cancellable} to cancel the watch.
+ */
+ Cancellable watchChanges(ChangeListener listener, Executor executor);
+
+ /**
+ * Checks if the given discoverable is contained in the current discovery list.
+ *
+ * @param discoverable The {@link Discoverable} to check for.
+ * @return {@code true} if it exists, {@code false} otherwise.
+ */
+ boolean contains(Discoverable discoverable);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
new file mode 100644
index 0000000..49fb641
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A package private class for implementing {@link ServiceDiscovered}.
+ *
+ * <p>The current discovery list is held in an {@link AtomicReference} and is replaced as a whole by
+ * {@link #setDiscoverables(Set)}. The list of registered listeners is guarded by a {@link ReadWriteLock}.
+ */
+final class DefaultServiceDiscovered implements ServiceDiscovered {
+
+  private final String name;
+  private final AtomicReference<Set<Discoverable>> discoverables;
+  private final List<ListenerCaller> listenerCallers;
+  private final ReadWriteLock callerLock;
+
+  /**
+   * Creates an instance for the given service name, starting with an empty discovery list
+   * and no listeners.
+   *
+   * @param name Name of the service being discovered.
+   */
+  DefaultServiceDiscovered(String name) {
+    this.name = name;
+    this.discoverables = new AtomicReference<Set<Discoverable>>(ImmutableSet.<Discoverable>of());
+    this.listenerCallers = Lists.newLinkedList();
+    this.callerLock = new ReentrantReadWriteLock();
+  }
+
+  /**
+   * Replaces the current discovery list and then notifies every registered listener.
+   *
+   * <p>An immutable snapshot of the given set is stored rather than the set itself. Callers may pass
+   * a live view (for example the value collection of a multimap) that they keep mutating under their
+   * own lock; {@link #iterator()} and {@link #contains(Discoverable)} do not take that lock, so they
+   * must never read from such a view directly.
+   *
+   * @param discoverables The new set of {@link Discoverable}.
+   */
+  void setDiscoverables(Set<Discoverable> discoverables) {
+    this.discoverables.set(ImmutableSet.copyOf(discoverables));
+
+    // Collect all listeners with a read lock to the listener list.
+    List<ListenerCaller> callers = Lists.newArrayList();
+    Lock readLock = callerLock.readLock();
+    readLock.lock();
+    try {
+      callers.addAll(listenerCallers);
+    } finally {
+      readLock.unlock();
+    }
+
+    // Invoke listeners outside of the lock, so the lock is not held while submitting to executors.
+    for (ListenerCaller caller : callers) {
+      caller.invoke();
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Registers the given listener and submits a first notification to the given executor.
+   *
+   * @param listener A {@link ChangeListener} to watch for changes.
+   * @param executor An {@link Executor} used for issuing calls to the given listener.
+   * @return A {@link Cancellable} that removes the listener when cancelled.
+   */
+  @Override
+  public Cancellable watchChanges(ChangeListener listener, Executor executor) {
+    ListenerCaller caller = new ListenerCaller(listener, executor);
+
+    // Add the new listener with a write lock.
+    Lock writeLock = callerLock.writeLock();
+    writeLock.lock();
+    try {
+      listenerCallers.add(caller);
+    } finally {
+      writeLock.unlock();
+    }
+
+    // Invoke listener for the first time.
+    // A race can happen between this method and the setDiscoverables() method, but it's ok as the contract of
+    // adding a new listener is that onChange will be called at least once. The actual change is already
+    // reflected by the atomic reference "discoverables", hence it's consistent.
+    caller.invoke();
+    return caller;
+  }
+
+  /**
+   * Checks whether the given discoverable is in the current discovery list.
+   *
+   * @param discoverable The {@link Discoverable} to check for.
+   * @return {@code true} if the service name matches and the current list contains it,
+   *         {@code false} otherwise.
+   */
+  @Override
+  public boolean contains(Discoverable discoverable) {
+    // If the name doesn't match, it shouldn't be in the list.
+    if (!discoverable.getName().equals(name)) {
+      return false;
+    }
+
+    // Wrap it if necessary for hashCode/equals comparison.
+    Discoverable target = discoverable;
+    if (!(target instanceof DiscoverableWrapper)) {
+      target = new DiscoverableWrapper(target);
+    }
+
+    return discoverables.get().contains(target);
+  }
+
+  /**
+   * Returns an iterator over the discovery list that is current at the time of the call.
+   */
+  @Override
+  public Iterator<Discoverable> iterator() {
+    return discoverables.get().iterator();
+  }
+
+  /**
+   * Private helper class for invoking the change listener from an executor.
+   * It is also responsible for removing itself from the listener list when cancelled.
+   */
+  private final class ListenerCaller implements Runnable, Cancellable {
+
+    private final ChangeListener listener;
+    private final Executor executor;
+    private final AtomicBoolean cancelled;
+
+    private ListenerCaller(ChangeListener listener, Executor executor) {
+      this.listener = listener;
+      this.executor = executor;
+      this.cancelled = new AtomicBoolean(false);
+    }
+
+    /**
+     * Submits this caller to the executor, unless it has been cancelled.
+     */
+    void invoke() {
+      if (!cancelled.get()) {
+        executor.execute(this);
+      }
+    }
+
+    @Override
+    public void run() {
+      // Check again, as cancel() may have been called after this task was submitted.
+      if (!cancelled.get()) {
+        listener.onChange(DefaultServiceDiscovered.this);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      // The compareAndSet makes sure the removal is attempted at most once.
+      if (cancelled.compareAndSet(false, true)) {
+        Lock writeLock = callerLock.writeLock();
+        writeLock.lock();
+        try {
+          listenerCallers.remove(this);
+        } finally {
+          writeLock.unlock();
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
new file mode 100644
index 0000000..0e4bcc3
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import com.google.common.base.Charsets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
+
+/**
+ * Static utilities for converting a {@link Discoverable} to and from its JSON byte representation.
+ */
+final class DiscoverableAdapter {
+
+  private static final Gson GSON =
+    new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec()).create();
+
+  private DiscoverableAdapter() {
+    // Static helpers only; never instantiated.
+  }
+
+  /**
+   * Serializes a {@link Discoverable} into UTF-8 encoded JSON.
+   * @param discoverable The {@link Discoverable} to serialize
+   * @return the UTF-8 bytes of the JSON form of <code>discoverable</code>
+   */
+  static byte[] encode(Discoverable discoverable) {
+    String json = GSON.toJson(discoverable, Discoverable.class);
+    return json.getBytes(Charsets.UTF_8);
+  }
+
+  /**
+   * Deserializes UTF-8 encoded JSON back into a {@link Discoverable}.
+   * @param encoded bytes of a serialized {@link Discoverable}, may be {@code null}
+   * @return {@code null} if <code>encoded</code> is {@code null}; otherwise the decoded {@link Discoverable}
+   */
+  static Discoverable decode(byte[] encoded) {
+    if (encoded != null) {
+      String json = new String(encoded, Charsets.UTF_8);
+      return GSON.fromJson(json, Discoverable.class);
+    }
+    return null;
+  }
+
+  /**
+   * Gson codec that maps a {@link Discoverable} to a JSON object with the
+   * "service", "hostname" and "port" properties, and back.
+   */
+  private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
+
+    @Override
+    public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
+      JsonObject result = new JsonObject();
+      result.addProperty("service", src.getName());
+      result.addProperty("hostname", src.getSocketAddress().getHostName());
+      result.addProperty("port", src.getSocketAddress().getPort());
+      return result;
+    }
+
+    @Override
+    public Discoverable deserialize(JsonElement json, Type typeOfT,
+                                    JsonDeserializationContext context) throws JsonParseException {
+      JsonObject source = json.getAsJsonObject();
+      final String serviceName = source.get("service").getAsString();
+      final InetSocketAddress socketAddress =
+        new InetSocketAddress(source.get("hostname").getAsString(), source.get("port").getAsInt());
+
+      Discoverable decoded = new Discoverable() {
+        @Override
+        public String getName() {
+          return serviceName;
+        }
+
+        @Override
+        public InetSocketAddress getSocketAddress() {
+          return socketAddress;
+        }
+      };
+      // Wrap the result in a DiscoverableWrapper, as the original deserializer does.
+      return new DiscoverableWrapper(decoded);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
index 7a9e984..2f950b8 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
@@ -18,11 +18,11 @@
package org.apache.twill.discovery;
import org.apache.twill.common.Cancellable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
-import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -31,43 +31,58 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class InMemoryDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
- private final Multimap<String, Discoverable> services = HashMultimap.create();
+ private final SetMultimap<String, Discoverable> services = LinkedHashMultimap.create();
+ private final Map<String, DefaultServiceDiscovered> serviceDiscoveredMap = Maps.newHashMap();
private final Lock lock = new ReentrantLock();
@Override
public Cancellable register(final Discoverable discoverable) {
+ final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+ final String serviceName = wrapper.getName();
+
lock.lock();
try {
- final Discoverable wrapper = new DiscoverableWrapper(discoverable);
- services.put(wrapper.getName(), wrapper);
- return new Cancellable() {
- @Override
- public void cancel() {
- lock.lock();
- try {
- services.remove(wrapper.getName(), wrapper);
- } finally {
- lock.unlock();
- }
- }
- };
+ services.put(serviceName, wrapper);
+
+ DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(serviceName);
+ if (serviceDiscovered != null) {
+ serviceDiscovered.setDiscoverables(services.get(serviceName));
+ }
} finally {
lock.unlock();
}
- }
- @Override
- public Iterable<Discoverable> discover(final String name) {
- return new Iterable<Discoverable>() {
+ return new Cancellable() {
@Override
- public Iterator<Discoverable> iterator() {
+ public void cancel() {
lock.lock();
try {
- return ImmutableList.copyOf(services.get(name)).iterator();
+ services.remove(serviceName, wrapper);
+
+ DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(serviceName);
+ if (serviceDiscovered != null) {
+ serviceDiscovered.setDiscoverables(services.get(serviceName));
+ }
} finally {
lock.unlock();
}
}
};
}
+
+ @Override
+ public ServiceDiscovered discover(final String name) {
+ lock.lock();
+ try {
+ DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(name);
+ if (serviceDiscovered == null) {
+ serviceDiscovered = new DefaultServiceDiscovered(name);
+ serviceDiscovered.setDiscoverables(services.get(name));
+ serviceDiscoveredMap.put(name, serviceDiscovered);
+ }
+ return serviceDiscovered;
+ } finally {
+ lock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 862fcbc..0543626 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -25,12 +25,11 @@ import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.base.Charsets;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.hash.Hashing;
@@ -38,14 +37,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -54,16 +45,13 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Type;
import java.net.InetSocketAddress;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -96,7 +84,7 @@ import java.util.concurrent.locks.ReentrantLock;
* });
* ...
* ...
- * Iterable<Discoverable> services = service.discovery("service-name");
+ * ServiceDiscovered services = service.discovery("service-name");
* ...
* }
* </pre>
@@ -114,7 +102,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
private final Lock lock;
- private final LoadingCache<String, Iterable<Discoverable>> services;
+ private final LoadingCache<String, ServiceDiscovered> services;
private final ZKClient zkClient;
private final ScheduledExecutorService retryExecutor;
@@ -193,7 +181,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
}
@Override
- public Iterable<Discoverable> discover(String service) {
+ public ServiceDiscovered discover(String service) {
return services.getUnchecked(service);
}
@@ -251,7 +239,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
}
private OperationFuture<String> doRegister(Discoverable discoverable) {
- byte[] discoverableBytes = encode(discoverable);
+ byte[] discoverableBytes = DiscoverableAdapter.encode(discoverable);
return zkClient.create(getNodePath(discoverable), discoverableBytes, CreateMode.EPHEMERAL, true);
}
@@ -330,14 +318,11 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
/**
* Creates a CacheLoader for creating live Iterable for watching instances changes for a given service.
*/
- private CacheLoader<String, Iterable<Discoverable>> createServiceLoader() {
- return new CacheLoader<String, Iterable<Discoverable>>() {
+ private CacheLoader<String, ServiceDiscovered> createServiceLoader() {
+ return new CacheLoader<String, ServiceDiscovered>() {
@Override
- public Iterable<Discoverable> load(String service) throws Exception {
- // The atomic reference is to keep the resulting Iterable live. It always contains a
- // immutable snapshot of the latest detected set of Discoverable.
- final AtomicReference<Iterable<Discoverable>> iterable =
- new AtomicReference<Iterable<Discoverable>>(ImmutableList.<Discoverable>of());
+ public ServiceDiscovered load(String service) throws Exception {
+ final DefaultServiceDiscovered serviceDiscovered = new DefaultServiceDiscovered(service);
final String serviceBase = "/" + service;
// Watch for children changes in /service
@@ -356,60 +341,27 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
fetchFuture.addListener(new Runnable() {
@Override
public void run() {
- ImmutableList.Builder<Discoverable> builder = ImmutableList.builder();
+ ImmutableSet.Builder<Discoverable> builder = ImmutableSet.builder();
for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) {
// For successful fetch, decode the content.
if (nodeData != null) {
- Discoverable discoverable = decode(nodeData.getData());
+ Discoverable discoverable = DiscoverableAdapter.decode(nodeData.getData());
if (discoverable != null) {
builder.add(discoverable);
}
}
}
- iterable.set(builder.build());
+ serviceDiscovered.setDiscoverables(builder.build());
}
}, Threads.SAME_THREAD_EXECUTOR);
}
});
-
- return new Iterable<Discoverable>() {
- @Override
- public Iterator<Discoverable> iterator() {
- return iterable.get().iterator();
- }
- };
+ return serviceDiscovered;
}
};
}
/**
- * Static helper function for decoding array of bytes into a {@link DiscoverableWrapper} object.
- * @param bytes representing serialized {@link DiscoverableWrapper}
- * @return null if bytes are null; else an instance of {@link DiscoverableWrapper}
- */
- private static Discoverable decode(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- String content = new String(bytes, Charsets.UTF_8);
- return new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec())
- .create()
- .fromJson(content, Discoverable.class);
- }
-
- /**
- * Static helper function for encoding an instance of {@link DiscoverableWrapper} into array of bytes.
- * @param discoverable An instance of {@link Discoverable}
- * @return array of bytes representing an instance of <code>discoverable</code>
- */
- private static byte[] encode(Discoverable discoverable) {
- return new GsonBuilder().registerTypeAdapter(DiscoverableWrapper.class, new DiscoverableCodec())
- .create()
- .toJson(discoverable, DiscoverableWrapper.class)
- .getBytes(Charsets.UTF_8);
- }
-
- /**
* Inner class for cancelling (un-register) discovery service.
*/
private final class DiscoveryCancellable implements Cancellable {
@@ -471,42 +423,5 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
LOG.debug("Service unregistered: {} {}", discoverable, path);
}
}
-
- /**
- * SerDe for converting a {@link DiscoverableWrapper} into a JSON object
- * or from a JSON object into {@link DiscoverableWrapper}.
- */
- private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
-
- @Override
- public Discoverable deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- final String service = jsonObj.get("service").getAsString();
- String hostname = jsonObj.get("hostname").getAsString();
- int port = jsonObj.get("port").getAsInt();
- final InetSocketAddress address = new InetSocketAddress(hostname, port);
- return new Discoverable() {
- @Override
- public String getName() {
- return service;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return address;
- }
- };
- }
-
- @Override
- public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject jsonObj = new JsonObject();
- jsonObj.addProperty("service", src.getName());
- jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
- jsonObj.addProperty("port", src.getSocketAddress().getPort());
- return jsonObj;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
new file mode 100644
index 0000000..17b526b
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for testing different discovery service implementations.
+ */
+public abstract class DiscoveryServiceTestBase {
+
+ protected abstract Map.Entry<DiscoveryService, DiscoveryServiceClient> create();
+
+ @Test
+ public void simpleDiscoverable() throws Exception {
+ Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+ // Register one service running on one host:port
+ Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+
+ // Discover that registered host:port.
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("foo");
+ Assert.assertTrue(waitTillExpected(1, serviceDiscovered));
+
+ Discoverable discoverable = new Discoverable() {
+ @Override
+ public String getName() {
+ return "foo";
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return new InetSocketAddress("localhost", 8090);
+ }
+ };
+
+ // Check it exists.
+ Assert.assertTrue(serviceDiscovered.contains(discoverable));
+
+ // Remove the service
+ cancellable.cancel();
+
+ // There should be no service.
+ Assert.assertTrue(waitTillExpected(0, serviceDiscovered));
+
+ Assert.assertFalse(serviceDiscovered.contains(discoverable));
+ }
+
+ /**
+  * Verifies the push model: a {@link ServiceDiscovered.ChangeListener} registered through
+  * {@code watchChanges} receives a snapshot each time endpoints are registered or cancelled.
+  *
+  * @throws InterruptedException if interrupted while polling for change events
+  */
+ @Test
+ public void testChangeListener() throws InterruptedException {
+   Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+   DiscoveryService discoveryService = entry.getKey();
+   DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+   // Start discovery
+   String serviceName = "listener_test";
+   ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+
+   // Watch for changes. Each callback stores an immutable snapshot of the current endpoints.
+   final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
+   serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+     @Override
+     public void onChange(ServiceDiscovered serviceDiscovered) {
+       events.add(ImmutableList.copyOf(serviceDiscovered));
+     }
+   }, Threads.SAME_THREAD_EXECUTOR);
+
+   // An empty list will be received first, as no endpoint has been registered.
+   List<Discoverable> discoverables = events.poll(5, TimeUnit.SECONDS);
+   Assert.assertNotNull(discoverables);
+   Assert.assertTrue(discoverables.isEmpty());
+
+   // Register a service
+   Cancellable cancellable = register(discoveryService, serviceName, "localhost", 10000);
+
+   discoverables = events.poll(5, TimeUnit.SECONDS);
+   Assert.assertNotNull(discoverables);
+   Assert.assertEquals(1, discoverables.size());
+
+   // Register another service endpoint
+   Cancellable cancellable2 = register(discoveryService, serviceName, "localhost", 10001);
+
+   discoverables = events.poll(5, TimeUnit.SECONDS);
+   Assert.assertNotNull(discoverables);
+   Assert.assertEquals(2, discoverables.size());
+
+   // Cancel both of them
+   cancellable.cancel();
+   cancellable2.cancel();
+
+   // There could be more than one event triggered, but the last event should be an empty list.
+   discoverables = events.poll(5, TimeUnit.SECONDS);
+   Assert.assertNotNull(discoverables);
+   if (!discoverables.isEmpty()) {
+     discoverables = events.poll(5, TimeUnit.SECONDS);
+     // poll() returns null on timeout; fail with a clear assertion rather than an NPE below.
+     Assert.assertNotNull(discoverables);
+   }
+
+   Assert.assertTrue(discoverables.isEmpty());
+ }
+
+ @Test
+ public void testCancelChangeListener() throws InterruptedException {
+ Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+ String serviceName = "cancel_listener";
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+
+ // An executor that delays execution of a Runnable. It's for testing the race between listener cancellation and discovery changes.
+ Executor delayExecutor = new Executor() {
+ @Override
+ public void execute(final Runnable command) {
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ command.run();
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ };
+ t.start();
+ }
+ };
+
+ final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
+ Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+ @Override
+ public void onChange(ServiceDiscovered serviceDiscovered) {
+ events.add(ImmutableList.copyOf(serviceDiscovered));
+ }
+ }, delayExecutor);
+
+ // Wait for the init event call
+ Assert.assertNotNull(events.poll(3, TimeUnit.SECONDS));
+
+ // Register a new service endpoint, wait a short while and then cancel the listener
+ register(discoveryService, serviceName, "localhost", 1);
+ TimeUnit.SECONDS.sleep(1);
+ cancelWatch.cancel();
+
+ // The change listener shouldn't get any event, since the invocation is delayed by the executor.
+ Assert.assertNull(events.poll(3, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void manySameDiscoverable() throws Exception {
+ Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+ List<Cancellable> cancellables = Lists.newArrayList();
+
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
+
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("manyDiscoverable");
+ Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
+
+ for (int i = 0; i < 5; i++) {
+ cancellables.get(i).cancel();
+ Assert.assertTrue(waitTillExpected(4 - i, serviceDiscovered));
+ }
+ }
+
+ @Test
+ public void multiServiceDiscoverable() throws Exception {
+ Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+ List<Cancellable> cancellables = Lists.newArrayList();
+
+ cancellables.add(register(discoveryService, "service1", "localhost", 1));
+ cancellables.add(register(discoveryService, "service1", "localhost", 2));
+ cancellables.add(register(discoveryService, "service1", "localhost", 3));
+ cancellables.add(register(discoveryService, "service1", "localhost", 4));
+ cancellables.add(register(discoveryService, "service1", "localhost", 5));
+
+ cancellables.add(register(discoveryService, "service2", "localhost", 1));
+ cancellables.add(register(discoveryService, "service2", "localhost", 2));
+ cancellables.add(register(discoveryService, "service2", "localhost", 3));
+
+ cancellables.add(register(discoveryService, "service3", "localhost", 1));
+ cancellables.add(register(discoveryService, "service3", "localhost", 2));
+
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("service1");
+ Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
+
+ serviceDiscovered = discoveryServiceClient.discover("service2");
+ Assert.assertTrue(waitTillExpected(3, serviceDiscovered));
+
+ serviceDiscovered = discoveryServiceClient.discover("service3");
+ Assert.assertTrue(waitTillExpected(2, serviceDiscovered));
+
+ cancellables.add(register(discoveryService, "service3", "localhost", 3));
+ Assert.assertTrue(waitTillExpected(3, serviceDiscovered)); // Shows live iterator.
+
+ for (Cancellable cancellable : cancellables) {
+ cancellable.cancel();
+ }
+
+ Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
+ Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
+ Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+ }
+
+ protected Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+ return service.register(new Discoverable() {
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return new InetSocketAddress(host, port);
+ }
+ });
+ }
+
+ /**
+  * Waits until the given {@link ServiceDiscovered} reports exactly {@code expected} endpoints,
+  * giving up after 5 seconds.
+  *
+  * @param expected number of endpoints to wait for
+  * @param serviceDiscovered the discovery result to watch
+  * @return {@code true} if the expected size was observed before the timeout, {@code false} otherwise
+  */
+ protected boolean waitTillExpected(final int expected, ServiceDiscovered serviceDiscovered) {
+   final CountDownLatch latch = new CountDownLatch(1);
+   // Keep the handle so this temporary listener can be removed once the wait is over;
+   // otherwise every call leaves another listener attached to the ServiceDiscovered.
+   Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+     @Override
+     public void onChange(ServiceDiscovered serviceDiscovered) {
+       if (expected == Iterables.size(serviceDiscovered)) {
+         latch.countDown();
+       }
+     }
+   }, Threads.SAME_THREAD_EXECUTOR);
+
+   try {
+     return latch.await(5, TimeUnit.SECONDS);
+   } catch (InterruptedException e) {
+     // Restore the interrupt status before rethrowing as an unchecked exception.
+     Thread.currentThread().interrupt();
+     throw Throwables.propagate(e);
+   } finally {
+     cancelWatch.cancel();
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
index d8cc375..d49e8a5 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
@@ -18,50 +18,18 @@
package org.apache.twill.discovery;
-import org.apache.twill.common.Cancellable;
-import com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Test;
+import com.google.common.collect.Maps;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
/**
* Test memory based service discovery service.
*/
-public class InMemoryDiscoveryServiceTest {
- private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
- return service.register(new Discoverable() {
- @Override
- public String getName() {
- return name;
- }
+public class InMemoryDiscoveryServiceTest extends DiscoveryServiceTestBase {
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress(host, port);
- }
- });
- }
-
- @Test
- public void simpleDiscoverable() throws Exception {
+ @Override
+ protected Map.Entry<DiscoveryService, DiscoveryServiceClient> create() {
DiscoveryService discoveryService = new InMemoryDiscoveryService();
- DiscoveryServiceClient discoveryServiceClient = (DiscoveryServiceClient) discoveryService;
-
- // Register one service running on one host:port
- Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
-
- // Discover that registered host:port.
- Assert.assertTrue(Iterables.size(discoverables) == 1);
-
- // Remove the service
- cancellable.cancel();
-
- // There should be no service.
- discoverables = discoveryServiceClient.discover("foo");
- TimeUnit.MILLISECONDS.sleep(100);
- Assert.assertTrue(Iterables.size(discoverables) == 0);
+ return Maps.immutableEntry(discoveryService, (DiscoveryServiceClient) discoveryService);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
index feee8db..6f0cde0 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -25,8 +25,7 @@ import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -35,14 +34,13 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Test Zookeeper based discovery service.
*/
-public class ZKDiscoveryServiceTest {
+public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryServiceTest.class);
private static InMemoryZKServer zkServer;
@@ -66,41 +64,17 @@ public class ZKDiscoveryServiceTest {
Futures.getUnchecked(Services.chainStop(zkClient, zkServer));
}
- private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
- return service.register(new Discoverable() {
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress(host, port);
- }
- });
- }
-
-
- private boolean waitTillExpected(int expected, Iterable<Discoverable> discoverables) throws Exception {
- for (int i = 0; i < 10; ++i) {
- TimeUnit.MILLISECONDS.sleep(10);
- if (Iterables.size(discoverables) == expected) {
- return true;
- }
- }
- return (Iterables.size(discoverables) == expected);
- }
-
@Test (timeout = 5000)
public void testDoubleRegister() throws Exception {
- ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = discoveryService;
+ Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
// Register on the same host port, it shouldn't fail.
Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_double_reg");
+ ServiceDiscovered discoverables = discoveryServiceClient.discover("test_double_reg");
Assert.assertTrue(waitTillExpected(1, discoverables));
@@ -116,7 +90,7 @@ public class ZKDiscoveryServiceTest {
zkClient2.startAndWait();
try {
- ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
+ DiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
// Schedule a thread to shutdown zkClient2.
@@ -143,12 +117,13 @@ public class ZKDiscoveryServiceTest {
@Test
public void testSessionExpires() throws Exception {
- ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = discoveryService;
+ Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_expires");
+ ServiceDiscovered discoverables = discoveryServiceClient.discover("test_expires");
// Discover that registered host:port.
Assert.assertTrue(waitTillExpected(1, discoverables));
@@ -168,86 +143,11 @@ public class ZKDiscoveryServiceTest {
Assert.assertTrue(waitTillExpected(0, discoverables));
}
- @Test
- public void simpleDiscoverable() throws Exception {
- DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
- // Register one service running on one host:port
- Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
-
- // Discover that registered host:port.
- Assert.assertTrue(waitTillExpected(1, discoverables));
-
- // Remove the service
- cancellable.cancel();
-
- // There should be no service.
-
- discoverables = discoveryServiceClient.discover("foo");
-
- Assert.assertTrue(waitTillExpected(0, discoverables));
- }
-
- @Test
- public void manySameDiscoverable() throws Exception {
- List<Cancellable> cancellables = Lists.newArrayList();
+ @Override
+ protected Map.Entry<DiscoveryService, DiscoveryServiceClient> create() {
DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
-
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("manyDiscoverable");
- Assert.assertTrue(waitTillExpected(5, discoverables));
-
- for (int i = 0; i < 5; i++) {
- cancellables.get(i).cancel();
- Assert.assertTrue(waitTillExpected(4 - i, discoverables));
- }
- }
-
- @Test
- public void multiServiceDiscoverable() throws Exception {
- List<Cancellable> cancellables = Lists.newArrayList();
- DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
- cancellables.add(register(discoveryService, "service1", "localhost", 1));
- cancellables.add(register(discoveryService, "service1", "localhost", 2));
- cancellables.add(register(discoveryService, "service1", "localhost", 3));
- cancellables.add(register(discoveryService, "service1", "localhost", 4));
- cancellables.add(register(discoveryService, "service1", "localhost", 5));
-
- cancellables.add(register(discoveryService, "service2", "localhost", 1));
- cancellables.add(register(discoveryService, "service2", "localhost", 2));
- cancellables.add(register(discoveryService, "service2", "localhost", 3));
-
- cancellables.add(register(discoveryService, "service3", "localhost", 1));
- cancellables.add(register(discoveryService, "service3", "localhost", 2));
-
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("service1");
- Assert.assertTrue(waitTillExpected(5, discoverables));
-
- discoverables = discoveryServiceClient.discover("service2");
- Assert.assertTrue(waitTillExpected(3, discoverables));
-
- discoverables = discoveryServiceClient.discover("service3");
- Assert.assertTrue(waitTillExpected(2, discoverables));
-
- cancellables.add(register(discoveryService, "service3", "localhost", 3));
- Assert.assertTrue(waitTillExpected(3, discoverables)); // Shows live iterator.
-
- for (Cancellable cancellable : cancellables) {
- cancellable.cancel();
- }
-
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+ return Maps.immutableEntry(discoveryService, discoveryServiceClient);
}
}