You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/05/23 02:47:05 UTC
[12/15] incubator-twill git commit: (TWILL-129) Fix race condition in
leader election
(TWILL-129) Fix race condition in leader election
- If the node to watch is gone, re-run the leader election
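- Use getData() instead of exists() to set the watch, because exists() leaves a watch behind even when the node does not exist, whereas getData() fails without setting one
- Avoid “leaking” watches on a node that is gone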
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/c4463ee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/c4463ee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/c4463ee8
Branch: refs/heads/site
Commit: c4463ee8551b5f0ee0d41bd35d2236a958f4a925
Parents: beb8882
Author: Terence Yim <ch...@apache.org>
Authored: Thu May 21 23:03:36 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri May 22 14:48:08 2015 -0700
----------------------------------------------------------------------
.../zookeeper/DefaultZKClientService.java | 4 ++
.../internal/zookeeper/LeaderElection.java | 11 +++--
.../internal/zookeeper/LeaderElectionTest.java | 49 ++++++++++++++++++++
3 files changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c4463ee8/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index fee874b..7b9b345 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -357,6 +357,10 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
return new Watcher() {
@Override
public void process(final WatchedEvent event) {
+ if (eventExecutor.isShutdown()) {
+ LOG.debug("Already shutdown. Discarding event: {}", event);
+ return;
+ }
eventExecutor.execute(new Runnable() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c4463ee8/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
index de11f95..837a5ae 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
@@ -27,13 +27,13 @@ import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.common.Threads;
import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,10 +240,10 @@ public final class LeaderElection extends AbstractService {
* Starts watching for the max. of smaller node.
*/
private void watchNode(final String nodePath, Watcher watcher) {
- OperationFuture<Stat> watchFuture = zkClient.exists(nodePath, watcher);
- Futures.addCallback(watchFuture, new FutureCallback<Stat>() {
+ OperationFuture<NodeData> watchFuture = zkClient.getData(nodePath, watcher);
+ Futures.addCallback(watchFuture, new FutureCallback<NodeData>() {
@Override
- public void onSuccess(Stat result) {
+ public void onSuccess(NodeData nodeData) {
if (state != State.CANCELLED) {
becomeFollower();
}
@@ -251,7 +251,8 @@ public final class LeaderElection extends AbstractService {
@Override
public void onFailure(Throwable t) {
- LOG.warn("Exception while setting watch on node {}. Retry.", nodePath, t);
+ // On any kind of failure, just rerun the election.
+ LOG.debug("Exception while setting watch on node {}. Retry.", nodePath, t);
runElection();
}
}, executor);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c4463ee8/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
index 847b149..2d4b5d5 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.zookeeper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
@@ -278,6 +279,54 @@ public class LeaderElectionTest {
}
}
+ @Test
+ public void testRace() throws InterruptedException {
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ final AtomicInteger leaderCount = new AtomicInteger(0); // NOTE(review): incremented in leader() but never asserted in this method
+ final CountDownLatch completeLatch = new CountDownLatch(2); // counted down once per worker that finishes all of its iterations
+
+ // Regression test for TWILL-129: start two threads, each with its own ZK client, that repeatedly compete for leadership on the same path and drop it as soon as they get it.
+ // This exercises the case where a follower tries to watch the leader node, but the leader is already gone.
+ for (int i = 0; i < 2; i++) {
+ final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+ zkClient.startAndWait();
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < 1000; i++) {
+ final CountDownLatch leaderLatch = new CountDownLatch(1);
+ LeaderElection election = new LeaderElection(zkClient, "/testRace", new ElectionHandler() {
+ @Override
+ public void leader() {
+ leaderCount.incrementAndGet();
+ leaderLatch.countDown();
+ }
+
+ @Override
+ public void follower() {
+ // Nothing to do as a follower; the loop below only waits for leader() to be called.
+ }
+ });
+ election.startAndWait();
+ Uninterruptibles.awaitUninterruptibly(leaderLatch); // block until leader() has been called for this election
+ election.stopAndWait();
+ }
+ completeLatch.countDown(); // reached only if every iteration completed without throwing
+ } finally {
+ zkClient.stopAndWait();
+ }
+ }
+ });
+ }
+
+ try {
+ Assert.assertTrue(completeLatch.await(2, TimeUnit.MINUTES)); // fails unless both workers finish within two minutes
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
@BeforeClass
public static void init() throws IOException {
zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();