You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/06/15 19:22:26 UTC
[1/2] incubator-twill git commit: (TWILL-133) Make the ZKClient
stoppable even if it is still trying to connect
Repository: incubator-twill
Updated Branches:
refs/heads/feature/twill-133 [created] 74720b804
(TWILL-133) Make the ZKClient stoppable even if it is still trying to connect
- Also include refactoring of the DefaultZKClient class to simplify
shutdown logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/bda99086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/bda99086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/bda99086
Branch: refs/heads/feature/twill-133
Commit: bda99086633241bea8c999ec7833eddd01a9e341
Parents: 0d3c5ee
Author: Terence Yim <ch...@apache.org>
Authored: Fri Jun 12 17:13:05 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Jun 12 17:21:35 2015 -0700
----------------------------------------------------------------------
.../zookeeper/DefaultZKClientService.java | 119 ++++++++++++-------
.../apache/twill/zookeeper/ZKClientTest.java | 85 ++++++++-----
2 files changed, 134 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/bda99086/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 7b9b345..71a0118 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -58,7 +58,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -91,11 +90,11 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
this.zkStr = zkStr;
this.sessionTimeout = sessionTimeout;
- this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+ this.connectionWatchers = new CopyOnWriteArrayList<>();
this.authInfos = copyAuthInfo(authInfos);
addConnectionWatcher(connectionWatcher);
- this.zooKeeper = new AtomicReference<ZooKeeper>();
+ this.zooKeeper = new AtomicReference<>();
serviceDelegate = new ServiceDelegate();
}
@@ -111,7 +110,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
}
@Override
- public Cancellable addConnectionWatcher(Watcher watcher) {
+ public Cancellable addConnectionWatcher(final Watcher watcher) {
if (watcher == null) {
return new Cancellable() {
@Override
@@ -121,12 +120,13 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
};
}
- final Watcher wrappedWatcher = wrapWatcher(watcher);
- connectionWatchers.add(wrappedWatcher);
+ // Invocation of connection watchers are already done inside the event thread,
+ // hence no need to wrap the watcher again.
+ connectionWatchers.add(watcher);
return new Cancellable() {
@Override
public void cancel() {
- connectionWatchers.remove(wrappedWatcher);
+ connectionWatchers.remove(watcher);
}
};
}
@@ -391,25 +391,49 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
private final class ServiceDelegate extends AbstractService implements Watcher {
- private final AtomicBoolean stopNotified = new AtomicBoolean(false);
- private volatile boolean executorStopped;
+ private ServiceDelegate() {
+ // Add a listener for state changes so that we can terminate the service even it is in STARTING state upoon
+ // stop is requested
+ addListener(new Listener() {
+ @Override
+ public void starting() {
+ // no-op
+ }
- @Override
- protected void doStart() {
- // A single thread executor
- eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
- Threads.createDaemonThreadFactory("zk-client-EventThread")) {
@Override
- protected void terminated() {
- super.terminated();
+ public void running() {
+ // no-op
+ }
- // Only call notifyStopped if the executor.shutdown() returned, otherwise deadlock (TWILL-110) can occur.
- // Also, notifyStopped() should only be called once.
- if (executorStopped && stopNotified.compareAndSet(false, true)) {
- notifyStopped();
+ @Override
+ public void stopping(State from) {
+ if (from == State.STARTING) {
+ // If it is still starting, just notify that it's started to transit out of the STARTING phase.
+ notifyStarted();
}
}
- };
+
+ @Override
+ public void terminated(State from) {
+ // no-op
+ }
+
+ @Override
+ public void failed(State from, Throwable failure) {
+ closeZooKeeper(zooKeeper.getAndSet(null));
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ @Override
+ protected void doStart() {
+ // A single thread executor for all events
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ Threads.createDaemonThreadFactory("zk-client-EventThread"));
+ // Just discard the execution if the executor is closed
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ eventExecutor = executor;
try {
zooKeeper.set(createZooKeeper());
@@ -420,23 +444,18 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
@Override
protected void doStop() {
- ZooKeeper zk = zooKeeper.getAndSet(null);
- if (zk != null) {
- try {
- zk.close();
- } catch (InterruptedException e) {
- notifyFailed(e);
- } finally {
- eventExecutor.shutdown();
- executorStopped = true;
-
- // If the executor state is terminated, meaning the terminate() method is triggered,
- // call notifyStopped() if it hasn't been called yet.
- if (eventExecutor.isTerminated() && stopNotified.compareAndSet(false, true)) {
+ eventExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ closeZooKeeper(zooKeeper.getAndSet(null));
notifyStopped();
+ } catch (Exception e) {
+ notifyFailed(e);
}
}
- }
+ });
+ eventExecutor.shutdown();
}
@Override
@@ -451,23 +470,23 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
LOG.info("ZooKeeper session expired: {}", zkStr);
// When connection expired, simply reconnect again
- Thread t = new Thread(new Runnable() {
+ eventExecutor.submit(new Runnable() {
@Override
public void run() {
+ if (state() != State.RUNNING) {
+ return;
+ }
try {
LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
- zooKeeper.set(createZooKeeper());
+ closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
} catch (IOException e) {
- zooKeeper.set(null);
notifyFailed(e);
}
}
- }, "zk-reconnect");
- t.setDaemon(true);
- t.start();
+ });
}
} finally {
- if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) {
+ if (event.getType() == Event.EventType.None) {
for (Watcher connectionWatcher : connectionWatchers) {
connectionWatcher.process(event);
}
@@ -479,12 +498,26 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
* Creates a new ZooKeeper connection.
*/
private ZooKeeper createZooKeeper() throws IOException {
- ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, this);
+ ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
}
return zk;
}
+
+ /**
+ * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
+ * it will get logged.
+ */
+ private void closeZooKeeper(@Nullable ZooKeeper zk) {
+ try {
+ if (zk != null) {
+ zk.close();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted when closing ZooKeeper", e);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/bda99086/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 97dec03..162d4db 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.net.ServerSocket;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
@@ -235,38 +236,40 @@ public class ZKClientTest {
}
}
}).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS)));
- client.startAndWait();
-
- zkServer.stopAndWait();
-
- Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
final CountDownLatch createLatch = new CountDownLatch(1);
- Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
- @Override
- public void onSuccess(String result) {
- createLatch.countDown();
- }
+ client.startAndWait();
+ try {
+ zkServer.stopAndWait();
- @Override
- public void onFailure(Throwable t) {
- t.printStackTrace(System.out);
- }
- });
-
- TimeUnit.SECONDS.sleep(2);
- zkServer = InMemoryZKServer.builder()
- .setDataDir(dataDir)
- .setAutoCleanDataDir(true)
- .setPort(port)
- .setTickTime(1000)
- .build();
- zkServer.startAndWait();
+ Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS));
+ Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ createLatch.countDown();
+ }
- try {
- Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+ @Override
+ public void onFailure(Throwable t) {
+ t.printStackTrace(System.out);
+ }
+ });
+
+ TimeUnit.SECONDS.sleep(2);
+ zkServer = InMemoryZKServer.builder()
+ .setDataDir(dataDir)
+ .setAutoCleanDataDir(true)
+ .setPort(port)
+ .setTickTime(1000)
+ .build();
+ zkServer.startAndWait();
+ try {
+ Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS));
+ } finally {
+ zkServer.stopAndWait();
+ }
} finally {
- zkServer.stopAndWait();
+ client.stopAndWait();
}
}
@@ -353,4 +356,32 @@ public class ZKClientTest {
zkServer.stopAndWait();
}
}
+
+ @Test
+ public void testStop() throws IOException, InterruptedException, ExecutionException {
+ try (final ServerSocket serverSocket = new ServerSocket(0)) {
+ // A latch to make sure at least one connection attempt from the zk client has been made
+ final CountDownLatch connectLatch = new CountDownLatch(1);
+ Thread serverThread = new Thread() {
+ public void run() {
+ try {
+ while (!interrupted()) {
+ serverSocket.accept().close();
+ connectLatch.countDown();
+ }
+ } catch (Exception e) {
+ // no-op
+ }
+ }
+ };
+ serverThread.start();
+
+ ZKClientService zkClient = ZKClientService.Builder.of("localhost:" + serverSocket.getLocalPort()).build();
+ zkClient.start();
+ Assert.assertTrue(connectLatch.await(10, TimeUnit.SECONDS));
+
+ zkClient.stopAndWait();
+ serverThread.interrupt();
+ }
+ }
}
[2/2] incubator-twill git commit: Better state handling and more
comments to explain some of the race conditions that may arise during stopping of
the ZKClient.
Posted by ch...@apache.org.
Better state handling and more comments to explain some of the race conditions that may arise during stopping of the ZKClient.
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/74720b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/74720b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/74720b80
Branch: refs/heads/feature/twill-133
Commit: 74720b804e5216e106b2d0f1e9cf1dbbb2d6282b
Parents: bda9908
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jun 15 10:22:17 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jun 15 10:22:17 2015 -0700
----------------------------------------------------------------------
.../zookeeper/DefaultZKClientService.java | 57 +++++++++++++++-----
1 file changed, 45 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/74720b80/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 71a0118..6fae663 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -391,7 +391,13 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
private final class ServiceDelegate extends AbstractService implements Watcher {
+ private final Runnable stopTask;
+
private ServiceDelegate() {
+ // Creates the stop task runnable in constructor so that if the stop() method is called from shutdown hook,
+ // it won't fail with class loading error due to failure to load inner class from the shutdown thread.
+ this.stopTask = createStopTask();
+
// Add a listener for state changes so that we can terminate the service even it is in STARTING state upoon
// stop is requested
addListener(new Listener() {
@@ -420,6 +426,8 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
@Override
public void failed(State from, Throwable failure) {
+ eventExecutor.shutdownNow();
+ // Close the ZK client if there is exception. It is needed because the stop task may not get executed
closeZooKeeper(zooKeeper.getAndSet(null));
}
}, Threads.SAME_THREAD_EXECUTOR);
@@ -444,24 +452,21 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
@Override
protected void doStop() {
- eventExecutor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- closeZooKeeper(zooKeeper.getAndSet(null));
- notifyStopped();
- } catch (Exception e) {
- notifyFailed(e);
- }
- }
- });
+ // Submit a task to the executor to make sure all pending events in the executor are fired before
+ // transiting this Service into STOPPED state
+ eventExecutor.submit(stopTask);
eventExecutor.shutdown();
}
@Override
public void process(WatchedEvent event) {
+ State state = state();
+ if (state == State.TERMINATED || state == State.FAILED) {
+ return;
+ }
+
try {
- if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
+ if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
LOG.debug("Connected to ZooKeeper: {}", zkStr);
notifyStarted();
return;
@@ -473,6 +478,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
eventExecutor.submit(new Runnable() {
@Override
public void run() {
+ // Only reconnect if the current state is running
if (state() != State.RUNNING) {
return;
}
@@ -494,6 +500,33 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
}
}
+
+ /**
+ * Creates a {@link Runnable} task that will get executed in the event executor for transiting this
+ * Service into STOPPED state.
+ */
+ private Runnable createStopTask() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Close the ZK connection in this task will make sure if there is ZK connection created
+ // after doStop() was called but before this task has been executed is also closed.
+ // It is possible to happen when the following sequence happens:
+ //
+ // 1. session expired, hence the expired event is triggered
+ // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client
+ // 3. Service.stop() gets called, Service.state() changed to STOPPING
+ // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one
+ closeZooKeeper(zooKeeper.getAndSet(null));
+ notifyStopped();
+ } catch (Exception e) {
+ notifyFailed(e);
+ }
+ }
+ };
+ }
+
/**
* Creates a new ZooKeeper connection.
*/