You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2016/10/04 16:25:38 UTC

[11/16] twill git commit: (TWILL-177) Make ZKDiscoveryService AutoCloseable

(TWILL-177) Make ZKDiscoveryService AutoCloseable

- Release ZK watches when closed
- Fixed a race condition in ZKDiscoveryServiceTest
- Fix the easy to fail InitializeFailTestRun
  - Match with an error message emitted from AM instead of from the container.
    - It's possible that the one from the container doesn't have a chance to
      be sent out before it gets killed.

This closes #8 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/ced2044f
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/ced2044f
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/ced2044f

Branch: refs/heads/site
Commit: ced2044f185db59192e38411cb9e28f597a5ea04
Parents: fd9d726
Author: Terence Yim <ch...@apache.org>
Authored: Thu Aug 25 18:41:15 2016 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Sep 13 14:18:49 2016 -0700

----------------------------------------------------------------------
 .../twill/internal/AbstractTwillController.java |  17 +-
 .../twill/internal/AbstractTwillService.java    |   8 +-
 .../discovery/DefaultServiceDiscovered.java     |   9 +-
 .../twill/discovery/ZKDiscoveryService.java     | 165 +++++++--
 .../discovery/DiscoveryServiceTestBase.java     | 349 +++++++++++--------
 .../twill/discovery/ZKDiscoveryServiceTest.java | 114 +++---
 .../internal/container/TwillContainerMain.java  |  30 +-
 .../org/apache/twill/yarn/BaseYarnTest.java     |  15 +
 .../apache/twill/yarn/EchoServerTestRun.java    |   4 +-
 .../twill/yarn/InitializeFailTestRun.java       |  12 +-
 .../org/apache/twill/yarn/YarnTestSuite.java    |   1 -
 .../internal/zookeeper/LeaderElection.java      |   7 +-
 12 files changed, 484 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 41a044b..e60f6a3 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -28,7 +28,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-
 import org.apache.twill.api.Command;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.RunId;
@@ -38,7 +37,6 @@ import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.api.logging.LogThrowable;
 import org.apache.twill.common.Cancellable;
-import org.apache.twill.discovery.DiscoveryServiceClient;
 import org.apache.twill.discovery.ServiceDiscovered;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.internal.json.LogEntryDecoder;
@@ -73,14 +71,13 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
 
   private final Queue<LogHandler> logHandlers;
   private final KafkaClientService kafkaClient;
-  private final DiscoveryServiceClient discoveryServiceClient;
-  private volatile Cancellable logCancellable;
+  private ZKDiscoveryService discoveryServiceClient;
+  private Cancellable logCancellable;
 
   public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
     super(runId, zkClient);
     this.logHandlers = new ConcurrentLinkedQueue<>();
     this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
-    this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
     Iterables.addAll(this.logHandlers, logHandlers);
   }
 
@@ -95,10 +92,13 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
   }
 
   @Override
-  protected void doShutDown() {
+  protected synchronized void doShutDown() {
     if (logCancellable != null) {
       logCancellable.cancel();
     }
+    if (discoveryServiceClient != null) {
+      discoveryServiceClient.close();
+    }
     // Safe to call stop no matter when state the KafkaClientService is in.
     kafkaClient.stopAndWait();
   }
@@ -115,7 +115,10 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
   }
 
   @Override
