You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by as...@apache.org on 2014/01/16 23:10:29 UTC

git commit: Improve service discovery API to allow push model for getting endpoint changes.

Updated Branches:
  refs/heads/master 1380f21c0 -> b0dd8e241


Improve service discovery API to allow push model for getting endpoint changes.

The major change is to have the DiscoveryServiceClient.discover() method return
a ServiceDiscovered object, which allows the caller to set a listener that receives changes
to the service endpoints. The ServiceDiscovered object itself extends Iterable to maintain
the existing pull model and backward compatibility.

Signed-off-by: Albert Shau <al...@continuuity.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b0dd8e24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b0dd8e24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b0dd8e24

Branch: refs/heads/master
Commit: b0dd8e241b55532c0a8c5d18f5bccf6395e3b2ea
Parents: 1380f21
Author: Terence Yim <te...@continuuity.com>
Authored: Thu Jan 16 12:59:43 2014 -0800
Committer: Albert Shau <al...@continuuity.com>
Committed: Thu Jan 16 14:09:46 2014 -0800

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillController.java   |   6 +-
 .../twill/internal/AbstractTwillController.java |   4 +-
 .../twill/discovery/DiscoveryServiceClient.java |   6 +-
 .../twill/discovery/ServiceDiscovered.java      |  71 +++++
 .../discovery/DefaultServiceDiscovered.java     | 159 +++++++++++
 .../twill/discovery/DiscoverableAdapter.java    | 102 +++++++
 .../discovery/InMemoryDiscoveryService.java     |  63 +++--
 .../twill/discovery/ZKDiscoveryService.java     | 111 +-------
 .../discovery/DiscoveryServiceTestBase.java     | 278 +++++++++++++++++++
 .../discovery/InMemoryDiscoveryServiceTest.java |  44 +--
 .../twill/discovery/ZKDiscoveryServiceTest.java | 130 +--------
 11 files changed, 690 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-api/src/main/java/org/apache/twill/api/TwillController.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillController.java b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
index f31d3f9..a5906f4 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillController.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
@@ -19,6 +19,7 @@ package org.apache.twill.api;
 
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.ServiceDiscovered;
 import com.google.common.util.concurrent.ListenableFuture;
 
 /**
@@ -35,10 +36,9 @@ public interface TwillController extends ServiceController {
   /**
    * Discovers the set of {@link Discoverable} endpoints that provides service for the given service name.
    * @param serviceName Name of the service to discovery.
-   * @return An {@link Iterable} that gives set of latest {@link Discoverable} every time when
-   *         {@link Iterable#iterator()}} is invoked.
+   * @return A {@link org.apache.twill.discovery.ServiceDiscovered} object representing the result.
    */
-  Iterable<Discoverable> discoverService(String serviceName);
+  ServiceDiscovered discoverService(String serviceName);
 
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 5806f9d..97e0a8f 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -21,8 +21,8 @@ import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.discovery.Discoverable;
 import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.ServiceDiscovered;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.internal.json.StackTraceElementCodec;
 import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
@@ -97,7 +97,7 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
   }
 
   @Override
