You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2016/06/06 00:48:12 UTC

[12/13] curator git commit: If there is a network event after the semaphore's node is created but before getChildren() is called, the previous implementation would orphan the newly created node causing a deadlock later on

If there is a network event after the semaphore's node is created but before getChildren() is called, the previous implementation would orphan the newly created node, causing a deadlock later on.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8dc0283e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8dc0283e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8dc0283e

Branch: refs/heads/CURATOR-3.0
Commit: 8dc0283e02ed799d5d76303f370ccc6325662b83
Parents: 5584a61
Author: randgalt <ra...@apache.org>
Authored: Thu Jun 2 17:27:18 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jun 2 17:27:18 2016 -0500

----------------------------------------------------------------------
 .../recipes/locks/InterProcessSemaphoreV2.java  |  24 ++-
 .../locks/TestInterProcessSemaphore.java        | 167 +++++++++++++++++--
 2 files changed, 173 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/8dc0283e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 3d96be2..2b9d48d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -44,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.curator.utils.PathUtils;
 
@@ -327,6 +328,9 @@ public class InterProcessSemaphoreV2
         RETRY_DUE_TO_MISSING_NODE
     }
 
+    static volatile CountDownLatch debugAcquireLatch = null;
+    static volatile CountDownLatch debugFailedGetChildrenLatch = null;
+
     private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
     {
         if ( client.getState() != CuratorFrameworkState.STARTED )
@@ -356,11 +360,29 @@ public class InterProcessSemaphoreV2
             String nodeName = ZKPaths.getNodeFromPath(path);
             lease = makeLease(path);
 
+            if ( debugAcquireLatch != null )
+            {
+                debugAcquireLatch.await();
+            }
+
             synchronized(this)
             {
                 for(;;)
                 {
-                    List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+                    List<String> children;
+                    try
+                    {
+                        children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+                    }
+                    catch ( Exception e )
+                    {
+                        if ( debugFailedGetChildrenLatch != null )
+                        {
+                            debugFailedGetChildrenLatch.countDown();
+                        }
+                        returnLease(lease); // otherwise the just created ZNode will be orphaned causing a dead lock
+                        throw e;
+                    }
                     if ( !children.contains(nodeName) )
                     {
                         log.error("Sequential path not found: " + path);

http://git-wip-us.apache.org/repos/asf/curator/blob/8dc0283e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index ad45d90..216c2a2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,14 +20,19 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
+import com.google.common.collect.Queues;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
@@ -35,6 +40,7 @@ import org.testng.annotations.Test;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorCompletionService;
@@ -49,6 +55,126 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class TestInterProcessSemaphore extends BaseClassForTests
 {
     @Test
+    public void testAcquireAfterLostServer() throws Exception
+    {
+        // CURATOR-335
+
+        final String SEMAPHORE_PATH = "/test";
+        final int MAX_SEMAPHORES = 1;
+        final int NUM_CLIENTS = 10;
+
+        ExecutorService executor = Executors.newFixedThreadPool(NUM_CLIENTS);
+
+        final Timing timing = new Timing();
+
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.forWaiting().milliseconds(), timing.connection(), new RetryOneTime(1));  // long session time on purpose
+        try
+        {
+            client.start();
+
+            InterProcessSemaphoreV2.debugAcquireLatch = new CountDownLatch(1);  // cause one of the semaphores to create its node and then wait
+            InterProcessSemaphoreV2.debugFailedGetChildrenLatch = new CountDownLatch(1);    // semaphore will notify when getChildren() fails
+            final CountDownLatch isReadyLatch = new CountDownLatch(NUM_CLIENTS);
+            final BlockingQueue<Boolean> acquiredQueue = Queues.newLinkedBlockingQueue();
+            Runnable runner = new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    while ( !Thread.currentThread().isInterrupted() )
+                    {
+                        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, SEMAPHORE_PATH, MAX_SEMAPHORES);
+                        Lease lease = null;
+                        try
+                        {
+                            isReadyLatch.countDown();
+                            lease = semaphore.acquire();
+                            acquiredQueue.add(true);
+                            timing.sleepABit();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                        catch ( KeeperException e )
+                        {
+                            try
+                            {
+                                timing.sleepABit();
+                            }
+                            catch ( InterruptedException e2 )
+                            {
+                                Thread.currentThread().interrupt();
+                                break;
+                            }
+                        }
+                        catch ( Exception ignore )
+                        {
+                            // ignore
+                        }
+                        finally
+                        {
+                            if ( lease != null )
+                            {
+                                semaphore.returnLease(lease);
+                            }
+                        }
+                    }
+                }
+            };
+            for ( int i = 0; i < NUM_CLIENTS; ++i )
+            {
+                executor.execute(runner);
+            }
+            Assert.assertTrue(timing.awaitLatch(isReadyLatch));
+            timing.sleepABit();
+
+            final CountDownLatch lostLatch = new CountDownLatch(1);
+            final CountDownLatch restartedLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.LOST )
+                    {
+                        lostLatch.countDown();
+                    }
+                    else if ( newState == ConnectionState.RECONNECTED  )
+                    {
+                        restartedLatch.countDown();
+                    }
+                }
+            });
+
+            timing.sleepABit();
+            server.stop();
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+            InterProcessSemaphoreV2.debugAcquireLatch.countDown();  // the waiting semaphore proceeds to getChildren - which should fail
+            Assert.assertTrue(timing.awaitLatch(InterProcessSemaphoreV2.debugFailedGetChildrenLatch));  // wait until getChildren fails
+
+            server.restart();
+
+            Assert.assertTrue(timing.awaitLatch(restartedLatch));
+            for ( int i = 0; i < NUM_CLIENTS; ++i )
+            {
+                // acquires should continue as normal after server restart
+                Boolean polled = acquiredQueue.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+                if ( (polled == null) || !polled )
+                {
+                    Assert.fail("Semaphores not reacquired after restart");
+                }
+            }
+        }
+        finally
+        {
+            executor.shutdownNow();
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testThreadedLeaseIncrease() throws Exception
     {
         final Timing timing = new Timing();
@@ -551,13 +677,13 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
 
             childReaper = new ChildReaper(
-                    client,
-                    "/test",
-                    Reaper.Mode.REAP_UNTIL_GONE,
-                    ChildReaper.newExecutorService(),
-                    1,
-                    "/test-leader",
-                    InterProcessSemaphoreV2.LOCK_SCHEMA
+                client,
+                "/test",
+                Reaper.Mode.REAP_UNTIL_GONE,
+                ChildReaper.newExecutorService(),
+                1,
+                "/test-leader",
+                InterProcessSemaphoreV2.LOCK_SCHEMA
             );
             childReaper.start();
 
@@ -591,18 +717,23 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             Assert.assertEquals(childNodes.size(), 1);
 
             final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
-            client.getChildren().usingWatcher(new CuratorWatcher() {
+            client.getChildren().usingWatcher(new CuratorWatcher()
+            {
                 @Override
-                public void process(WatchedEvent event) throws Exception {
-                    if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+                public void process(WatchedEvent event) throws Exception
+                {
+                    if ( event.getType() == Watcher.Event.EventType.NodeCreated )
+                    {
                         nodeCreatedLatch.countDown();
                     }
                 }
             }).forPath("/test/leases");
 
-            final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+            final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>()
+            {
                 @Override
-                public Lease call() throws Exception {
+                public Lease call() throws Exception
+                {
                     return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
                 }
             });
@@ -610,8 +741,10 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             // wait for second lease to create its node
             timing.awaitLatch(nodeCreatedLatch);
             String newNode = null;
-            for (String c : client.getChildren().forPath("/test/leases")) {
-                if (!childNodes.contains(c)) {
+            for ( String c : client.getChildren().forPath("/test/leases") )
+            {
+                if ( !childNodes.contains(c) )
+                {
                     newNode = c;
                 }
             }