You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2016/10/04 16:25:38 UTC
[11/16] twill git commit: (TWILL-177) Make ZKDiscoveryService AutoCloseable
(TWILL-177) Make ZKDiscoveryService AutoCloseable
- Release ZK watches when closed
- Fixed a race condition in ZKDiscoveryServiceTest
- Fix the easy to fail InitializeFailTestRun
- Match with an error message emitted from AM instead of from the container.
- It's possible that the one from the container doesn't have a chance to
send out before it gets killed.
This closes #8 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/ced2044f
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/ced2044f
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/ced2044f
Branch: refs/heads/site
Commit: ced2044f185db59192e38411cb9e28f597a5ea04
Parents: fd9d726
Author: Terence Yim <ch...@apache.org>
Authored: Thu Aug 25 18:41:15 2016 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Sep 13 14:18:49 2016 -0700
----------------------------------------------------------------------
.../twill/internal/AbstractTwillController.java | 17 +-
.../twill/internal/AbstractTwillService.java | 8 +-
.../discovery/DefaultServiceDiscovered.java | 9 +-
.../twill/discovery/ZKDiscoveryService.java | 165 +++++++--
.../discovery/DiscoveryServiceTestBase.java | 349 +++++++++++--------
.../twill/discovery/ZKDiscoveryServiceTest.java | 114 +++---
.../internal/container/TwillContainerMain.java | 30 +-
.../org/apache/twill/yarn/BaseYarnTest.java | 15 +
.../apache/twill/yarn/EchoServerTestRun.java | 4 +-
.../twill/yarn/InitializeFailTestRun.java | 12 +-
.../org/apache/twill/yarn/YarnTestSuite.java | 1 -
.../internal/zookeeper/LeaderElection.java | 7 +-
12 files changed, 484 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 41a044b..e60f6a3 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -28,7 +28,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-
import org.apache.twill.api.Command;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
@@ -38,7 +37,6 @@ import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.logging.LogThrowable;
import org.apache.twill.common.Cancellable;
-import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.internal.json.LogEntryDecoder;
@@ -73,14 +71,13 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
private final Queue<LogHandler> logHandlers;
private final KafkaClientService kafkaClient;
- private final DiscoveryServiceClient discoveryServiceClient;
- private volatile Cancellable logCancellable;
+ private ZKDiscoveryService discoveryServiceClient;
+ private Cancellable logCancellable;
public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
super(runId, zkClient);
this.logHandlers = new ConcurrentLinkedQueue<>();
this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
- this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
Iterables.addAll(this.logHandlers, logHandlers);
}
@@ -95,10 +92,13 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
}
@Override
- protected void doShutDown() {
+ protected synchronized void doShutDown() {
if (logCancellable != null) {
logCancellable.cancel();
}
+ if (discoveryServiceClient != null) {
+ discoveryServiceClient.close();
+ }
// Safe to call stop no matter when state the KafkaClientService is in.
kafkaClient.stopAndWait();
}
@@ -115,7 +115,10 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
}
@Override
- public final ServiceDiscovered discoverService(String serviceName) {
+ public final synchronized ServiceDiscovered discoverService(String serviceName) {
+ if (discoveryServiceClient == null) {
+ discoveryServiceClient = new ZKDiscoveryService(zkClient);
+ }
return discoveryServiceClient.discover(serviceName);
}
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 8688d0b..1717117 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.twill.api.RunId;
+import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.MessageCallback;
@@ -88,6 +89,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
protected final ZKClient zkClient;
protected final RunId runId;
private ExecutorService messageCallbackExecutor;
+ private Cancellable watcherCancellable;
protected AbstractTwillService(final ZKClient zkClient, RunId runId) {
this.zkClient = zkClient;
@@ -145,7 +147,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
new ThreadPoolExecutor.DiscardPolicy());
// Watch for session expiration, recreate the live node if reconnected after expiration.
- zkClient.addConnectionWatcher(new Watcher() {
+ watcherCancellable = zkClient.addConnectionWatcher(new Watcher() {
private boolean expired = false;
@Override
@@ -181,6 +183,10 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
@Override
protected final void shutDown() throws Exception {
+ if (watcherCancellable != null) {
+ watcherCancellable.cancel();
+ }
+
messageCallbackExecutor.shutdownNow();
try {
doStop();
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
index dbb0f7c..ca55390 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DefaultServiceDiscovered.java
@@ -20,6 +20,8 @@ package org.apache.twill.discovery;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.twill.common.Cancellable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
@@ -36,6 +38,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
final class DefaultServiceDiscovered implements ServiceDiscovered {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceDiscovered.class);
+
private final String name;
private final AtomicReference<Set<Discoverable>> discoverables;
private final List<ListenerCaller> listenerCallers;
@@ -49,7 +53,10 @@ final class DefaultServiceDiscovered implements ServiceDiscovered {
}
void setDiscoverables(Set<Discoverable> discoverables) {
- this.discoverables.set(ImmutableSet.copyOf(discoverables));
+ Set<Discoverable> newDiscoverables = ImmutableSet.copyOf(discoverables);
+ LOG.debug("Discoverables changed: {}={}", name, newDiscoverables);
+
+ this.discoverables.set(newDiscoverables);
// Collect all listeners with a read lock to the listener list.
List<ListenerCaller> callers = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 4f2d0f7..6a3f64d 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -20,6 +20,8 @@ package org.apache.twill.discovery;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -47,14 +49,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
/**
* Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
@@ -93,19 +99,21 @@ import java.util.concurrent.locks.ReentrantLock;
* </pre>
* </blockquote>
*/
-public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
private static final long RETRY_MILLIS = 1000;
+ private final AtomicBoolean closed;
// In memory map for recreating ephemeral nodes after session expires.
// It map from discoverable to the corresponding Cancellable
private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
private final Lock lock;
- private final LoadingCache<String, ServiceDiscovered> services;
+ private final LoadingCache<String, ServiceDiscoveredCacheEntry> services;
private final ZKClient zkClient;
private final ScheduledExecutorService retryExecutor;
+ private final Cancellable watcherCancellable;
/**
* Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry.
@@ -122,13 +130,24 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
* If namespace is {@code null}, no namespace will be used.
*/
public ZKDiscoveryService(ZKClient zkClient, String namespace) {
+ this.closed = new AtomicBoolean();
this.discoverables = HashMultimap.create();
this.lock = new ReentrantLock();
this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
Threads.createDaemonThreadFactory("zk-discovery-retry"));
this.zkClient = namespace == null ? zkClient : ZKClients.namespace(zkClient, namespace);
- this.services = CacheBuilder.newBuilder().build(createServiceLoader());
- this.zkClient.addConnectionWatcher(createConnectionWatcher());
+ this.services = CacheBuilder.newBuilder()
+ .removalListener(new RemovalListener<String, ServiceDiscoveredCacheEntry>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, ServiceDiscoveredCacheEntry> notification) {
+ ServiceDiscoveredCacheEntry entry = notification.getValue();
+ if (entry != null) {
+ entry.cancel();
+ }
+ }
+ })
+ .build(createServiceLoader());
+ this.watcherCancellable = this.zkClient.addConnectionWatcher(createConnectionWatcher());
}
/**
@@ -146,6 +165,10 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
*/
@Override
public Cancellable register(final Discoverable discoverable) {
+ if (closed.get()) {
+ throw new IllegalStateException("Cannot register discoverable through a closed ZKDiscoveryService");
+ }
+
final SettableFuture<String> future = SettableFuture.create();
final DiscoveryCancellable cancellable = new DiscoveryCancellable(discoverable);
@@ -157,7 +180,11 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
cancellable.setPath(result);
lock.lock();
try {
- discoverables.put(discoverable, cancellable);
+ if (!closed.get()) {
+ discoverables.put(discoverable, cancellable);
+ } else {
+ cancellable.asyncCancel();
+ }
} finally {
lock.unlock();
}
@@ -182,9 +209,47 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
@Override
public ServiceDiscovered discover(String service) {
+ if (closed.get()) {
+ throw new IllegalStateException("Cannot discover through a closed ZKDiscoveryService");
+ }
return services.getUnchecked(service);
}
+ @Override
+ public void close() {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+
+ // Stop the registration retry executor
+ retryExecutor.shutdownNow();
+
+ // Cancel the connection watcher
+ watcherCancellable.cancel();
+
+ // Cancel all registered services
+ List<ListenableFuture<?>> futures = new ArrayList<>();
+ lock.lock();
+ try {
+ for (Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
+ LOG.debug("Un-registering service {} - {}", entry.getKey().getName(), entry.getKey().getSocketAddress());
+ futures.add(entry.getValue().asyncCancel());
+ }
+ } finally {
+ lock.unlock();
+ }
+ try {
+ Futures.successfulAsList(futures).get();
+ LOG.debug("All services unregistered");
+ } catch (Exception e) {
+ // This is not expected to happen
+ LOG.warn("Unexpected exception when waiting for all services to get unregistered", e);
+ }
+
+ // Cancel all services being watched
+ services.invalidateAll();
+ }
+
/**
* Handle registration failure.
*
@@ -197,11 +262,14 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
final SettableFuture<String> completion,
final FutureCallback<String> creationCallback,
final Throwable failureCause) {
+ if (closed.get()) {
+ return;
+ }
final String path = getNodePath(discoverable);
Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
@Override
- public void onSuccess(Stat result) {
+ public void onSuccess(@Nullable Stat result) {
if (result == null) {
// If the node is gone, simply retry.
LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
@@ -244,16 +312,21 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
}
private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
+ if (closed.get()) {
+ return;
+ }
+
retryExecutor.schedule(new Runnable() {
@Override
public void run() {
- Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);
+ if (!closed.get()) {
+ Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);
+ }
}
}, RETRY_MILLIS, TimeUnit.MILLISECONDS);
}
-
/**
* Generate unique node path for a given {@link Discoverable}.
* @param discoverable An instance of {@link Discoverable}.
@@ -288,6 +361,11 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
lock.lock();
try {
for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
+ if (closed.get()) {
+ entry.getValue().asyncCancel();
+ continue;
+ }
+
LOG.info("Re-registering service: {}", entry.getKey());
// Must be non-blocking in here.
@@ -318,22 +396,22 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
/**
* Creates a CacheLoader for creating live Iterable for watching instances changes for a given service.
*/
- private CacheLoader<String, ServiceDiscovered> createServiceLoader() {
- return new CacheLoader<String, ServiceDiscovered>() {
+ private CacheLoader<String, ServiceDiscoveredCacheEntry> createServiceLoader() {
+ return new CacheLoader<String, ServiceDiscoveredCacheEntry>() {
@Override
- public ServiceDiscovered load(String service) throws Exception {
+ public ServiceDiscoveredCacheEntry load(String service) throws Exception {
final DefaultServiceDiscovered serviceDiscovered = new DefaultServiceDiscovered(service);
- final String serviceBase = "/" + service;
+ final String pathBase = "/" + service;
// Watch for children changes in /service
- ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
+ Cancellable cancellable = ZKOperations.watchChildren(zkClient, pathBase, new ZKOperations.ChildrenCallback() {
@Override
public void updated(NodeChildren nodeChildren) {
// Fetch data of all children nodes in parallel.
List<String> children = nodeChildren.getChildren();
List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size());
for (String child : children) {
- dataFutures.add(zkClient.getData(serviceBase + "/" + child));
+ dataFutures.add(zkClient.getData(pathBase + "/" + child));
}
// Update the service map when all fetching are done.
@@ -356,7 +434,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
}, Threads.SAME_THREAD_EXECUTOR);
}
});
- return serviceDiscovered;
+ return new ServiceDiscoveredCacheEntry(serviceDiscovered, cancellable);
}
};
}
@@ -394,18 +472,23 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
@Override
public void cancel() {
+ Futures.getUnchecked(asyncCancel());
+ LOG.debug("Service unregistered: {} {}", discoverable, path);
+ }
+
+ ListenableFuture<?> asyncCancel() {
if (!cancelled.compareAndSet(false, true)) {
- return;
+ return Futures.immediateFuture(null);
}
// Take a snapshot of the volatile path.
String path = this.path;
- // If it is null, meaning cancel() is called before the ephemeral node is created, hence
+ // If it is null, meaning cancel is called before the ephemeral node is created, hence
// setPath() will be called in future (through zk callback when creation is completed)
// so that deletion will be done in setPath().
if (path == null) {
- return;
+ return Futures.immediateFuture(null);
}
// Remove this Cancellable from the map so that upon session expiration won't try to register.
@@ -418,9 +501,49 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
// Delete the path. It's ok if the path not exists
// (e.g. what session expired and before node has been re-created)
- Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
- KeeperException.NoNodeException.class, path));
- LOG.debug("Service unregistered: {} {}", discoverable, path);
+ try {
+ return ZKOperations.ignoreError(zkClient.delete(path), KeeperException.NoNodeException.class, path);
+ } catch (Exception e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+ }
+
+ /**
+ * Class to be used as the service discovered cache entry.
+ */
+ private static final class ServiceDiscoveredCacheEntry implements Cancellable, ServiceDiscovered {
+ private final ServiceDiscovered serviceDiscovered;
+ private final Cancellable cancellable;
+
+ private ServiceDiscoveredCacheEntry(ServiceDiscovered serviceDiscovered, Cancellable cancellable) {
+ this.serviceDiscovered = serviceDiscovered;
+ this.cancellable = cancellable;
+ }
+
+ @Override
+ public void cancel() {
+ cancellable.cancel();
+ }
+
+ @Override
+ public String getName() {
+ return serviceDiscovered.getName();
+ }
+
+ @Override
+ public Cancellable watchChanges(ChangeListener listener, Executor executor) {
+ return serviceDiscovered.watchChanges(listener, executor);
+ }
+
+ @Override
+ public boolean contains(Discoverable discoverable) {
+ return serviceDiscovered.contains(discoverable);
+ }
+
+ @Override
+ public Iterator<Discoverable> iterator() {
+ return serviceDiscovered.iterator();
}
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
index 8fe5a37..f2d2a83 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/DiscoveryServiceTestBase.java
@@ -25,8 +25,11 @@ import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -41,225 +44,260 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class DiscoveryServiceTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(DiscoveryServiceTestBase.class);
+
protected abstract Map.Entry<DiscoveryService, DiscoveryServiceClient> create();
@Test
public void simpleDiscoverable() throws Exception {
- final String payload = "data";
+ final byte[] payload = "data".getBytes(StandardCharsets.UTF_8);
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
- DiscoveryService discoveryService = entry.getKey();
- DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+ try {
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
- // Register one service running on one host:port
- Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090, payload.getBytes());
+ // Register one service running on one host:port
+ Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090, payload);
- // Discover that registered host:port.
- ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("foo");
- Assert.assertTrue(waitTillExpected(1, serviceDiscovered));
+ // Discover that registered host:port.
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("foo");
+ Assert.assertTrue(waitTillExpected(1, serviceDiscovered));
- Discoverable discoverable = new Discoverable("foo", new InetSocketAddress("localhost", 8090), payload.getBytes());
+ Discoverable discoverable = new Discoverable("foo", new InetSocketAddress("localhost", 8090), payload);
- // Check it exists.
- Assert.assertTrue(serviceDiscovered.contains(discoverable));
+ // Check it exists.
+ Assert.assertTrue(serviceDiscovered.contains(discoverable));
- // Remove the service
- cancellable.cancel();
+ // Remove the service
+ cancellable.cancel();
- // There should be no service.
- Assert.assertTrue(waitTillExpected(0, serviceDiscovered));
+ // There should be no service.
+ Assert.assertTrue(waitTillExpected(0, serviceDiscovered));
- Assert.assertFalse(serviceDiscovered.contains(discoverable));
+ Assert.assertFalse(serviceDiscovered.contains(discoverable));
+ } finally {
+ closeServices(entry);
+ }
}
@Test
public void testChangeListener() throws InterruptedException {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
- DiscoveryService discoveryService = entry.getKey();
- DiscoveryServiceClient discoveryServiceClient = entry.getValue();
-
- // Start discovery
- String serviceName = "listener_test";
- ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+ try {
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+ // Start discovery
+ final String serviceName = "listener_test";
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+
+ // Watch for changes.
+ final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
+ serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+ @Override
+ public void onChange(ServiceDiscovered serviceDiscovered) {
+ List<Discoverable> discoverables = ImmutableList.copyOf(serviceDiscovered);
+ LOG.info("ServiceDiscovered callback: {}={}", serviceName, discoverables);
+ events.add(discoverables);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
- // Watch for changes.
- final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
- serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
- @Override
- public void onChange(ServiceDiscovered serviceDiscovered) {
- events.add(ImmutableList.copyOf(serviceDiscovered));
- }
- }, Threads.SAME_THREAD_EXECUTOR);
+ // An empty list will be received first, as no endpoint has been registered.
+ List<Discoverable> discoverables = events.poll(20, TimeUnit.SECONDS);
+ Assert.assertNotNull(discoverables);
+ Assert.assertTrue(discoverables.isEmpty());
- // An empty list will be received first, as no endpoint has been registered.
- List<Discoverable> discoverables = events.poll(20, TimeUnit.SECONDS);
- Assert.assertNotNull(discoverables);
- Assert.assertTrue(discoverables.isEmpty());
+ // Register a service
+ Cancellable cancellable = register(discoveryService, serviceName, "localhost", 10000);
- // Register a service
- Cancellable cancellable = register(discoveryService, serviceName, "localhost", 10000);
+ discoverables = events.poll(20, TimeUnit.SECONDS);
+ Assert.assertNotNull(discoverables);
- discoverables = events.poll(20, TimeUnit.SECONDS);
- Assert.assertNotNull(discoverables);
- Assert.assertEquals(1, discoverables.size());
+ // It's possible that the list retrieved is empty in ZK case due to the creation of the parent node
+ // when creating the service discovery ZK node
+ if (discoverables.isEmpty()) {
+ // If that's the case, poll it again and it should be non-empty
+ discoverables = events.poll(20, TimeUnit.SECONDS);
+ }
+ Assert.assertEquals(1, discoverables.size());
- // Register another service endpoint
- Cancellable cancellable2 = register(discoveryService, serviceName, "localhost", 10001);
+ // Register another service endpoint
+ Cancellable cancellable2 = register(discoveryService, serviceName, "localhost", 10001);
- discoverables = events.poll(20, TimeUnit.SECONDS);
- Assert.assertNotNull(discoverables);
- Assert.assertEquals(2, discoverables.size());
+ discoverables = events.poll(20, TimeUnit.SECONDS);
+ Assert.assertNotNull(discoverables);
+ Assert.assertEquals(2, discoverables.size());
- // Cancel both of them
- cancellable.cancel();
- cancellable2.cancel();
+ // Cancel both of them
+ cancellable.cancel();
+ cancellable2.cancel();
- // There could be more than one event triggered, but the last event should be an empty list.
- discoverables = events.poll(20, TimeUnit.SECONDS);
- Assert.assertNotNull(discoverables);
- if (!discoverables.isEmpty()) {
+ // There could be more than one event triggered, but the last event should be an empty list.
discoverables = events.poll(20, TimeUnit.SECONDS);
- }
+ Assert.assertNotNull(discoverables);
+ if (!discoverables.isEmpty()) {
+ discoverables = events.poll(20, TimeUnit.SECONDS);
+ }
- Assert.assertTrue(discoverables.isEmpty());
+ Assert.assertTrue(discoverables.isEmpty());
+ } finally {
+ closeServices(entry);
+ }
}
@Test
public void testCancelChangeListener() throws InterruptedException {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
- DiscoveryService discoveryService = entry.getKey();
- DiscoveryServiceClient discoveryServiceClient = entry.getValue();
-
- String serviceName = "cancel_listener";
- ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
-
- // An executor that delay execute a Runnable. It's for testing race because listener cancel and discovery changes.
- Executor delayExecutor = new Executor() {
- @Override
- public void execute(final Runnable command) {
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- TimeUnit.SECONDS.sleep(2);
- command.run();
- } catch (InterruptedException e) {
- throw Throwables.propagate(e);
+ try {
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+
+ String serviceName = "cancel_listener";
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover(serviceName);
+
+ // An executor that delay execute a Runnable. It's for testing race because listener cancel and discovery changes.
+ Executor delayExecutor = new Executor() {
+ @Override
+ public void execute(final Runnable command) {
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ command.run();
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
}
- }
- };
- t.start();
- }
- };
+ };
+ t.start();
+ }
+ };
- final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
- Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
- @Override
- public void onChange(ServiceDiscovered serviceDiscovered) {
- events.add(ImmutableList.copyOf(serviceDiscovered));
- }
- }, delayExecutor);
+ final BlockingQueue<List<Discoverable>> events = new ArrayBlockingQueue<List<Discoverable>>(10);
+ Cancellable cancelWatch = serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+ @Override
+ public void onChange(ServiceDiscovered serviceDiscovered) {
+ events.add(ImmutableList.copyOf(serviceDiscovered));
+ }
+ }, delayExecutor);
- // Wait for the init event call
- Assert.assertNotNull(events.poll(3, TimeUnit.SECONDS));
+ // Wait for the init event call
+ Assert.assertNotNull(events.poll(3, TimeUnit.SECONDS));
- // Register a new service endpoint, wait a short while and then cancel the listener
- register(discoveryService, serviceName, "localhost", 1);
- TimeUnit.SECONDS.sleep(1);
- cancelWatch.cancel();
+ // Register a new service endpoint, wait a short while and then cancel the listener
+ register(discoveryService, serviceName, "localhost", 1);
+ TimeUnit.SECONDS.sleep(1);
+ cancelWatch.cancel();
- // The change listener shouldn't get any event, since the invocation is delayed by the executor.
- Assert.assertNull(events.poll(3, TimeUnit.SECONDS));
+ // The change listener shouldn't get any event, since the invocation is delayed by the executor.
+ Assert.assertNull(events.poll(3, TimeUnit.SECONDS));
+ } finally {
+ closeServices(entry);
+ }
}
@Test
public void manySameDiscoverable() throws Exception {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
- DiscoveryService discoveryService = entry.getKey();
- DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+ try {
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
- List<Cancellable> cancellables = Lists.newArrayList();
+ List<Cancellable> cancellables = Lists.newArrayList();
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
- ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("manyDiscoverable");
- Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("manyDiscoverable");
+ Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
- for (int i = 0; i < 5; i++) {
- cancellables.get(i).cancel();
- Assert.assertTrue(waitTillExpected(4 - i, serviceDiscovered));
+ for (int i = 0; i < 5; i++) {
+ cancellables.get(i).cancel();
+ Assert.assertTrue(waitTillExpected(4 - i, serviceDiscovered));
+ }
+ } finally {
+ closeServices(entry);
}
}
@Test
public void multiServiceDiscoverable() throws Exception {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
- DiscoveryService discoveryService = entry.getKey();
- DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+ try {
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
- List<Cancellable> cancellables = Lists.newArrayList();
+ List<Cancellable> cancellables = Lists.newArrayList();
- cancellables.add(register(discoveryService, "service1", "localhost", 1));
- cancellables.add(register(discoveryService, "service1", "localhost", 2));
- cancellables.add(register(discoveryService, "service1", "localhost", 3));
- cancellables.add(register(discoveryService, "service1", "localhost", 4));
- cancellables.add(register(discoveryService, "service1", "localhost", 5));
+ cancellables.add(register(discoveryService, "service1", "localhost", 1));
+ cancellables.add(register(discoveryService, "service1", "localhost", 2));
+ cancellables.add(register(discoveryService, "service1", "localhost", 3));
+ cancellables.add(register(discoveryService, "service1", "localhost", 4));
+ cancellables.add(register(discoveryService, "service1", "localhost", 5));
- cancellables.add(register(discoveryService, "service2", "localhost", 1));
- cancellables.add(register(discoveryService, "service2", "localhost", 2));
- cancellables.add(register(discoveryService, "service2", "localhost", 3));
+ cancellables.add(register(discoveryService, "service2", "localhost", 1));
+ cancellables.add(register(discoveryService, "service2", "localhost", 2));
+ cancellables.add(register(discoveryService, "service2", "localhost", 3));
- cancellables.add(register(discoveryService, "service3", "localhost", 1));
- cancellables.add(register(discoveryService, "service3", "localhost", 2));
+ cancellables.add(register(discoveryService, "service3", "localhost", 1));
+ cancellables.add(register(discoveryService, "service3", "localhost", 2));
- ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("service1");
- Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
+ ServiceDiscovered serviceDiscovered = discoveryServiceClient.discover("service1");
+ Assert.assertTrue(waitTillExpected(5, serviceDiscovered));
- serviceDiscovered = discoveryServiceClient.discover("service2");
- Assert.assertTrue(waitTillExpected(3, serviceDiscovered));
+ serviceDiscovered = discoveryServiceClient.discover("service2");
+ Assert.assertTrue(waitTillExpected(3, serviceDiscovered));
- serviceDiscovered = discoveryServiceClient.discover("service3");
- Assert.assertTrue(waitTillExpected(2, serviceDiscovered));
+ serviceDiscovered = discoveryServiceClient.discover("service3");
+ Assert.assertTrue(waitTillExpected(2, serviceDiscovered));
- cancellables.add(register(discoveryService, "service3", "localhost", 3));
- Assert.assertTrue(waitTillExpected(3, serviceDiscovered)); // Shows live iterator.
+ cancellables.add(register(discoveryService, "service3", "localhost", 3));
+ Assert.assertTrue(waitTillExpected(3, serviceDiscovered)); // Shows live iterator.
- for (Cancellable cancellable : cancellables) {
- cancellable.cancel();
- }
+ for (Cancellable cancellable : cancellables) {
+ cancellable.cancel();
+ }
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+ Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
+ Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
+ Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
+ } finally {
+ closeServices(entry);
+ }
}
@Test
public void testIterator() throws InterruptedException {
// This test is to verify TWILL-75
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
- final DiscoveryService service = entry.getKey();
- DiscoveryServiceClient client = entry.getValue();
+ try {
+ final DiscoveryService service = entry.getKey();
+ DiscoveryServiceClient client = entry.getValue();
- final String serviceName = "iterator";
- ServiceDiscovered discovered = client.discover(serviceName);
+ final String serviceName = "iterator";
+ ServiceDiscovered discovered = client.discover(serviceName);
- // Create a thread for performing registration.
- Thread t = new Thread() {
- @Override
- public void run() {
- service.register(new Discoverable(serviceName, new InetSocketAddress(12345), new byte[]{}));
- }
- };
+ // Create a thread for performing registration.
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ service.register(new Discoverable(serviceName, new InetSocketAddress(12345), new byte[]{}));
+ }
+ };
- Iterator<Discoverable> iterator = discovered.iterator();
- t.start();
- t.join();
+ Iterator<Discoverable> iterator = discovered.iterator();
+ t.start();
+ t.join();
- // This would throw exception if there is race condition.
- Assert.assertFalse(iterator.hasNext());
+ // This would throw exception if there is race condition.
+ Assert.assertFalse(iterator.hasNext());
+ } finally {
+ closeServices(entry);
+ }
}
protected Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
@@ -288,4 +326,21 @@ public abstract class DiscoveryServiceTestBase {
throw Throwables.propagate(e);
}
}
+
+ protected final void closeServices(Map.Entry<DiscoveryService, DiscoveryServiceClient> entry) {
+ if (entry.getKey() instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) entry.getKey()).close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close DiscoveryService {}", entry.getKey(), e);
+ }
+ }
+ if (entry.getValue() instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) entry.getValue()).close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close DiscoveryServiceClient {}", entry.getValue(), e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
index 7d6e369..dcf3935 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -66,80 +66,86 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase {
@Test (timeout = 30000)
public void testDoubleRegister() throws Exception {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
- DiscoveryService discoveryService = entry.getKey();
- DiscoveryServiceClient discoveryServiceClient = entry.getValue();
-
- // Register on the same host port, it shouldn't fail.
- Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
- Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
-
- ServiceDiscovered discoverables = discoveryServiceClient.discover("test_double_reg");
-
- Assert.assertTrue(waitTillExpected(1, discoverables));
+ try {
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
- cancellable.cancel();
- cancellable2.cancel();
+ // Register on the same host port, it shouldn't fail.
+ Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
+ Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
- // Register again with two different clients, but killing session of the first one.
- final ZKClientService zkClient2 = ZKClientServices.delegate(
- ZKClients.retryOnFailure(
- ZKClients.reWatchOnExpire(
- ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
- zkClient2.startAndWait();
+ ServiceDiscovered discoverables = discoveryServiceClient.discover("test_double_reg");
- try {
- DiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
- cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
-
- // Schedule a thread to shutdown zkClient2.
- new Thread() {
- @Override
- public void run() {
- try {
- TimeUnit.SECONDS.sleep(2);
- zkClient2.stopAndWait();
- } catch (InterruptedException e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }.start();
+ Assert.assertTrue(waitTillExpected(1, discoverables));
- // This call would block until zkClient2 is shutdown.
- cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
cancellable.cancel();
-
+ cancellable2.cancel();
+
+ // Register again with two different clients, but killing session of the first one.
+ final ZKClientService zkClient2 = ZKClientServices.delegate(
+ ZKClients.retryOnFailure(
+ ZKClients.reWatchOnExpire(
+ ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+ zkClient2.startAndWait();
+
+ try (ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2)) {
+ cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
+
+ // Schedule a thread to shutdown zkClient2.
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ zkClient2.stopAndWait();
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }.start();
+
+ // This call would block until zkClient2 is shutdown.
+ cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
+ cancellable.cancel();
+ } finally {
+ zkClient2.stopAndWait();
+ }
} finally {
- zkClient2.stopAndWait();
+ closeServices(entry);
}
}
@Test
public void testSessionExpires() throws Exception {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
- DiscoveryService discoveryService = entry.getKey();
- DiscoveryServiceClient discoveryServiceClient = entry.getValue();
+ try {
+ DiscoveryService discoveryService = entry.getKey();
+ DiscoveryServiceClient discoveryServiceClient = entry.getValue();
- Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
+ Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
- ServiceDiscovered discoverables = discoveryServiceClient.discover("test_expires");
+ ServiceDiscovered discoverables = discoveryServiceClient.discover("test_expires");
- // Discover that registered host:port.
- Assert.assertTrue(waitTillExpected(1, discoverables));
+ // Discover that registered host:port.
+ Assert.assertTrue(waitTillExpected(1, discoverables));
- KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 10000);
+ KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 10000);
- // Register one more endpoint to make sure state has been reflected after reconnection
- Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
+ // Register one more endpoint to make sure state has been reflected after reconnection
+ Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
- // Reconnection would trigger re-registration.
- Assert.assertTrue(waitTillExpected(2, discoverables));
+ // Reconnection would trigger re-registration.
+ Assert.assertTrue(waitTillExpected(2, discoverables));
- cancellable.cancel();
- cancellable2.cancel();
+ cancellable.cancel();
+ cancellable2.cancel();
- // Verify that both are now gone.
- Assert.assertTrue(waitTillExpected(0, discoverables));
+ // Verify that both are now gone.
+ Assert.assertTrue(waitTillExpected(0, discoverables));
+ } finally {
+ closeServices(entry);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 3ea786a..efccd89 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.container;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import org.apache.hadoop.conf.Configuration;
@@ -102,9 +103,11 @@ public final class TwillContainerMain extends ServiceMain {
createAppLocation(conf));
new TwillContainerMain().doMain(
service,
- new LogFlushService(),
zkClientService,
- new TwillZKPathService(containerZKClient, runId));
+ new LogFlushService(),
+ new TwillZKPathService(containerZKClient, runId),
+ new CloseableServiceWrapper(discoveryService)
+ );
}
@Override
@@ -196,4 +199,27 @@ public final class TwillContainerMain extends ServiceMain {
notifyStopped();
}
}
+
+ /**
+ * A wrapper to adapt an {@link AutoCloseable} to a {@link Service}. The service is a no-op during start up
+ * and calls {@link AutoCloseable#close()} on shutdown.
+ */
+ private static final class CloseableServiceWrapper extends AbstractIdleService {
+
+ private final AutoCloseable closeable;
+
+ private CloseableServiceWrapper(AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ // no-op
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ closeable.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index 2cac2f3..4f95391 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -24,8 +24,11 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +69,18 @@ public abstract class BaseYarnTest {
}
};
+ @Rule
+ public final TestName testName = new TestName();
+
+ @Before
+ public void beforeTest() {
+ LOG.info("Before test {}", testName.getMethodName());
+ }
+
+ @After
+ public void afterTest() {
+ LOG.info("After test {}", testName.getMethodName());
+ }
@After
public final void cleanupTest() {
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index 211b7ba..278a436 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -197,7 +197,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
}
// There should be two instances up and running.
- echoServices = controller.discoverService("echo");
+ echoServices = controllers.get(1).discoverService("echo");
Assert.assertTrue(waitForSize(echoServices, 2, 120));
// Stop one instance of the app
@@ -207,7 +207,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
Assert.assertNotNull(zkClient.exists("/EchoServer").get());
// We should still be able to do discovery, which depends on the ZK node.
- echoServices = controller.discoverService("echo");
+ echoServices = controllers.get(1).discoverService("echo");
Assert.assertTrue(waitForSize(echoServices, 1, 120));
// Stop second instance of the app
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
index 64f5a69..c446879 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
@@ -23,7 +23,6 @@ import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.api.logging.LogThrowable;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.junit.Assert;
import org.junit.Test;
@@ -48,15 +47,8 @@ public class InitializeFailTestRun extends BaseYarnTest {
LogHandler logVerifyHandler = new LogHandler() {
@Override
public void onLog(LogEntry logEntry) {
- LogThrowable logThrowable = logEntry.getThrowable();
- if (logThrowable != null) {
- while (logThrowable.getCause() != null) {
- logThrowable = logThrowable.getCause();
- }
- if (IllegalStateException.class.getName().equals(logThrowable.getClassName())
- && logThrowable.getMessage().contains("Fail to init")) {
- logLatch.countDown();
- }
+ if (logEntry.getMessage().endsWith("exited abnormally with state COMPLETE, exit code 10.")) {
+ logLatch.countDown();
}
}
};
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index f1e1ac9..6dd281d 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -44,5 +44,4 @@ import org.junit.runners.Suite;
RestartRunnableTestRun.class
})
public final class YarnTestSuite extends BaseYarnTest {
-
}
http://git-wip-us.apache.org/repos/asf/twill/blob/ced2044f/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
index 8433b18..d8bb49d 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.api.ElectionHandler;
+import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
@@ -70,6 +71,7 @@ public final class LeaderElection extends AbstractService {
private ExecutorService executor;
private String zkNodePath;
private State state;
+ private Cancellable watcherCancellable;
public LeaderElection(ZKClient zkClient, String prefix, ElectionHandler handler) {
this.guid = UUID.randomUUID().toString();
@@ -89,7 +91,7 @@ public final class LeaderElection extends AbstractService {
@Override
public void run() {
register();
- LeaderElection.this.zkClient.addConnectionWatcher(wrapWatcher(new ConnectionWatcher()));
+ watcherCancellable = zkClient.addConnectionWatcher(wrapWatcher(new ConnectionWatcher()));
}
});
notifyStarted();
@@ -121,6 +123,9 @@ public final class LeaderElection extends AbstractService {
executor.execute(new Runnable() {
@Override
public void run() {
+ if (watcherCancellable != null) {
+ watcherCancellable.cancel();
+ }
if (state != State.CANCELLED) {
// becomeFollower has to be called before deleting the node to make sure there are never two active leaders.
if (state == State.LEADER) {