You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/12/01 23:14:54 UTC

incubator-twill git commit: (TWILL-110) added state guard to prevent possible race-condition when shutting down zk client, which causes deadlock.

Repository: incubator-twill
Updated Branches:
  refs/heads/feature/fix-zk-client-shutdown-deadlock [created] d84c369ba


(TWILL-110) added state guard to prevent possible race-condition when shutting down zk client, which causes deadlock.


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/d84c369b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/d84c369b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/d84c369b

Branch: refs/heads/feature/fix-zk-client-shutdown-deadlock
Commit: d84c369ba2d8c78c249753546ca80a7a741e8234
Parents: 8db2d47
Author: Terence Yim <ch...@apache.org>
Authored: Mon Dec 1 14:12:11 2014 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Dec 1 14:15:12 2014 -0800

----------------------------------------------------------------------
 .../zookeeper/DefaultZKClientService.java       | 20 ++++++++++++--
 .../apache/twill/zookeeper/ZKClientTest.java    | 28 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d84c369b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
index 7c9bd08..dd04b4d 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -57,6 +57,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
@@ -371,6 +372,9 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
 
   private final class ServiceDelegate extends AbstractService implements Watcher {
 
+    private final AtomicBoolean notifyStopped = new AtomicBoolean(false);
+    private volatile boolean executorStopped;
+
     @Override
     protected void doStart() {
       // A single thread executor
@@ -379,7 +383,12 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
         @Override
         protected void terminated() {
           super.terminated();
-          notifyStopped();
+
+          // Only call notifyStopped if the executor.shutdown() returned, otherwise deadlock (TWILL-110) can occur.
+          // Also, notifyStopped() should only be called once.
+          if (executorStopped && notifyStopped.compareAndSet(false, true)) {
+            notifyStopped();
+          }
         }
       };
 
@@ -400,6 +409,13 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
           notifyFailed(e);
         } finally {
           eventExecutor.shutdown();
+          executorStopped = true;
+
+          // If the executor state is terminated, meaning the terminated() method was triggered,
+          // call notifyStopped() if it hasn't been called yet.
+          if (eventExecutor.isTerminated() && notifyStopped.compareAndSet(false, true)) {
+            notifyStopped();
+          }
         }
       }
     }
@@ -408,7 +424,7 @@ public final class DefaultZKClientService extends AbstractZKClient implements ZK
     public void process(WatchedEvent event) {
       try {
         if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
-          LOG.info("Connected to ZooKeeper: " + zkStr);
+          LOG.debug("Connected to ZooKeeper: " + zkStr);
           notifyStarted();
           return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d84c369b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index b0c7507..b241436 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -35,6 +35,8 @@ import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -54,6 +56,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class ZKClientTest {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ZKClientTest.class);
+
   @ClassRule
   public static TemporaryFolder tmpFolder = new TemporaryFolder();
 
@@ -320,4 +324,28 @@ public class ZKClientTest {
       zkServer.stopAndWait();
     }
   }
+
+  @Test (timeout = 120000L)
+  public void testDeadlock() throws IOException, InterruptedException {
+    // This is to test deadlock bug as described in (TWILL-110)
+    // This test has a very high chance of deadlocking, and hence failing with a timeout, when run under JDK7.
+    InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+    zkServer.startAndWait();
+    try {
+      for (int i = 0; i < 5000; i++) {
+        final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+        zkClient.addConnectionWatcher(new Watcher() {
+          @Override
+          public void process(WatchedEvent event) {
+            LOG.debug("Connection event: {}", event);
+          }
+        });
+        zkClient.startAndWait();
+        zkClient.stopAndWait();
+      }
+
+    } finally {
+      zkServer.stopAndWait();
+    }
+  }
 }