-  public final ServiceDiscovered discoverService(String serviceName) {
+  public final synchronized ServiceDiscovered discoverService(String serviceName) {
+    if (discoveryServiceClient == null) {
+      discoveryServiceClient = new ZKDiscoveryService(zkClient);
+    }
     return discoveryServiceClient.discover(serviceName);
   }
 

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 8688d0b..1717117 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Service;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import org.apache.twill.api.RunId;
+import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.state.MessageCallback;
@@ -88,6 +89,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
   protected final ZKClient zkClient;
   protected final RunId runId;
   private ExecutorService messageCallbackExecutor;
+  private Cancellable watcherCancellable;
 
   protected AbstractTwillService(final ZKClient zkClient, RunId runId) {
     this.zkClient = zkClient;
@@ -145,7 +147,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
                                                      new ThreadPoolExecutor.DiscardPolicy());
 
     // Watch for session expiration, recreate the live node if reconnected after expiration.
-    zkClient.addConnectionWatcher(new Watcher() {
+    watcherCancellable = zkClient.addConnectionWatcher(new Watcher() {
       private boolean expired = false;
 
       @Override
@@ -181,6 +183,10 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
 
   @Override
   protected final void shutDown() throws Exception {
+    if (watcherCancellable != null) {
+      watcherCancellable.cancel();
+    }
+
     messageCallbackExecutor.shutdownNow();
     try {
       doStop();

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
index dbb0f7c..ca55390 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
@@ -20,6 +20,8 @@ package org.apache.twill.discovery;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.twill.common.Cancellable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.List;
@@ -36,6 +38,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 final class DefaultServiceDiscovered implements ServiceDiscovered {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceDiscovered.class);
+
   private final String name;
   private final AtomicReference<Set<Discoverable>> discoverables;
   private final List<ListenerCaller> listenerCallers;
@@ -49,7 +53,10 @@ final class DefaultServiceDiscovered implements ServiceDiscovered {
   }
 
   void setDiscoverables(Set<Discoverable> discoverables) {
-    this.discoverables.set(ImmutableSet.copyOf(discoverables));
+    Set<Discoverable> newDiscoverables = ImmutableSet.copyOf(discoverables);
+    LOG.debug("Discoverables changed: {}={}", name, newDiscoverables);
+
+    this.discoverables.set(newDiscoverables);
 
     // Collect all listeners with a read lock to the listener list.
     List<ListenerCaller> callers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 4f2d0f7..6a3f64d 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -20,6 +20,8 @@ package org.apache.twill.discovery;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -47,14 +49,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
 
 /**
  * Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
@@ -93,19 +99,21 @@ import java.util.concurrent.locks.ReentrantLock;
  *   </pre>
  * </blockquote>
  */
-public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient, AutoCloseable {
   private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
 
   private static final long RETRY_MILLIS = 1000;
 
+  private final AtomicBoolean closed;
   // In memory map for recreating ephemeral nodes after session expires.
   // It map from discoverable to the corresponding Cancellable
   private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
   private final Lock lock;
 
-  private final LoadingCache<String, ServiceDiscovered> services;
+  private final LoadingCache<String, ServiceDiscoveredCacheEntry> services;
   private final ZKClient zkClient;
   private final ScheduledExecutorService retryExecutor;
+  private final Cancellable watcherCancellable;
 
   /**
    * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry.
@@ -122,13 +130,24 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
    *                  If namespace is {@code null}, no namespace will be used.
    */
   public ZKDiscoveryService(ZKClient zkClient, String namespace) {
+    this.closed = new AtomicBoolean();
     this.discoverables = HashMultimap.create();
     this.lock = new ReentrantLock();
     this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
       Threads.createDaemonThreadFactory("zk-discovery-retry"));
     this.zkClient = namespace == null ? zkClient : ZKClients.namespace(zkClient, namespace);
-    this.services = CacheBuilder.newBuilder().build(createServiceLoader());
-    this.zkClient.addConnectionWatcher(createConnectionWatcher());
+    this.services = CacheBuilder.newBuilder()
+      .removalListener(new RemovalListener<String, ServiceDiscoveredCacheEntry>() {
+        @Override
+        public void onRemoval(RemovalNotification<String, ServiceDiscoveredCacheEntry> notification) {
+          ServiceDiscoveredCacheEntry entry = notification.getValue();
+          if (entry != null) {
+            entry.cancel();
+          }
+        }
+      })
+      .build(createServiceLoader());
+    this.watcherCancellable = this.zkClient.addConnectionWatcher(createConnectionWatcher());
   }
 
   /**
@@ -146,6 +165,10 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
    */
   @Override
   public Cancellable register(final Discoverable discoverable) {
+    if (closed.get()) {
+      throw new IllegalStateException("Cannot register discoverable through a closed ZKDiscoveryService");
+    }
+
     final SettableFuture<String> future = SettableFuture.create();
     final DiscoveryCancellable cancellable = new DiscoveryCancellable(discoverable);
 
@@ -157,7 +180,11 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
         cancellable.setPath(result);
         lock.lock();
         try {
-          discoverables.put(discoverable, cancellable);
+          if (!closed.get()) {
+            discoverables.put(discoverable, cancellable);
+          } else {
+            cancellable.asyncCancel();
+          }
         } finally {
           lock.unlock();
         }
@@ -182,9 +209,47 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
 
   @Override
   public ServiceDiscovered discover(String service) {
+    if (closed.get()) {
+      throw new IllegalStateException("Cannot discover through a closed ZKDiscoveryService");
+    }
     return services.getUnchecked(service);
   }
 
+  @Override
+  public void close() {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
+    // Stop the registration retry executor
+    retryExecutor.shutdownNow();
+
+    // Cancel the connection watcher
+    watcherCancellable.cancel();
+
+    // Cancel all registered services
+    List<ListenableFuture<?>> futures = new ArrayList<>();
+    lock.lock();
+    try {
+      for (Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
+        LOG.debug("Un-registering service {} - {}", entry.getKey().getName(), entry.getKey().getSocketAddress());
+        futures.add(entry.getValue().asyncCancel());
+      }
+    } finally {
+      lock.unlock();
+    }
+    try {
+      Futures.successfulAsList(futures).get();
+      LOG.debug("All services unregistered");
+    } catch (Exception e) {
+      // This is not expected to happen
+      LOG.warn("Unexpected exception when waiting for all services to get unregistered", e);
+    }
+
+    // Cancel all services being watched
+    services.invalidateAll();
+  }
+
   /**
    * Handle registration failure.
    *
@@ -197,11 +262,14 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
                                      final SettableFuture<String> completion,
                                      final FutureCallback<String> creationCallback,
                                      final Throwable failureCause) {
+    if (closed.get()) {
+      return;
+    }
 
     final String path = getNodePath(discoverable);
     Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
       @Override
-      public void onSuccess(Stat result) {
+      public void onSuccess(@Nullable Stat result) {
         if (result == null) {
           // If the node is gone, simply retry.
           LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
@@ -244,16 +312,21 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
   }
 
   private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
+    if (closed.get()) {
+      return;
+    }
+
     retryExecutor.schedule(new Runnable() {
 
       @Override
       public void run() {
-        Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);
+        if (!closed.get()) {
+          Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);
+        }
       }
     }, RETRY_MILLIS, TimeUnit.MILLISECONDS);
   }
 
-
   /**
    * Generate unique node path for a given {@link Discoverable}.
    * @param discoverable An instance of {@link Discoverable}.
@@ -288,6 +361,11 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
           lock.lock();
           try {
             for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
+              if (closed.get()) {
+                entry.getValue().asyncCancel();
+                continue;
+              }
+
               LOG.info("Re-registering service: {}", entry.getKey());
 
               // Must be non-blocking in here.
@@ -318,22 +396,22 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
   /**
    * Creates a CacheLoader for creating live Iterable for watching instances changes for a given service.
    */
-  private CacheLoader<String, ServiceDiscovered> createServiceLoader() {
-    return new CacheLoader<String, ServiceDiscovered>() {
+  private CacheLoader<String, ServiceDiscoveredCacheEntry> createServiceLoader() {
+    return new CacheLoader<String, ServiceDiscoveredCacheEntry>() {
       @Override
-      public ServiceDiscovered load(String service) throws Exception {
+      public ServiceDiscoveredCacheEntry load(String service) throws Exception {
         final DefaultServiceDiscovered serviceDiscovered = new DefaultServiceDiscovered(service);
-        final String serviceBase = "/" + service;
+        final String pathBase = "/" + service;
 
         // Watch for children changes in /service
-        ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
+        Cancellable cancellable = ZKOperations.watchChildren(zkClient, pathBase, new ZKOperations.ChildrenCallback() {
           @Override
           public void updated(NodeChildren nodeChildren) {
             // Fetch data of all children nodes in parallel.
             List<String> children = nodeChildren.getChildren();
             List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size());
             for (String child : children) {
-              dataFutures.add(zkClient.getData(serviceBase + "/" + child));
+              dataFutures.add(zkClient.getData(pathBase + "/" + child));
             }
 
             // Update the service map when all fetching are done.
@@ -356,7 +434,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
             }, Threads.SAME_THREAD_EXECUTOR);
           }
         });
-        return serviceDiscovered;
+        return new ServiceDiscoveredCacheEntry(serviceDiscovered, cancellable);
       }
     };
   }
@@ -394,18 +472,23 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
 
     @Override
     public void cancel() {
+      Futures.getUnchecked(asyncCancel());
+      LOG.debug("Service unregistered: {} {}", discoverable, path);
+    }
+
+    ListenableFuture<?> asyncCancel() {
       if (!cancelled.compareAndSet(false, true)) {
-        return;
+        return Futures.immediateFuture(null);
       }
 
       // Take a snapshot of the volatile path.
       String path = this.path;
 
-      // If it is null, meaning cancel() is called before the ephemeral node is created, hence
+      // If it is null, meaning cancel is called before the ephemeral node is created, hence
       // setPath() will be called in future (through zk callback when creation is completed)
       // so that deletion will be done in setPath().
       if (path == null) {
-        return;
+        return Futures.immediateFuture(null);
       }
 
       // Remove this Cancellable from the map so that upon session expiration won't try to register.
@@ -418,9 +501,49 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
 
       // Delete the path. It's ok if the path not exists
       // (e.g. what session expired and before node has been re-created)
-      Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
-                                                    KeeperException.NoNodeException.class, path));
-      LOG.debug("Service unregistered: {} {}", discoverable, path);
+      try {
+        return ZKOperations.ignoreError(zkClient.delete(path), KeeperException.NoNodeException.class, path);
+      } catch (Exception e) {
+        return Futures.immediateFailedFuture(e);
+      }
+    }
+  }
+
+  /**
+   * Class to be used as the service discovered cache entry.
+   */
+  private static final class ServiceDiscoveredCacheEntry implements Cancellable, ServiceDiscovered {
+    private final ServiceDiscovered serviceDiscovered;
+    private final Cancellable cancellable;
+
+    private ServiceDiscoveredCacheEntry(ServiceDiscovered serviceDiscovered, Cancellable cancellable) {
+      this.serviceDiscovered = serviceDiscovered;
+      this.cancellable = cancellable;
+    }
+
+    @Override
+    public void cancel() {
+      cancellable.cancel();
+    }
+
+    @Override
+    public String getName() {
+      return serviceDiscovered.getName();
+    }
+
+    @Override
+    public Cancellable watchChanges(ChangeListener listener, Executor executor) {
+      return serviceDiscovered.watchChanges(listener, executor);
+    }
+
+    @Override
+    public boolean contains(Discoverable discoverable) {
+      return serviceDiscovered.contains(discoverable);
+    }
+
+    @Override
+    public Iterator<Discoverable> iterator() {
+      return serviceDiscovered.iterator();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
index 8fe5a37..f2d2a83 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
@@ -25,8 +25,11 @@ import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,225 +44,260 @@ import java.util.concurrent.TimeUnit;
  */
 public abstract class DiscoveryServiceTestBase {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DiscoveryServiceTestBase.class);
+
   protected abstract Map.Entry<DiscoveryService, DiscoveryServiceClient> create();
 
   @Test
   public void simpleDiscoverable() throws Exception {
-    final String payload = "data";
+    final byte[] payload = "data".getBytes(StandardCharsets.UTF_8);
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
-    DiscoveryService discoveryService = entry.getKey();
-    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+    try {
+      DiscoveryService discoveryService = entry.getKey();
+      DiscoveryServiceClient discoveryServiceClient = entry.getValue();
 
-    // Register one service running on one host:port
-    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090, payload.getBytes());
+      // Register one service running on one host:port
+      Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090, payload);
 
-    // Discover that registered host:port.
-    ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("foo");
-    Assert.assertTrue(waitTillExpected(1, serviceDiscovered));
+      // Discover that registered host:port.
+      ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("foo");
+      Assert.assertTrue(waitTillExpected(1, serviceDiscovered));
 
-    Discoverable discoverable = new Discoverable("foo", new InetSocketAddress("localhost", 8090), payload.getBytes());
+      Discoverable discoverable = new Discoverable("foo", new InetSocketAddress("localhost", 8090), payload);
 
-    // Check it exists.
-    Assert.assertTrue(serviceDiscovered.contains(discoverable));
+      // Check it exists.
+      Assert.assertTrue(serviceDiscovered.contains(discoverable));
 
-    // Remove the service
-    cancellable.cancel();
+      // Remove the service
+      cancellable.cancel();
 
-    // There should be no service.
-    Assert.assertTrue(waitTillExpected(0, serviceDiscovered));
+      // There should be no service.
+      Assert.assertTrue(waitTillExpected(0, serviceDiscovered));
 
-    Assert.assertFalse(serviceDiscovered.contains(discoverable));
+      Assert.assertFalse(serviceDiscovered.contains(discoverable));
+    } finally {
+      closeServices(entry);
+    }
   }
 
   @Test
   public void testChangeListener() throws InterruptedException {
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
-    DiscoveryService discoveryService = entry.getKey();
-    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
-
-    // Start discovery
-    String serviceName = "listener_test";
-    ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+    try {
+      DiscoveryService discoveryService = entry.getKey();
+      DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+      // Start discovery
+      final String serviceName = "listener_test";
+      ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+
+      // Watch for changes.
+      final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
+      serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+        @Override
+        public void onChange(ServiceDiscovered serviceDiscovered) {
+          List<Discoverable> discoverables = ImmutableList.copyOf(serviceDiscovered);
+          LOG.info("ServiceDiscovered callback: {}={}", serviceName, discoverables);
+          events.add(discoverables);
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
 
-    // Watch for changes.
-    final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
-    serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
-      @Override
-      public void onChange(ServiceDiscovered serviceDiscovered) {
-        events.add(ImmutableList.copyOf(serviceDiscovered));
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
+      // An empty list will be received first, as no endpoint has been registered.
+      List<Discoverable> discoverables = events.poll(20, TimeUnit.SECONDS);
+      Assert.assertNotNull(discoverables);
+      Assert.assertTrue(discoverables.isEmpty());
 
-    // An empty list will be received first, as no endpoint has been registered.
-    List<Discoverable> discoverables = events.poll(20, TimeUnit.SECONDS);
-    Assert.assertNotNull(discoverables);
-    Assert.assertTrue(discoverables.isEmpty());
+      // Register a service
+      Cancellable cancellable = register(discoveryService, serviceName, "localhost", 10000);
 
-    // Register a service
-    Cancellable cancellable = register(discoveryService, serviceName, "localhost", 10000);
+      discoverables = events.poll(20, TimeUnit.SECONDS);
+      Assert.assertNotNull(discoverables);
 
-    discoverables = events.poll(20, TimeUnit.SECONDS);
-    Assert.assertNotNull(discoverables);
-    Assert.assertEquals(1, discoverables.size());
+      // It's possible that the list retrieved is empty in ZK case due to the creation of the parent node
+      // when creating the service discovery ZK node
+      if (discoverables.isEmpty()) {
+        // If that's the case, poll it again and it should be non-empty
+        discoverables = events.poll(20, TimeUnit.SECONDS);
+      }
+      Assert.assertEquals(1, discoverables.size());
 
-    // Register another service endpoint
-    Cancellable cancellable2 = register(discoveryService, serviceName, "localhost", 10001);
+      // Register another service endpoint
+      Cancellable cancellable2 = register(discoveryService, serviceName, "localhost", 10001);
 
-    discoverables = events.poll(20, TimeUnit.SECONDS);
-    Assert.assertNotNull(discoverables);
-    Assert.assertEquals(2, discoverables.size());
+      discoverables = events.poll(20, TimeUnit.SECONDS);
+      Assert.assertNotNull(discoverables);
+      Assert.assertEquals(2, discoverables.size());
 
-    // Cancel both of them
-    cancellable.cancel();
-    cancellable2.cancel();
+      // Cancel both of them
+      cancellable.cancel();
+      cancellable2.cancel();
 
-    // There could be more than one event triggered, but the last event should be an empty list.
-    discoverables = events.poll(20, TimeUnit.SECONDS);
-    Assert.assertNotNull(discoverables);
-    if (!discoverables.isEmpty()) {
+      // There could be more than one event triggered, but the last event should be an empty list.
       discoverables = events.poll(20, TimeUnit.SECONDS);
-    }
+      Assert.assertNotNull(discoverables);
+      if (!discoverables.isEmpty()) {
+        discoverables = events.poll(20, TimeUnit.SECONDS);
+      }
 
-    Assert.assertTrue(discoverables.isEmpty());
+      Assert.assertTrue(discoverables.isEmpty());
+    } finally {
+      closeServices(entry);
+    }
   }
 
   @Test
   public void testCancelChangeListener() throws InterruptedException {
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
-    DiscoveryService discoveryService = entry.getKey();
-    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
-
-    String serviceName = "cancel_listener";
-    ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
-
-    // An executor that delay execute a Runnable. It's for testing race because listener cancel and discovery changes.
-    Executor delayExecutor = new Executor() {
-      @Override
-      public void execute(final Runnable command) {
-        Thread t = new Thread() {
-          @Override
-          public void run() {
-            try {
-              TimeUnit.SECONDS.sleep(2);
-              command.run();
-            } catch (InterruptedException e) {
-              throw Throwables.propagate(e);
+    try {
+      DiscoveryService discoveryService = entry.getKey();
+      DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+      String serviceName = "cancel_listener";
+      ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+
+      // An executor that delays running a Runnable, to test the race between listener cancel and discovery changes.
+      Executor delayExecutor = new Executor() {
+        @Override
+        public void execute(final Runnable command) {
+          Thread t = new Thread() {
+            @Override
+            public void run() {
+              try {
+                TimeUnit.SECONDS.sleep(2);
+                command.run();
+              } catch (InterruptedException e) {
+                throw Throwables.propagate(e);
+              }
             }
-          }
-        };
-        t.start();
-      }
-    };
+          };
+          t.start();
+        }
+      };
 
-    final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
-    Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
-      @Override
-      public void onChange(ServiceDiscovered serviceDiscovered) {
-        events.add(ImmutableList.copyOf(serviceDiscovered));
-      }
-    }, delayExecutor);
+      final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
+      Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+        @Override
+        public void onChange(ServiceDiscovered serviceDiscovered) {
+          events.add(ImmutableList.copyOf(serviceDiscovered));
+        }
+      }, delayExecutor);
 
-    // Wait for the init event call
-    Assert.assertNotNull(events.poll(3, TimeUnit.SECONDS));
+      // Wait for the init event call
+      Assert.assertNotNull(events.poll(3, TimeUnit.SECONDS));
 
-    // Register a new service endpoint, wait a short while and then cancel the listener
-    register(discoveryService, serviceName, "localhost", 1);
-    TimeUnit.SECONDS.sleep(1);
-    cancelWatch.cancel();
+      // Register a new service endpoint, wait a short while and then cancel the listener
+      register(discoveryService, serviceName, "localhost", 1);
+      TimeUnit.SECONDS.sleep(1);
+      cancelWatch.cancel();
 
-    // The change listener shouldn't get any event, since the invocation is delayed by the executor.
-    Assert.assertNull(events.poll(3, TimeUnit.SECONDS));
+      // The change listener shouldn't get any event, since the invocation is delayed by the executor.
+      Assert.assertNull(events.poll(3, TimeUnit.SECONDS));
+    } finally {
+      closeServices(entry);
+    }
   }
 
   @Test
   public void manySameDiscoverable() throws Exception {
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
-    DiscoveryService discoveryService = entry.getKey();
-    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+    try {
+      DiscoveryService discoveryService = entry.getKey();
+      DiscoveryServiceClient discoveryServiceClient = entry.getValue();
 
-    List<Cancellable> cancellables = Lists.newArrayList();
+      List<Cancellable> cancellables = Lists.newArrayList();
 
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
+      cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
+      cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
+      cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
+      cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
+      cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
 
-    ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("manyDiscoverable");
-    Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
+      ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("manyDiscoverable");
+      Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
 
-    for (int i = 0; i < 5; i++) {
-      cancellables.get(i).cancel();
-      Assert.assertTrue(waitTillExpected(4 - i, serviceDiscovered));
+      for (int i = 0; i < 5; i++) {
+        cancellables.get(i).cancel();
+        Assert.assertTrue(waitTillExpected(4 - i, serviceDiscovered));
+      }
+    } finally {
+      closeServices(entry);
     }
   }
 
   @Test
   public void multiServiceDiscoverable() throws Exception {
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
-    DiscoveryService discoveryService = entry.getKey();
-    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+    try {
+      DiscoveryService discoveryService = entry.getKey();
+      DiscoveryServiceClient discoveryServiceClient = entry.getValue();
 
-    List<Cancellable> cancellables = Lists.newArrayList();
+      List<Cancellable> cancellables = Lists.newArrayList();
 
-    cancellables.add(register(discoveryService, "service1", "localhost", 1));
-    cancellables.add(register(discoveryService, "service1", "localhost", 2));
-    cancellables.add(register(discoveryService, "service1", "localhost", 3));
-    cancellables.add(register(discoveryService, "service1", "localhost", 4));
-    cancellables.add(register(discoveryService, "service1", "localhost", 5));
+      cancellables.add(register(discoveryService, "service1", "localhost", 1));
+      cancellables.add(register(discoveryService, "service1", "localhost", 2));
+      cancellables.add(register(discoveryService, "service1", "localhost", 3));
+      cancellables.add(register(discoveryService, "service1", "localhost", 4));
+      cancellables.add(register(discoveryService, "service1", "localhost", 5));
 
-    cancellables.add(register(discoveryService, "service2", "localhost", 1));
-    cancellables.add(register(discoveryService, "service2", "localhost", 2));
-    cancellables.add(register(discoveryService, "service2", "localhost", 3));
+      cancellables.add(register(discoveryService, "service2", "localhost", 1));
+      cancellables.add(register(discoveryService, "service2", "localhost", 2));
+      cancellables.add(register(discoveryService, "service2", "localhost", 3));
 
-    cancellables.add(register(discoveryService, "service3", "localhost", 1));
-    cancellables.add(register(discoveryService, "service3", "localhost", 2));
+      cancellables.add(register(discoveryService, "service3", "localhost", 1));
+      cancellables.add(register(discoveryService, "service3", "localhost", 2));
 
-    ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("service1");
-    Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
+      ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("service1");
+      Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
 
-    serviceDiscovered = discoveryServiceClient.discover("service2");
-    Assert.assertTrue(waitTillExpected(3, serviceDiscovered));
+      serviceDiscovered = discoveryServiceClient.discover("service2");
+      Assert.assertTrue(waitTillExpected(3, serviceDiscovered));
 
-    serviceDiscovered = discoveryServiceClient.discover("service3");
-    Assert.assertTrue(waitTillExpected(2, serviceDiscovered));
+      serviceDiscovered = discoveryServiceClient.discover("service3");
+      Assert.assertTrue(waitTillExpected(2, serviceDiscovered));
 
-    cancellables.add(register(discoveryService, "service3", "localhost", 3));
-    Assert.assertTrue(waitTillExpected(3, serviceDiscovered)); // Shows live iterator.
+      cancellables.add(register(discoveryService, "service3", "localhost", 3));
+      Assert.assertTrue(waitTillExpected(3, serviceDiscovered)); // Shows live iterator.
 
-    for (Cancellable cancellable : cancellables) {
-      cancellable.cancel();
-    }
+      for (Cancellable cancellable : cancellables) {
+        cancellable.cancel();
+      }
 
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+      Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
+      Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
+      Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+    } finally {
+      closeServices(entry);
+    }
   }
 
   @Test
   public void testIterator() throws InterruptedException {
     // This test is to verify TWILL-75
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
-    final DiscoveryService service = entry.getKey();
-    DiscoveryServiceClient client = entry.getValue();
+    try {
+      final DiscoveryService service = entry.getKey();
+      DiscoveryServiceClient client = entry.getValue();
 
-    final String serviceName = "iterator";
-    ServiceDiscovered discovered = client.discover(serviceName);
+      final String serviceName = "iterator";
+      ServiceDiscovered discovered = client.discover(serviceName);
 
-    // Create a thread for performing registration.
-    Thread t = new Thread() {
-      @Override
-      public void run() {
-        service.register(new Discoverable(serviceName, new InetSocketAddress(12345), new byte[]{}));
-      }
-    };
+      // Create a thread for performing registration.
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          service.register(new Discoverable(serviceName, new InetSocketAddress(12345), new byte[]{}));
+        }
+      };
 
-    Iterator<Discoverable> iterator = discovered.iterator();
-    t.start();
-    t.join();
+      Iterator<Discoverable> iterator = discovered.iterator();
+      t.start();
+      t.join();
 
-    // This would throw exception if there is race condition.
-    Assert.assertFalse(iterator.hasNext());
+      // This would throw exception if there is race condition.
+      Assert.assertFalse(iterator.hasNext());
+    } finally {
+      closeServices(entry);
+    }
   }
 
   protected Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
@@ -288,4 +326,21 @@ public abstract class DiscoveryServiceTestBase {
       throw Throwables.propagate(e);
     }
   }
+
+  protected final void closeServices(Map.Entry<DiscoveryService, DiscoveryServiceClient> entry) {
+    if (entry.getKey() instanceof AutoCloseable) {
+      try {
+        ((AutoCloseable) entry.getKey()).close();
+      } catch (Exception e) {
+        LOG.warn("Failed to close DiscoveryService {}", entry.getKey(), e);
+      }
+    }
+    if (entry.getValue() instanceof AutoCloseable) {
+      try {
+        ((AutoCloseable) entry.getValue()).close();
+      } catch (Exception e) {
+        LOG.warn("Failed to close DiscoveryServiceClient {}", entry.getValue(), e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
index 7d6e369..dcf3935 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -66,80 +66,86 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase {
   @Test (timeout = 30000)
   public void testDoubleRegister() throws Exception {
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
-    DiscoveryService discoveryService = entry.getKey();
-    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
-
-    // Register on the same host port, it shouldn't fail.
-    Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
-    Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
-
-    ServiceDiscovered discoverables = discoveryServiceClient.discover("test_double_reg");
-
-    Assert.assertTrue(waitTillExpected(1, discoverables));
+    try {
+      DiscoveryService discoveryService = entry.getKey();
+      DiscoveryServiceClient discoveryServiceClient = entry.getValue();
 
-    cancellable.cancel();
-    cancellable2.cancel();
+      // Register on the same host port, it shouldn't fail.
+      Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
+      Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
 
-    // Register again with two different clients, but killing session of the first one.
-    final ZKClientService zkClient2 = ZKClientServices.delegate(
-      ZKClients.retryOnFailure(
-        ZKClients.reWatchOnExpire(
-          ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
-        RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
-    zkClient2.startAndWait();
+      ServiceDiscovered discoverables = discoveryServiceClient.discover("test_double_reg");
 
-    try {
-      DiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
-      cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
-
-      // Schedule a thread to shutdown zkClient2.
-      new Thread() {
-        @Override
-        public void run() {
-          try {
-            TimeUnit.SECONDS.sleep(2);
-            zkClient2.stopAndWait();
-          } catch (InterruptedException e) {
-            LOG.error(e.getMessage(), e);
-          }
-        }
-      }.start();
+      Assert.assertTrue(waitTillExpected(1, discoverables));
 
-      // This call would block until zkClient2 is shutdown.
-      cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
       cancellable.cancel();
-
+      cancellable2.cancel();
+
+      // Register again with two different clients, but killing session of the first one.
+      final ZKClientService zkClient2 = ZKClientServices.delegate(
+        ZKClients.retryOnFailure(
+          ZKClients.reWatchOnExpire(
+            ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+          RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+      zkClient2.startAndWait();
+
+      try (ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2)) {
+        cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
+
+        // Schedule a thread to shut down zkClient2.
+        new Thread() {
+          @Override
+          public void run() {
+            try {
+              TimeUnit.SECONDS.sleep(2);
+              zkClient2.stopAndWait();
+            } catch (InterruptedException e) {
+              LOG.error(e.getMessage(), e);
+            }
+          }
+        }.start();
+
+        // This call would block until zkClient2 is shut down.
+        cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
+        cancellable.cancel();
+      } finally {
+        zkClient2.stopAndWait();
+      }
     } finally {
-      zkClient2.stopAndWait();
+      closeServices(entry);
     }
   }
 
   @Test
   public void testSessionExpires() throws Exception {
     Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
-    DiscoveryService discoveryService = entry.getKey();
-    DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+    try {
+      DiscoveryService discoveryService = entry.getKey();
+      DiscoveryServiceClient discoveryServiceClient = entry.getValue();
 
-    Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
+      Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
 
-    ServiceDiscovered discoverables = discoveryServiceClient.discover("test_expires");
+      ServiceDiscovered discoverables = discoveryServiceClient.discover("test_expires");
 
-    // Discover that registered host:port.
-    Assert.assertTrue(waitTillExpected(1, discoverables));
+      // Discover that registered host:port.
+      Assert.assertTrue(waitTillExpected(1, discoverables));
 
-    KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 10000);
+      KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 10000);
 
-    // Register one more endpoint to make sure state has been reflected after reconnection
-    Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
+      // Register one more endpoint to make sure state has been reflected after reconnection
+      Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
 
-    // Reconnection would trigger re-registration.
-    Assert.assertTrue(waitTillExpected(2, discoverables));
+      // Reconnection would trigger re-registration.
+      Assert.assertTrue(waitTillExpected(2, discoverables));
 
-    cancellable.cancel();
-    cancellable2.cancel();
+      cancellable.cancel();
+      cancellable2.cancel();
 
-    // Verify that both are now gone.
-    Assert.assertTrue(waitTillExpected(0, discoverables));
+      // Verify that both are now gone.
+      Assert.assertTrue(waitTillExpected(0, discoverables));
+    } finally {
+      closeServices(entry);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 3ea786a..efccd89 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.container;
 import com.google.common.base.Charsets;
 import com.google.common.base.Strings;
 import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.AbstractService;
 import com.google.common.util.concurrent.Service;
 import org.apache.hadoop.conf.Configuration;
@@ -102,9 +103,11 @@ public final class TwillContainerMain extends ServiceMain {
                                                 createAppLocation(conf));
     new TwillContainerMain().doMain(
       service,
-      new LogFlushService(),
       zkClientService,
-      new TwillZKPathService(containerZKClient, runId));
+      new LogFlushService(),
+      new TwillZKPathService(containerZKClient, runId),
+      new CloseableServiceWrapper(discoveryService)
+    );
   }
 
   @Override
@@ -196,4 +199,27 @@ public final class TwillContainerMain extends ServiceMain {
       notifyStopped();
     }
   }
+
+  /**
+   * A wrapper to adapt an {@link AutoCloseable} to a {@link Service}. The service is a no-op during start up
+   * and will call {@link AutoCloseable#close()} on shutdown.
+   */
+  private static final class CloseableServiceWrapper extends AbstractIdleService {
+
+    private final AutoCloseable closeable;
+
+    private CloseableServiceWrapper(AutoCloseable closeable) {
+      this.closeable = closeable;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      // no-op
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      closeable.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index 2cac2f3..4f95391 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -24,8 +24,11 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +69,18 @@ public abstract class BaseYarnTest {
     }
   };
 
+  @Rule
+  public final TestName testName = new TestName();
+
+  @Before
+  public void beforeTest() {
+    LOG.info("Before test {}", testName.getMethodName());
+  }
+
+  @After
+  public void afterTest() {
+    LOG.info("After test {}", testName.getMethodName());
+  }
 
   @After
   public final void cleanupTest() {

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index 211b7ba..278a436 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -197,7 +197,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
       }
 
       // There should be two instances up and running.
-      echoServices = controller.discoverService("echo");
+      echoServices = controllers.get(1).discoverService("echo");
       Assert.assertTrue(waitForSize(echoServices, 2, 120));
 
       // Stop one instance of the app
@@ -207,7 +207,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
       Assert.assertNotNull(zkClient.exists("/EchoServer").get());
 
       // We should still be able to do discovery, which depends on the ZK node.
-      echoServices = controller.discoverService("echo");
+      echoServices = controllers.get(1).discoverService("echo");
       Assert.assertTrue(waitForSize(echoServices, 1, 120));
 
       // Stop second instance of the app

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
index 64f5a69..c446879 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
@@ -23,7 +23,6 @@ import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.api.logging.LogThrowable;
 import org.apache.twill.api.logging.PrinterLogHandler;
 import org.junit.Assert;
 import org.junit.Test;
@@ -48,15 +47,8 @@ public class InitializeFailTestRun extends BaseYarnTest {
     LogHandler logVerifyHandler = new LogHandler() {
       @Override
       public void onLog(LogEntry logEntry) {
-        LogThrowable logThrowable = logEntry.getThrowable();
-        if (logThrowable != null) {
-          while (logThrowable.getCause() != null) {
-            logThrowable = logThrowable.getCause();
-          }
-          if (IllegalStateException.class.getName().equals(logThrowable.getClassName())
-            && logThrowable.getMessage().contains("Fail to init")) {
-            logLatch.countDown();
-          }
+        if (logEntry.getMessage().endsWith("exited abnormally with state COMPLETE, exit code 10.")) {
+          logLatch.countDown();
         }
       }
     };

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index f1e1ac9..6dd281d 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -44,5 +44,4 @@ import org.junit.runners.Suite;
   RestartRunnableTestRun.class
 })
 public final class YarnTestSuite extends BaseYarnTest {
-
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
index 8433b18..d8bb49d 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.twill.api.ElectionHandler;
+import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.apache.twill.zookeeper.NodeChildren;
 import org.apache.twill.zookeeper.NodeData;
@@ -70,6 +71,7 @@ public final class LeaderElection extends AbstractService {
   private ExecutorService executor;
   private String zkNodePath;
   private State state;
+  private Cancellable watcherCancellable;
 
   public LeaderElection(ZKClient zkClient, String prefix, ElectionHandler handler) {
     this.guid = UUID.randomUUID().toString();
@@ -89,7 +91,7 @@ public final class LeaderElection extends AbstractService {
       @Override
       public void run() {
         register();
-        LeaderElection.this.zkClient.addConnectionWatcher(wrapWatcher(new ConnectionWatcher()));
+        watcherCancellable = zkClient.addConnectionWatcher(wrapWatcher(new ConnectionWatcher()));
       }
     });
     notifyStarted();
@@ -121,6 +123,9 @@ public final class LeaderElection extends AbstractService {
     executor.execute(new Runnable() {
       @Override
       public void run() {
+        if (watcherCancellable != null) {
+          watcherCancellable.cancel();
+        }
         if (state != State.CANCELLED) {
           // becomeFollower has to be called before deleting node to make sure no two active leader.
           if (state == State.LEADER) {