You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/06/13 02:13:14 UTC
incubator-twill git commit: (TWILL-133) Make the ZKClient stoppable
even it is still trying to connect
Repository: incubator-twill
Updated Branches:
refs/heads/feature/twill-133 [created] a50bbb601
(TWILL-133) Make the ZKClient stoppable even it is still trying to connect
- Also include refactoring of the DefaultZKClient class to simplify
shutdown logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/a50bbb60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/a50bbb60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/a50bbb60
Branch: refs/heads/feature/twill-133
Commit: a50bbb6012a45b4c0fc9aae539ea663160263a5f
Parents: 0d3c5ee
Author: Terence Yim <ch...@apache.org>
Authored: Fri Jun 12 17:13:05 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Jun 12 17:13:05 2015 -0700
----------------------------------------------------------------------
.../zookeeper/DefaultZKClientService.java | 115 ++++++++++++-------
.../apache/twill/zookeeper/ZKClientTest.java | 85 +++++++++-----
2 files changed, 130 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/a50bbb60/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 7b9b345..aeb6499 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -58,7 +58,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -91,11 +90,11 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
this.zkStr = zkStr;
this.sessionTimeout = sessionTimeout;
- this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+ this.connectionWatchers = new CopyOnWriteArrayList<>();
this.authInfos = copyAuthInfo(authInfos);
addConnectionWatcher(connectionWatcher);
- this.zooKeeper = new AtomicReference<ZooKeeper>();
+ this.zooKeeper = new AtomicReference<>();
serviceDelegate = new ServiceDelegate();
}
@@ -111,7 +110,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
}
@Override
- public Cancellable addConnectionWatcher(Watcher watcher) {
+ public Cancellable addConnectionWatcher(final Watcher watcher) {
if (watcher == null) {
return new Cancellable() {
@Override
@@ -121,12 +120,11 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
};
}
- final Watcher wrappedWatcher = wrapWatcher(watcher);
- connectionWatchers.add(wrappedWatcher);
+ connectionWatchers.add(watcher);
return new Cancellable() {
@Override
public void cancel() {
- connectionWatchers.remove(wrappedWatcher);
+ connectionWatchers.remove(watcher);
}
};
}
@@ -391,25 +389,47 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
private final class ServiceDelegate extends AbstractService implements Watcher {
- private final AtomicBoolean stopNotified = new AtomicBoolean(false);
- private volatile boolean executorStopped;
+ private ServiceDelegate() {
+ addListener(new Listener() {
+ @Override
+ public void starting() {
+ // no-op
+ }
- @Override
- protected void doStart() {
- // A single thread executor
- eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
- Threads.createDaemonThreadFactory("zk-client-EventThread")) {
@Override
- protected void terminated() {
- super.terminated();
+ public void running() {
+ // no-op
+ }
- // Only call notifyStopped if the executor.shutdown() returned, otherwise deadlock (TWILL-110) can occur.
- // Also, notifyStopped() should only be called once.
- if (executorStopped && stopNotified.compareAndSet(false, true)) {
- notifyStopped();
+ @Override
+ public void stopping(State from) {
+ if (from == State.STARTING) {
+ // If it is still starting, just notify that it's started to transit out of the STARTING phase.
+ notifyStarted();
}
}
- };
+
+ @Override
+ public void terminated(State from) {
+ // no-op
+ }
+
+ @Override
+ public void failed(State from, Throwable failure) {
+ closeZooKeeper(zooKeeper.getAndSet(null));
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ @Override
+ protected void doStart() {
+ // A single thread executor for all events
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ Threads.createDaemonThreadFactory("zk-client-EventThread"));
+ // Just discard the execution if the executor is closed
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ eventExecutor = executor;
try {
zooKeeper.set(createZooKeeper());
@@ -420,23 +440,18 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
@Override
protected void doStop() {
- ZooKeeper zk = zooKeeper.getAndSet(null);
- if (zk != null) {
- try {
- zk.close();
- } catch (InterruptedException e) {
- notifyFailed(e);
- } finally {
- eventExecutor.shutdown();
- executorStopped = true;
-
- // If the executor state is terminated, meaning the terminate() method is triggered,
- // call notifyStopped() if it hasn't been called yet.
- if (eventExecutor.isTerminated() && stopNotified.compareAndSet(false, true)) {
+ eventExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ closeZooKeeper(zooKeeper.getAndSet(null));
notifyStopped();
+ } catch (Exception e) {
+ notifyFailed(e);
}
}
- }
+ });
+ eventExecutor.shutdown();
}
@Override
@@ -451,23 +466,23 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
LOG.info("ZooKeeper session expired: {}", zkStr);
// When connection expired, simply reconnect again
- Thread t = new Thread(new Runnable() {
+ eventExecutor.submit(new Runnable() {
@Override
public void run() {
+ if (state() != State.RUNNING) {
+ return;
+ }
try {
LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
- zooKeeper.set(createZooKeeper());
+ closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
} catch (IOException e) {
- zooKeeper.set(null);
notifyFailed(e);
}
}
- }, "zk-reconnect");
- t.setDaemon(true);
- t.start();
+ });
}
} finally {
- if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) {
+ if (event.getType() == Event.EventType.None) {
for (Watcher connectionWatcher : connectionWatchers) {
connectionWatcher.process(event);
}
@@ -479,12 +494,26 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
* Creates a new ZooKeeper connection.
*/
private ZooKeeper createZooKeeper() throws IOException {
- ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, this);
+ ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
}
return zk;
}
+
+ /**
+ * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
+ * it will get logged.
+ */
+ private void closeZooKeeper(@Nullable ZooKeeper zk) {
+ try {
+ if (zk != null) {
+ zk.close();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted when closing ZooKeeper", e);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/a50bbb60/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 97dec03..9c7e788 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.net.ServerSocket;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
@@ -235,38 +236,40 @@ public class ZKClientTest {
}
}
}).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS)));
- client.startAndWait();
-
- zkServer.stopAndWait();
-
- Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
final CountDownLatch createLatch = new CountDownLatch(1);
- Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
- @Override
- public void onSuccess(String result) {
- createLatch.countDown();
- }
+ client.startAndWait();
+ try {
+ zkServer.stopAndWait();
- @Override
- public void onFailure(Throwable t) {
- t.printStackTrace(System.out);
- }
- });
-
- TimeUnit.SECONDS.sleep(2);
- zkServer = InMemoryZKServer.builder()
- .setDataDir(dataDir)
- .setAutoCleanDataDir(true)
- .setPort(port)
- .setTickTime(1000)
- .build();
- zkServer.startAndWait();
+ Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
+ Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ createLatch.countDown();
+ }
- try {
- Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+ @Override
+ public void onFailure(Throwable t) {
+ t.printStackTrace(System.out);
+ }
+ });
+
+ TimeUnit.SECONDS.sleep(2);
+ zkServer = InMemoryZKServer.builder()
+ .setDataDir(dataDir)
+ .setAutoCleanDataDir(true)
+ .setPort(port)
+ .setTickTime(1000)
+ .build();
+ zkServer.startAndWait();
+ try {
+ Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+ } finally {
+ zkServer.stopAndWait();
+ }
} finally {
- zkServer.stopAndWait();
+ client.stopAndWait();
}
}
@@ -353,4 +356,32 @@ public class ZKClientTest {
zkServer.stopAndWait();
}
}
+
+ @Test
+ public void testStop() throws IOException, InterruptedException, ExecutionException {
+ try (final ServerSocket serverSocket = new ServerSocket(0)){
+ // A latch to make sure at least one connection attempt from the zk client has been made
+ final CountDownLatch connectLatch = new CountDownLatch(1);
+ Thread serverThread = new Thread() {
+ public void run() {
+ try {
+ while (!interrupted()) {
+ serverSocket.accept().close();
+ connectLatch.countDown();
+ }
+ } catch (Exception e) {
+ // no-op
+ }
+ }
+ };
+ serverThread.start();
+
+ ZKClientService zkClient = ZKClientService.Builder.of("localhost:" + serverSocket.getLocalPort()).build();
+ zkClient.start();
+ Assert.assertTrue(connectLatch.await(10, TimeUnit.SECONDS));
+
+ zkClient.stopAndWait();
+ serverThread.interrupt();
+ }
+ }
}