You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/05/23 02:47:05 UTC

[12/15] incubator-twill git commit: (TWILL-129) Fix race condition in leader election

(TWILL-129) Fix race condition in leader election

- If the node to watch is gone, re-run the leader election
- Use getData() instead of exists() to set the watch
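  - Avoids “leaking” a watch on a node that is already gone: exists() still
    registers the watch when the node is absent, whereas getData() fails
    without leaving one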

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/c4463ee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/c4463ee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/c4463ee8

Branch: refs/heads/site
Commit: c4463ee8551b5f0ee0d41bd35d2236a958f4a925
Parents: beb8882
Author: Terence Yim <ch...@apache.org>
Authored: Thu May 21 23:03:36 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri May 22 14:48:08 2015 -0700

----------------------------------------------------------------------
 .../zookeeper/DefaultZKClientService.java       |  4 ++
 .../internal/zookeeper/LeaderElection.java      | 11 +++--
 .../internal/zookeeper/LeaderElectionTest.java  | 49 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c4463ee8/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index fee874b..7b9b345 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -357,6 +357,10 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
     return new Watcher() {
       @Override
       public void process(final WatchedEvent event) {
+        if (eventExecutor.isShutdown()) {
+          LOG.debug("Already shutdown. Discarding event: {}", event);
+          return;
+        }
         eventExecutor.execute(new Runnable() {
           @Override
           public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c4463ee8/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
index de11f95..837a5ae 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java
@@ -27,13 +27,13 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.apache.twill.api.ElectionHandler;
 import org.apache.twill.common.Threads;
 import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.OperationFuture;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,10 +240,10 @@ public final class LeaderElection extends AbstractService {
    * Starts watching for the max. of smaller node.
    */
   private void watchNode(final String nodePath, Watcher watcher) {
-    OperationFuture<Stat> watchFuture = zkClient.exists(nodePath, watcher);
-    Futures.addCallback(watchFuture, new FutureCallback<Stat>() {
+    OperationFuture<NodeData> watchFuture = zkClient.getData(nodePath, watcher);
+    Futures.addCallback(watchFuture, new FutureCallback<NodeData>() {
       @Override
-      public void onSuccess(Stat result) {
+      public void onSuccess(NodeData nodeData) {
         if (state != State.CANCELLED) {
           becomeFollower();
         }
@@ -251,7 +251,8 @@ public final class LeaderElection extends AbstractService {
 
       @Override
       public void onFailure(Throwable t) {
-        LOG.warn("Exception while setting watch on node {}. Retry.", nodePath, t);
+        // On any kind of failure, just rerun the election.
+        LOG.debug("Exception while setting watch on node {}. Retry.", nodePath, t);
         runElection();
       }
     }, executor);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c4463ee8/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
index 847b149..2d4b5d5 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.zookeeper;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.twill.api.ElectionHandler;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.junit.AfterClass;
@@ -278,6 +279,54 @@ public class LeaderElectionTest {
     }
   }
 
+  @Test
+  public void testRace() throws InterruptedException {
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    final AtomicInteger leaderCount = new AtomicInteger(0);
+    final CountDownLatch completeLatch = new CountDownLatch(2);
+
+    // Starts two threads that compete for leadership and immediately drop it once acquired.
+    // This tests the case where a follower tries to watch the leader node, but the leader is already gone.
+    for (int i = 0; i < 2; i++) {
+      final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+      zkClient.startAndWait();
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            for (int i = 0; i < 1000; i++) {
+              final CountDownLatch leaderLatch = new CountDownLatch(1);
+              LeaderElection election = new LeaderElection(zkClient, "/testRace", new ElectionHandler() {
+                @Override
+                public void leader() {
+                  leaderCount.incrementAndGet();
+                  leaderLatch.countDown();
+                }
+
+                @Override
+                public void follower() {
+                  // no-op
+                }
+              });
+              election.startAndWait();
+              Uninterruptibles.awaitUninterruptibly(leaderLatch);
+              election.stopAndWait();
+            }
+            completeLatch.countDown();
+          } finally {
+            zkClient.stopAndWait();
+          }
+        }
+      });
+    }
+
+    try {
+      Assert.assertTrue(completeLatch.await(2, TimeUnit.MINUTES));
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
   @BeforeClass
   public static void init() throws IOException {
     zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();