-  public final Iterable<Discoverable> discoverService(String serviceName) {
+  public final ServiceDiscovered discoverService(String serviceName) {
     return discoveryServiceClient.discover(serviceName);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
index 89cf269..a58c83d 100644
--- a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
@@ -26,9 +26,7 @@ public interface DiscoveryServiceClient {
    * Retrieves a list of {@link Discoverable} for the a service with the given name.
    *
    * @param name Name of the service
-   * @return A live {@link Iterable} that on each call to {@link Iterable#iterator()} returns
-   *         an {@link java.util.Iterator Iterator} that reflects the latest set of
-   *         available {@link Discoverable} services.
+   * @return A {@link org.apache.twill.discovery.ServiceDiscovered} object representing the result.
    */
-  Iterable<Discoverable> discover(String name);
+  ServiceDiscovered discover(String name);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-api/src/main/java/org/apache/twill/discovery/ServiceDiscovered.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/ServiceDiscovered.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/ServiceDiscovered.java
new file mode 100644
index 0000000..5de15f0
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/ServiceDiscovered.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Represents the result of service discovery. It extends from
+ * {@link Iterable} that gives set of latest {@link Discoverable} every time when
+ * {@link #iterator()} is invoked.
+ */
+public interface ServiceDiscovered extends Iterable<Discoverable> {
+
+  /**
+   * Represents a callback for watching changes in the discovery list.
+   *
+   * @see #watchChanges(ServiceDiscovered.ChangeListener, java.util.concurrent.Executor)
+   */
+  interface ChangeListener {
+
+    /**
+     * This method will be invoked when the discovery list changed.
+     *
+     * @param serviceDiscovered The {@link ServiceDiscovered} that this listener is attached to.
+     */
+    void onChange(ServiceDiscovered serviceDiscovered);
+  }
+
+  /**
+   * Returns the name of the service being discovered.
+   *
+   * @return Name of the service.
+   */
+  String getName();
+
+  /**
+   * Registers a {@link ChangeListener} to watch for changes in the discovery list.
+   * The {@link ChangeListener#onChange(ServiceDiscovered)} method will be triggered when watching starts,
+   * and on every subsequent change in the discovery list.
+   *
+   * @param listener A {@link ChangeListener} to watch for changes.
+   * @param executor A {@link Executor} for issuing call to the given listener.
+   * @return A {@link Cancellable} to cancel the watch.
+   */
+  Cancellable watchChanges(ChangeListener listener, Executor executor);
+
+  /**
+   * Checks if the given discoverable is contained in the current discovery list.
+   *
+   * @param discoverable The {@link Discoverable} to check for.
+   * @return {@code true} if it exists, {@code false} otherwise.
+   */
+  boolean contains(Discoverable discoverable);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
new file mode 100644
index 0000000..49fb641
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A package private class for implementing {@link ServiceDiscovered}.
+ */
+final class DefaultServiceDiscovered implements ServiceDiscovered {
+
+  private final String name;
+  private final AtomicReference<Set<Discoverable>> discoverables;
+  private final List<ListenerCaller> listenerCallers;
+  private final ReadWriteLock callerLock;
+
+  DefaultServiceDiscovered(String name) {
+    this.name = name;
+    this.discoverables = new AtomicReference<Set<Discoverable>>(ImmutableSet.<Discoverable>of());
+    this.listenerCallers = Lists.newLinkedList();
+    this.callerLock = new ReentrantReadWriteLock();
+  }
+
+  void setDiscoverables(Set<Discoverable> discoverables) {
+    this.discoverables.set(discoverables);
+
+    // Collect all listeners with a read lock to the listener list.
+    List<ListenerCaller> callers = Lists.newArrayList();
+    Lock readLock = callerLock.readLock();
+    readLock.lock();
+    try {
+      callers.addAll(listenerCallers);
+    } finally {
+      readLock.unlock();
+    }
+
+    // Invoke listeners.
+    for (ListenerCaller caller : callers) {
+      caller.invoke();
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Cancellable watchChanges(ChangeListener listener, Executor executor) {
+    ListenerCaller caller = new ListenerCaller(listener, executor);
+
+    // Add the new listener with a write lock.
+    Lock writeLock = callerLock.writeLock();
+    writeLock.lock();
+    try {
+      listenerCallers.add(caller);
+    } finally {
+      writeLock.unlock();
+    }
+
+    // Invoke listener for the first time.
+    // A race can happen between this method and the setDiscoverables() method, but it's ok as the contract of
+    // adding a new listener is that onChange will be called at least once. The actual change is already
+    // reflected by the atomic reference "discoverables", hence it's consistent.
+    caller.invoke();
+    return caller;
+  }
+
+  @Override
+  public boolean contains(Discoverable discoverable) {
+    // If the name doesn't match, it shouldn't be in the list.
+    if (!discoverable.getName().equals(name)) {
+      return false;
+    }
+
+    // Wrap it if necessary for hashCode/equals comparison.
+    Discoverable target = discoverable;
+    if (!(target instanceof DiscoverableWrapper)) {
+      target = new DiscoverableWrapper(target);
+    }
+
+    return discoverables.get().contains(target);
+  }
+
+  @Override
+  public Iterator<Discoverable> iterator() {
+    return discoverables.get().iterator();
+  }
+
+  /**
+   * Private helper class for invoking the change listener from an executor.
+   * It is also responsible for removing itself from the listener list.
+   */
+  private final class ListenerCaller implements Runnable, Cancellable {
+
+    private final ChangeListener listener;
+    private final Executor executor;
+    private final AtomicBoolean cancelled;
+
+    private ListenerCaller(ChangeListener listener, Executor executor) {
+      this.listener = listener;
+      this.executor = executor;
+      this.cancelled = new AtomicBoolean(false);
+    }
+
+    void invoke() {
+      if (!cancelled.get()) {
+        executor.execute(this);
+      }
+    }
+
+    @Override
+    public void run() {
+      if (!cancelled.get()) {
+        listener.onChange(DefaultServiceDiscovered.this);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      if (cancelled.compareAndSet(false, true)) {
+        Lock writeLock = callerLock.writeLock();
+        writeLock.lock();
+        try {
+          listenerCallers.remove(this);
+        } finally {
+          writeLock.unlock();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
new file mode 100644
index 0000000..0e4bcc3
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableAdapter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import com.google.common.base.Charsets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
+
+/**
+ * Helper class to serialize and deserialize {@link Discoverable}.
+ */
+final class DiscoverableAdapter {
+
+  private static final Gson GSON =
+    new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec()).create();
+
+  /**
+   * Helper function for encoding an instance of {@link Discoverable} into array of bytes.
+   * @param discoverable An instance of {@link Discoverable}
+   * @return array of bytes representing an instance of <code>discoverable</code>
+   */
+  static byte[] encode(Discoverable discoverable) {
+    return GSON.toJson(discoverable, Discoverable.class).getBytes(Charsets.UTF_8);
+  }
+
+  /**
+   * Helper function for decoding array of bytes into a {@link Discoverable} object.
+   * @param encoded representing serialized {@link Discoverable}
+   * @return {@code null} if encoded bytes are null; else an instance of {@link Discoverable}
+   */
+  static Discoverable decode(byte[] encoded) {
+    if (encoded == null) {
+      return null;
+    }
+    return GSON.fromJson(new String(encoded, Charsets.UTF_8), Discoverable.class);
+  }
+
+  private DiscoverableAdapter() {
+  }
+
+  /**
+   * SerDe for converting a {@link Discoverable} into a JSON object
+   * or from a JSON object into {@link Discoverable}.
+   */
+  private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
+
+    @Override
+    public Discoverable deserialize(JsonElement json, Type typeOfT,
+                                    JsonDeserializationContext context) throws JsonParseException {
+      JsonObject jsonObj = json.getAsJsonObject();
+      final String service = jsonObj.get("service").getAsString();
+      String hostname = jsonObj.get("hostname").getAsString();
+      int port = jsonObj.get("port").getAsInt();
+      final InetSocketAddress address = new InetSocketAddress(hostname, port);
+      return new DiscoverableWrapper(new Discoverable() {
+        @Override
+        public String getName() {
+          return service;
+        }
+
+        @Override
+        public InetSocketAddress getSocketAddress() {
+          return address;
+        }
+      });
+    }
+
+    @Override
+    public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
+      JsonObject jsonObj = new JsonObject();
+      jsonObj.addProperty("service", src.getName());
+      jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
+      jsonObj.addProperty("port", src.getSocketAddress().getPort());
+      return jsonObj;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
index 7a9e984..2f950b8 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
@@ -18,11 +18,11 @@
 package org.apache.twill.discovery;
 
 import org.apache.twill.common.Cancellable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
 
-import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -31,43 +31,58 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class InMemoryDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
 
-  private final Multimap<String, Discoverable> services = HashMultimap.create();
+  private final SetMultimap<String, Discoverable> services = LinkedHashMultimap.create();
+  private final Map<String, DefaultServiceDiscovered> serviceDiscoveredMap = Maps.newHashMap();
   private final Lock lock = new ReentrantLock();
 
   @Override
   public Cancellable register(final Discoverable discoverable) {
+    final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+    final String serviceName = wrapper.getName();
+
     lock.lock();
     try {
-      final Discoverable wrapper = new DiscoverableWrapper(discoverable);
-      services.put(wrapper.getName(), wrapper);
-      return new Cancellable() {
-        @Override
-        public void cancel() {
-          lock.lock();
-          try {
-            services.remove(wrapper.getName(), wrapper);
-          } finally {
-            lock.unlock();
-          }
-        }
-      };
+      services.put(serviceName, wrapper);
+
+      DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(serviceName);
+      if (serviceDiscovered != null) {
+        serviceDiscovered.setDiscoverables(services.get(serviceName));
+      }
     } finally {
       lock.unlock();
     }
-  }
 
-  @Override
-  public Iterable<Discoverable> discover(final String name) {
-    return new Iterable<Discoverable>() {
+    return new Cancellable() {
       @Override
-      public Iterator<Discoverable> iterator() {
+      public void cancel() {
         lock.lock();
         try {
-          return ImmutableList.copyOf(services.get(name)).iterator();
+          services.remove(serviceName, wrapper);
+
+          DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(serviceName);
+          if (serviceDiscovered != null) {
+            serviceDiscovered.setDiscoverables(services.get(serviceName));
+          }
         } finally {
           lock.unlock();
         }
       }
     };
   }
+
+  @Override
+  public ServiceDiscovered discover(final String name) {
+    lock.lock();
+    try {
+      DefaultServiceDiscovered serviceDiscovered = serviceDiscoveredMap.get(name);
+      if (serviceDiscovered == null) {
+        serviceDiscovered = new DefaultServiceDiscovered(name);
+        serviceDiscovered.setDiscoverables(services.get(name));
+        serviceDiscoveredMap.put(name, serviceDiscovered);
+      }
+      return serviceDiscovered;
+    } finally {
+      lock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 862fcbc..0543626 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -25,12 +25,11 @@ import org.apache.twill.zookeeper.OperationFuture;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClients;
 import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.base.Charsets;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.hash.Hashing;
@@ -38,14 +37,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -54,16 +45,13 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Type;
 import java.net.InetSocketAddress;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -96,7 +84,7 @@ import java.util.concurrent.locks.ReentrantLock;
  *      &#125;);
  *      ...
  *      ...
- *      Iterable<Discoverable> services = service.discovery("service-name");
+ *      ServiceDiscovered services = service.discover("service-name");
  *      ...
  *      }
  *    </pre>
@@ -114,7 +102,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
   private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
   private final Lock lock;
 
-  private final LoadingCache<String, Iterable<Discoverable>> services;
+  private final LoadingCache<String, ServiceDiscovered> services;
   private final ZKClient zkClient;
   private final ScheduledExecutorService retryExecutor;
 
@@ -193,7 +181,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
   }
 
   @Override
-  public Iterable<Discoverable> discover(String service) {
+  public ServiceDiscovered discover(String service) {
     return services.getUnchecked(service);
   }
 
@@ -251,7 +239,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
   }
 
   private OperationFuture<String> doRegister(Discoverable discoverable) {
-    byte[] discoverableBytes = encode(discoverable);
+    byte[] discoverableBytes = DiscoverableAdapter.encode(discoverable);
     return zkClient.create(getNodePath(discoverable), discoverableBytes, CreateMode.EPHEMERAL, true);
   }
 
@@ -330,14 +318,11 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
   /**
    * Creates a CacheLoader for creating live Iterable for watching instances changes for a given service.
    */
-  private CacheLoader<String, Iterable<Discoverable>> createServiceLoader() {
-    return new CacheLoader<String, Iterable<Discoverable>>() {
+  private CacheLoader<String, ServiceDiscovered> createServiceLoader() {
+    return new CacheLoader<String, ServiceDiscovered>() {
       @Override
-      public Iterable<Discoverable> load(String service) throws Exception {
-        // The atomic reference is to keep the resulting Iterable live. It always contains a
-        // immutable snapshot of the latest detected set of Discoverable.
-        final AtomicReference<Iterable<Discoverable>> iterable =
-              new AtomicReference<Iterable<Discoverable>>(ImmutableList.<Discoverable>of());
+      public ServiceDiscovered load(String service) throws Exception {
+        final DefaultServiceDiscovered serviceDiscovered = new DefaultServiceDiscovered(service);
         final String serviceBase = "/" + service;
 
         // Watch for children changes in /service
@@ -356,60 +341,27 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
             fetchFuture.addListener(new Runnable() {
               @Override
               public void run() {
-                ImmutableList.Builder<Discoverable> builder = ImmutableList.builder();
+                ImmutableSet.Builder<Discoverable> builder = ImmutableSet.builder();
                 for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) {
                   // For successful fetch, decode the content.
                   if (nodeData != null) {
-                    Discoverable discoverable = decode(nodeData.getData());
+                    Discoverable discoverable = DiscoverableAdapter.decode(nodeData.getData());
                     if (discoverable != null) {
                       builder.add(discoverable);
                     }
                   }
                 }
-                iterable.set(builder.build());
+                serviceDiscovered.setDiscoverables(builder.build());
               }
             }, Threads.SAME_THREAD_EXECUTOR);
           }
         });
-
-        return new Iterable<Discoverable>() {
-          @Override
-          public Iterator<Discoverable> iterator() {
-            return iterable.get().iterator();
-          }
-        };
+        return serviceDiscovered;
       }
     };
   }
 
   /**
-   * Static helper function for decoding array of bytes into a {@link DiscoverableWrapper} object.
-   * @param bytes representing serialized {@link DiscoverableWrapper}
-   * @return null if bytes are null; else an instance of {@link DiscoverableWrapper}
-   */
-  private static Discoverable decode(byte[] bytes) {
-    if (bytes == null) {
-      return null;
-    }
-    String content = new String(bytes, Charsets.UTF_8);
-    return new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec())
-      .create()
-      .fromJson(content, Discoverable.class);
-  }
-
-  /**
-   * Static helper function for encoding an instance of {@link DiscoverableWrapper} into array of bytes.
-   * @param discoverable An instance of {@link Discoverable}
-   * @return array of bytes representing an instance of <code>discoverable</code>
-   */
-  private static byte[] encode(Discoverable discoverable) {
-    return new GsonBuilder().registerTypeAdapter(DiscoverableWrapper.class, new DiscoverableCodec())
-      .create()
-      .toJson(discoverable, DiscoverableWrapper.class)
-      .getBytes(Charsets.UTF_8);
-  }
-
-  /**
    * Inner class for cancelling (un-register) discovery service.
    */
   private final class DiscoveryCancellable implements Cancellable {
@@ -471,42 +423,5 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
       LOG.debug("Service unregistered: {} {}", discoverable, path);
     }
   }
-
-  /**
-   * SerDe for converting a {@link DiscoverableWrapper} into a JSON object
-   * or from a JSON object into {@link DiscoverableWrapper}.
-   */
-  private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
-
-    @Override
-    public Discoverable deserialize(JsonElement json, Type typeOfT,
-                                    JsonDeserializationContext context) throws JsonParseException {
-      JsonObject jsonObj = json.getAsJsonObject();
-      final String service = jsonObj.get("service").getAsString();
-      String hostname = jsonObj.get("hostname").getAsString();
-      int port = jsonObj.get("port").getAsInt();
-      final InetSocketAddress address = new InetSocketAddress(hostname, port);
-      return new Discoverable() {
-        @Override
-        public String getName() {
-          return service;
-        }
-
-        @Override
-        public InetSocketAddress getSocketAddress() {
-          return address;
-        }
-      };
-    }
-
-    @Override
-    public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
-      JsonObject jsonObj = new JsonObject();
-      jsonObj.addProperty("service", src.getName());
-      jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
-      jsonObj.addProperty("port", src.getSocketAddress().getPort());
-      return jsonObj;
-    }
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
new file mode 100644
index 0000000..17b526b
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for testing different discovery service implementations.
+ */
+public abstract class DiscoveryServiceTestBase {
+
+  protected abstract Map.Entry<DiscoveryService, DiscoveryServiceClient> create();
+
+  /**
+   * Registers a single endpoint, waits for it to be discovered, then cancels the registration
+   * and waits for the discovered set to become empty again.
+   */
+  @Test
+  public void simpleDiscoverable() throws Exception {
+    Map.Entry<DiscoveryService, DiscoveryServiceClient> services = create();
+    DiscoveryService registry = services.getKey();
+    DiscoveryServiceClient client = services.getValue();
+
+    // Announce one endpoint for service "foo".
+    Cancellable registration = register(registry, "foo", "localhost", 8090);
+
+    // The client should end up seeing exactly one endpoint.
+    ServiceDiscovered discovered = client.discover("foo");
+    Assert.assertTrue(waitTillExpected(1, discovered));
+
+    // A Discoverable describing the same name and address as the one registered above.
+    Discoverable expected = new Discoverable() {
+      @Override
+      public String getName() {
+        return "foo";
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return new InetSocketAddress("localhost", 8090);
+      }
+    };
+    Assert.assertTrue(discovered.contains(expected));
+
+    // Withdraw the endpoint; the same ServiceDiscovered should then report nothing.
+    registration.cancel();
+    Assert.assertTrue(waitTillExpected(0, discovered));
+    Assert.assertFalse(discovered.contains(expected));
+  }
+
+  /**
+   * Verifies that a {@link ServiceDiscovered.ChangeListener} is called with the current endpoint
+   * list when it is first attached and again whenever endpoints are registered or cancelled.
+   *
+   * @throws InterruptedException if interrupted while waiting for a change event
+   */
+  @Test
+  public void testChangeListener() throws InterruptedException {
+    Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+    DiscoveryService discoveryService = entry.getKey();
+    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+    // Start discovery
+    String serviceName = "listener_test";
+    ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+
+    // Watch for changes. Each callback stores an immutable snapshot of the endpoints seen.
+    final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
+    serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+      @Override
+      public void onChange(ServiceDiscovered serviceDiscovered) {
+        events.add(ImmutableList.copyOf(serviceDiscovered));
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    // An empty list will be received first, as no endpoint has been registered.
+    List<Discoverable> discoverables = events.poll(5, TimeUnit.SECONDS);
+    Assert.assertNotNull(discoverables);
+    Assert.assertTrue(discoverables.isEmpty());
+
+    // Register a service
+    Cancellable cancellable = register(discoveryService, serviceName, "localhost", 10000);
+
+    discoverables = events.poll(5, TimeUnit.SECONDS);
+    Assert.assertNotNull(discoverables);
+    Assert.assertEquals(1, discoverables.size());
+
+    // Register another service endpoint
+    Cancellable cancellable2 = register(discoveryService, serviceName, "localhost", 10001);
+
+    discoverables = events.poll(5, TimeUnit.SECONDS);
+    Assert.assertNotNull(discoverables);
+    Assert.assertEquals(2, discoverables.size());
+
+    // Cancel both of them
+    cancellable.cancel();
+    cancellable2.cancel();
+
+    // There could be more than one event triggered, but the last event should be an empty list.
+    discoverables = events.poll(5, TimeUnit.SECONDS);
+    Assert.assertNotNull(discoverables);
+    if (!discoverables.isEmpty()) {
+      discoverables = events.poll(5, TimeUnit.SECONDS);
+      // poll() returns null on timeout; fail with an assertion instead of a NullPointerException below.
+      Assert.assertNotNull(discoverables);
+    }
+
+    Assert.assertTrue(discoverables.isEmpty());
+  }
+
+  /**
+   * Exercises cancelling a change listener while a notification for it is still waiting in the
+   * listener's executor. Every callback is delayed by two seconds; after the initial callback
+   * arrives, an endpoint is registered, the watch is cancelled one second later, and the test
+   * expects that no further event reaches the listener.
+   *
+   * @throws InterruptedException if interrupted while sleeping or waiting for an event
+   */
+  @Test
+  public void testCancelChangeListener() throws InterruptedException {
+    Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+    DiscoveryService discoveryService = entry.getKey();
+    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+    String serviceName = "cancel_listener";
+    ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+
+    // An executor that delays the execution of each Runnable by two seconds on a new thread.
+    // It is used to test the race between cancelling the listener and discovery changes.
+    Executor delayExecutor = new Executor() {
+      @Override
+      public void execute(final Runnable command) {
+        Thread t = new Thread() {
+          @Override
+          public void run() {
+            try {
+              TimeUnit.SECONDS.sleep(2);
+              command.run();
+            } catch (InterruptedException e) {
+              // Rethrown unchecked, since run() cannot declare InterruptedException.
+              throw Throwables.propagate(e);
+            }
+          }
+        };
+        t.start();
+      }
+    };
+
+    // Each callback stores an immutable snapshot of the endpoints seen.
+    final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
+    Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+      @Override
+      public void onChange(ServiceDiscovered serviceDiscovered) {
+        events.add(ImmutableList.copyOf(serviceDiscovered));
+      }
+    }, delayExecutor);
+
+    // Wait for the initial event call (delayed two seconds by the executor, hence the 3 second timeout).
+    Assert.assertNotNull(events.poll(3, TimeUnit.SECONDS));
+
+    // Register a new service endpoint, wait a short while and then cancel the listener
+    register(discoveryService, serviceName, "localhost", 1);
+    TimeUnit.SECONDS.sleep(1);
+    cancelWatch.cancel();
+
+    // The change listener shouldn't get any event, since the invocation is delayed by the executor.
+    Assert.assertNull(events.poll(3, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void manySameDiscoverable() throws Exception {
+    Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+    DiscoveryService discoveryService = entry.getKey();
+    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+    List<Cancellable> cancellables = Lists.newArrayList();
+
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
+    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
+
+    ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("manyDiscoverable");
+    Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
+
+    for (int i = 0; i < 5; i++) {
+      cancellables.get(i).cancel();
+      Assert.assertTrue(waitTillExpected(4 - i, serviceDiscovered));
+    }
+  }
+
+  /**
+   * Registers endpoints under three different service names and checks that each name is
+   * discovered with its own endpoint count, that an already obtained {@link ServiceDiscovered}
+   * reflects a later registration, and that all names end up empty after every registration
+   * is cancelled.
+   */
+  @Test
+  public void multiServiceDiscoverable() throws Exception {
+    Map.Entry<DiscoveryService, DiscoveryServiceClient> services = create();
+    DiscoveryService registry = services.getKey();
+    DiscoveryServiceClient client = services.getValue();
+
+    List<Cancellable> registrations = Lists.newArrayList();
+
+    // service1 gets ports 1..5, service2 gets ports 1..3, service3 gets ports 1..2.
+    for (int port = 1; port <= 5; port++) {
+      registrations.add(register(registry, "service1", "localhost", port));
+    }
+    for (int port = 1; port <= 3; port++) {
+      registrations.add(register(registry, "service2", "localhost", port));
+    }
+    for (int port = 1; port <= 2; port++) {
+      registrations.add(register(registry, "service3", "localhost", port));
+    }
+
+    Assert.assertTrue(waitTillExpected(5, client.discover("service1")));
+    Assert.assertTrue(waitTillExpected(3, client.discover("service2")));
+
+    ServiceDiscovered thirdService = client.discover("service3");
+    Assert.assertTrue(waitTillExpected(2, thirdService));
+
+    // Add one more endpoint and check it through the ServiceDiscovered obtained earlier.
+    registrations.add(register(registry, "service3", "localhost", 3));
+    Assert.assertTrue(waitTillExpected(3, thirdService)); // Shows live iterator.
+
+    for (Cancellable registration : registrations) {
+      registration.cancel();
+    }
+
+    // A fresh discover() call for every service name should now report no endpoints.
+    for (String serviceName : ImmutableList.of("service1", "service2", "service3")) {
+      Assert.assertTrue(waitTillExpected(0, client.discover(serviceName)));
+    }
+  }
+
+  /**
+   * Registers an endpoint with the given discovery service.
+   *
+   * @param service the discovery service to register with
+   * @param name the service name reported by the registered {@link Discoverable}
+   * @param host the host name used to build the endpoint's socket address
+   * @param port the port used to build the endpoint's socket address
+   * @return the {@link Cancellable} returned by the discovery service for this registration
+   */
+  protected Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+    Discoverable endpoint = new Discoverable() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        // A new address object is built on every call.
+        return new InetSocketAddress(host, port);
+      }
+    };
+    return service.register(endpoint);
+  }
+
+  /**
+   * Waits up to five seconds until the given {@link ServiceDiscovered} reports exactly the
+   * expected number of endpoints.
+   *
+   * @param expected the number of endpoints to wait for
+   * @param serviceDiscovered the discovery result to watch
+   * @return {@code true} if the expected count was observed before the timeout, {@code false} otherwise
+   */
+  protected boolean waitTillExpected(final int expected, ServiceDiscovered serviceDiscovered) {
+    final CountDownLatch latch = new CountDownLatch(1);
+    Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+      @Override
+      public void onChange(ServiceDiscovered serviceDiscovered) {
+        if (expected == Iterables.size(serviceDiscovered)) {
+          latch.countDown();
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    try {
+      return latch.await(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      throw Throwables.propagate(e);
+    } finally {
+      // Detach the listener so repeated calls on the same ServiceDiscovered do not accumulate watchers.
+      cancelWatch.cancel();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
index d8cc375..d49e8a5 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
@@ -18,50 +18,18 @@
 
 package org.apache.twill.discovery;
 
-import org.apache.twill.common.Cancellable;
-import com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Test;
+import com.google.common.collect.Maps;
 
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
 
 /**
  * Test memory based service discovery service.
  */
-public class InMemoryDiscoveryServiceTest {
-  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
-    return service.register(new Discoverable() {
-      @Override
-      public String getName() {
-        return name;
-      }
+public class InMemoryDiscoveryServiceTest extends DiscoveryServiceTestBase {
 
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return new InetSocketAddress(host, port);
-      }
-    });
-  }
-
-  @Test
-  public void simpleDiscoverable() throws Exception {
+  @Override
+  protected Map.Entry<DiscoveryService, DiscoveryServiceClient> create() {
     DiscoveryService discoveryService = new InMemoryDiscoveryService();
-    DiscoveryServiceClient discoveryServiceClient = (DiscoveryServiceClient) discoveryService;
-
-    // Register one service running on one host:port
-    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
-
-    // Discover that registered host:port.
-    Assert.assertTrue(Iterables.size(discoverables) == 1);
-
-    // Remove the service
-    cancellable.cancel();
-
-    // There should be no service.
-    discoverables = discoveryServiceClient.discover("foo");
-    TimeUnit.MILLISECONDS.sleep(100);
-    Assert.assertTrue(Iterables.size(discoverables) == 0);
+    return Maps.immutableEntry(discoveryService, (DiscoveryServiceClient) discoveryService);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b0dd8e24/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
index feee8db..6f0cde0 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -25,8 +25,7 @@ import org.apache.twill.zookeeper.RetryStrategies;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.apache.twill.zookeeper.ZKClientServices;
 import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -35,14 +34,13 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Test Zookeeper based discovery service.
  */
-public class ZKDiscoveryServiceTest {
+public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryServiceTest.class);
 
   private static InMemoryZKServer zkServer;
@@ -66,41 +64,17 @@ public class ZKDiscoveryServiceTest {
     Futures.getUnchecked(Services.chainStop(zkClient, zkServer));
   }
 
-  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
-    return service.register(new Discoverable() {
-      @Override
-      public String getName() {
-        return name;
-      }
-
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return new InetSocketAddress(host, port);
-      }
-    });
-  }
-
-
-  private boolean waitTillExpected(int expected, Iterable<Discoverable> discoverables) throws Exception {
-    for (int i = 0; i < 10; ++i) {
-      TimeUnit.MILLISECONDS.sleep(10);
-      if (Iterables.size(discoverables) == expected) {
-        return true;
-      }
-    }
-    return (Iterables.size(discoverables) == expected);
-  }
-
   @Test (timeout = 5000)
   public void testDoubleRegister() throws Exception {
-    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = discoveryService;
+    Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+    DiscoveryService discoveryService = entry.getKey();
+    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
 
     // Register on the same host port, it shouldn't fail.
     Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
     Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
 
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_double_reg");
+    ServiceDiscovered discoverables = discoveryServiceClient.discover("test_double_reg");
 
     Assert.assertTrue(waitTillExpected(1, discoverables));
 
@@ -116,7 +90,7 @@ public class ZKDiscoveryServiceTest {
     zkClient2.startAndWait();
 
     try {
-      ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
+      DiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
       cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
 
       // Schedule a thread to shutdown zkClient2.
@@ -143,12 +117,13 @@ public class ZKDiscoveryServiceTest {
 
   @Test
   public void testSessionExpires() throws Exception {
-    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = discoveryService;
+    Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
+    DiscoveryService discoveryService = entry.getKey();
+    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
 
     Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
 
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_expires");
+    ServiceDiscovered discoverables = discoveryServiceClient.discover("test_expires");
 
     // Discover that registered host:port.
     Assert.assertTrue(waitTillExpected(1, discoverables));
@@ -168,86 +143,11 @@ public class ZKDiscoveryServiceTest {
     Assert.assertTrue(waitTillExpected(0, discoverables));
   }
 
-  @Test
-  public void simpleDiscoverable() throws Exception {
-    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
-    // Register one service running on one host:port
-    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
-
-    // Discover that registered host:port.
-    Assert.assertTrue(waitTillExpected(1, discoverables));
-
-    // Remove the service
-    cancellable.cancel();
-
-    // There should be no service.
-
-    discoverables = discoveryServiceClient.discover("foo");
-
-    Assert.assertTrue(waitTillExpected(0, discoverables));
-  }
-
-  @Test
-  public void manySameDiscoverable() throws Exception {
-    List<Cancellable> cancellables = Lists.newArrayList();
+  @Override
+  protected Map.Entry<DiscoveryService, DiscoveryServiceClient> create() {
     DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
     DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
 
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
-
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("manyDiscoverable");
-    Assert.assertTrue(waitTillExpected(5, discoverables));
-
-    for (int i = 0; i < 5; i++) {
-      cancellables.get(i).cancel();
-      Assert.assertTrue(waitTillExpected(4 - i, discoverables));
-    }
-  }
-
-  @Test
-  public void multiServiceDiscoverable() throws Exception {
-    List<Cancellable> cancellables = Lists.newArrayList();
-    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
-    cancellables.add(register(discoveryService, "service1", "localhost", 1));
-    cancellables.add(register(discoveryService, "service1", "localhost", 2));
-    cancellables.add(register(discoveryService, "service1", "localhost", 3));
-    cancellables.add(register(discoveryService, "service1", "localhost", 4));
-    cancellables.add(register(discoveryService, "service1", "localhost", 5));
-
-    cancellables.add(register(discoveryService, "service2", "localhost", 1));
-    cancellables.add(register(discoveryService, "service2", "localhost", 2));
-    cancellables.add(register(discoveryService, "service2", "localhost", 3));
-
-    cancellables.add(register(discoveryService, "service3", "localhost", 1));
-    cancellables.add(register(discoveryService, "service3", "localhost", 2));
-
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("service1");
-    Assert.assertTrue(waitTillExpected(5, discoverables));
-
-    discoverables = discoveryServiceClient.discover("service2");
-    Assert.assertTrue(waitTillExpected(3, discoverables));
-
-    discoverables = discoveryServiceClient.discover("service3");
-    Assert.assertTrue(waitTillExpected(2, discoverables));
-
-    cancellables.add(register(discoveryService, "service3", "localhost", 3));
-    Assert.assertTrue(waitTillExpected(3, discoverables)); // Shows live iterator.
-
-    for (Cancellable cancellable : cancellables) {
-      cancellable.cancel();
-    }
-
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+    return Maps.immutableEntry(discoveryService, discoveryServiceClient);
   }
 }