You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/05/08 18:53:26 UTC

[1/4] curator git commit: Return leases before retrying acquire

Repository: curator
Updated Branches:
  refs/heads/CURATOR-3.0 e76eb590f -> 29906f1eb


Return leases before retrying acquire


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/613a51be
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/613a51be
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/613a51be

Branch: refs/heads/CURATOR-3.0
Commit: 613a51be4535544e990d31e485674e237bd7e2da
Parents: 73cd00a
Author: Ulrich Geilmann <ug...@googlemail.com>
Authored: Mon Apr 11 15:32:29 2016 +0200
Committer: Ulrich Geilmann <ug...@googlemail.com>
Committed: Mon Apr 11 23:02:28 2016 +0200

----------------------------------------------------------------------
 .../recipes/locks/InterProcessSemaphoreV2.java  |  8 ++-
 .../locks/TestInterProcessSemaphore.java        | 66 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 8524075..3d96be2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -346,12 +346,15 @@ public class InterProcessSemaphoreV2
         {
             lock.acquire();
         }
+
+        Lease lease = null;
+
         try
         {
             PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
             String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
             String nodeName = ZKPaths.getNodeFromPath(path);
-            builder.add(makeLease(path));
+            lease = makeLease(path);
 
             synchronized(this)
             {
@@ -361,6 +364,7 @@ public class InterProcessSemaphoreV2
                     if ( !children.contains(nodeName) )
                     {
                         log.error("Sequential path not found: " + path);
+                        returnLease(lease);
                         return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                     }
 
@@ -373,6 +377,7 @@ public class InterProcessSemaphoreV2
                         long thisWaitMs = getThisWaitMs(startMs, waitMs);
                         if ( thisWaitMs <= 0 )
                         {
+                            returnLease(lease);
                             return InternalAcquireResult.RETURN_NULL;
                         }
                         wait(thisWaitMs);
@@ -388,6 +393,7 @@ public class InterProcessSemaphoreV2
         {
             lock.release();
         }
+        builder.add(Preconditions.checkNotNull(lease));
         return InternalAcquireResult.CONTINUE;
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 631b7c7..ad45d90 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,6 +20,7 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
+import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -27,6 +28,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Collection;
@@ -571,4 +574,67 @@ public class TestInterProcessSemaphore extends BaseClassForTests
         }
 
     }
+
+    @Test
+    public void testNoOrphanedNodes() throws Exception
+    {
+        final Timing timing = new Timing();
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+            Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+            Assert.assertNotNull(lease);
+            final List<String> childNodes = client.getChildren().forPath("/test/leases");
+            Assert.assertEquals(childNodes.size(), 1);
+
+            final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+            client.getChildren().usingWatcher(new CuratorWatcher() {
+                @Override
+                public void process(WatchedEvent event) throws Exception {
+                    if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+                        nodeCreatedLatch.countDown();
+                    }
+                }
+            }).forPath("/test/leases");
+
+            final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+                @Override
+                public Lease call() throws Exception {
+                    return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
+                }
+            });
+
+            // wait for second lease to create its node
+            timing.awaitLatch(nodeCreatedLatch);
+            String newNode = null;
+            for (String c : client.getChildren().forPath("/test/leases")) {
+                if (!childNodes.contains(c)) {
+                    newNode = c;
+                }
+            }
+            Assert.assertNotNull(newNode);
+
+            // delete the ephemeral node to trigger a retry
+            client.delete().forPath("/test/leases/" + newNode);
+
+            // release first lease so second one can be acquired
+            lease.close();
+            lease = leaseFuture.get();
+            Assert.assertNotNull(lease);
+            lease.close();
+            Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0);
+
+            // no more lease exist. must be possible to acquire a new one
+            Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+        }
+        finally
+        {
+            client.close();
+            executor.shutdownNow();
+        }
+    }
+
 }


[3/4] curator git commit: Merge branch 'CURATOR-315' of github.com:ulle/curator into CURATOR-315

Posted by ra...@apache.org.
Merge branch 'CURATOR-315' of github.com:ulle/curator into CURATOR-315


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/168dfd73
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/168dfd73
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/168dfd73

Branch: refs/heads/CURATOR-3.0
Commit: 168dfd7348cd5a66f411fcfdf51376cc8a08a515
Parents: 0ef5e45 613a51b
Author: randgalt <ra...@apache.org>
Authored: Sun May 8 13:33:43 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun May 8 13:33:43 2016 -0500

----------------------------------------------------------------------
 .../recipes/locks/InterProcessSemaphoreV2.java  |  8 ++-
 .../locks/TestInterProcessSemaphore.java        | 66 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/4] curator git commit: changed a deprecation comment to provide a better equivalent

Posted by ra...@apache.org.
changed a deprecation comment to provide a better equivalent


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0ef5e454
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0ef5e454
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0ef5e454

Branch: refs/heads/CURATOR-3.0
Commit: 0ef5e454a7d72a65a3c14b47bf3c15f84ff9d4d6
Parents: 73cd00a
Author: Jorge Montero <jx...@stripe.com>
Authored: Wed Apr 20 07:21:51 2016 -0700
Committer: Jorge Montero <jx...@stripe.com>
Committed: Wed Apr 20 07:21:51 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/curator/framework/api/CreateBuilder.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0ef5e454/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
index 0db2094..e507f68 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
@@ -46,7 +46,7 @@ public interface CreateBuilder extends
     /**
      * @deprecated this has been generalized to support all create modes. Instead, use:
      * <pre>
-     *     client.create().withProtection().withMode(CreateMode.PERSISTENT_SEQUENTIAL)...
+     *     client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)...
      * </pre>
      * @return this
      */


[4/4] curator git commit: Finished merge of CURATOR-315

Posted by ra...@apache.org.
Finished merge of CURATOR-315


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/29906f1e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/29906f1e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/29906f1e

Branch: refs/heads/CURATOR-3.0
Commit: 29906f1ebb9130b91ec1d7034de9885af9f15d07
Parents: e76eb59 168dfd7
Author: randgalt <ra...@apache.org>
Authored: Sun May 8 13:53:20 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun May 8 13:53:20 2016 -0500

----------------------------------------------------------------------
 .../recipes/locks/InterProcessSemaphoreV2.java  |  8 ++-
 .../locks/TestInterProcessSemaphore.java        | 66 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/29906f1e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 36dbff4,3d96be2..d967b98
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@@ -352,38 -354,37 +355,40 @@@ public class InterProcessSemaphoreV
              PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
              String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
              String nodeName = ZKPaths.getNodeFromPath(path);
-             builder.add(makeLease(path));
+             lease = makeLease(path);
  
 -            synchronized(this)
 +            try
              {
 -                for(;;)
 +                synchronized(this)
                  {
 -                    List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
 -                    if ( !children.contains(nodeName) )
 +                    for(;;)
                      {
 -                        log.error("Sequential path not found: " + path);
 -                        returnLease(lease);
 -                        return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
 -                    }
 -
 -                    if ( children.size() <= maxLeases )
 -                    {
 -                        break;
 -                    }
 -                    if ( hasWait )
 -                    {
 -                        long thisWaitMs = getThisWaitMs(startMs, waitMs);
 -                        if ( thisWaitMs <= 0 )
 +                        List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
 +                        if ( !children.contains(nodeName) )
                          {
 +                            log.error("Sequential path not found: " + path);
+                             returnLease(lease);
 -                            return InternalAcquireResult.RETURN_NULL;
 +                            return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
 +                        }
 +
 +                        if ( children.size() <= maxLeases )
 +                        {
 +                            break;
 +                        }
 +                        if ( hasWait )
 +                        {
 +                            long thisWaitMs = getThisWaitMs(startMs, waitMs);
 +                            if ( thisWaitMs <= 0 )
 +                            {
++                                returnLease(lease);
 +                                return InternalAcquireResult.RETURN_NULL;
 +                            }
 +                            wait(thisWaitMs);
 +                        }
 +                        else
 +                        {
 +                            wait();
                          }
 -                        wait(thisWaitMs);
 -                    }
 -                    else
 -                    {
 -                        wait();
                      }
                  }
              }

http://git-wip-us.apache.org/repos/asf/curator/blob/29906f1e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 2797b5f,ad45d90..802290e
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@@ -20,14 -20,16 +20,17 @@@
  package org.apache.curator.framework.recipes.locks;
  
  import com.google.common.collect.Lists;
 -import org.apache.curator.framework.api.CuratorWatcher;
 -import org.apache.curator.test.BaseClassForTests;
 -import org.apache.curator.utils.CloseableUtils;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
++import org.apache.curator.framework.api.CuratorWatcher;
 +import org.apache.curator.framework.imps.TestCleanState;
  import org.apache.curator.framework.recipes.shared.SharedCount;
  import org.apache.curator.retry.RetryOneTime;
 +import org.apache.curator.test.BaseClassForTests;
  import org.apache.curator.test.Timing;
 +import org.apache.curator.utils.CloseableUtils;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher;
  import org.testng.Assert;
  import org.testng.annotations.Test;
  import java.util.Collection;
@@@ -531,7 -531,110 +534,70 @@@ public class TestInterProcessSemaphore 
              {
                  CloseableUtils.closeQuietly(l);
              }
 -            CloseableUtils.closeQuietly(client);
 +            TestCleanState.closeAndTestClean(client);
          }
      }
+ 
 -    @Test
 -    public void testChildReaperCleansUpLockNodes() throws Exception
 -    {
 -        Timing timing = new Timing();
 -        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
 -        client.start();
 -
 -        ChildReaper childReaper = null;
 -        try
 -        {
 -            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1);
 -            semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
 -
 -            Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
 -
 -            childReaper = new ChildReaper(
 -                    client,
 -                    "/test",
 -                    Reaper.Mode.REAP_UNTIL_GONE,
 -                    ChildReaper.newExecutorService(),
 -                    1,
 -                    "/test-leader",
 -                    InterProcessSemaphoreV2.LOCK_SCHEMA
 -            );
 -            childReaper.start();
 -
 -            timing.forWaiting().sleepABit();
 -
 -            List<String> children = client.getChildren().forPath("/test");
 -
 -            Assert.assertEquals(children.size(), 0, "All children of /test should have been reaped");
 -        }
 -        finally
 -        {
 -            CloseableUtils.closeQuietly(childReaper);
 -            CloseableUtils.closeQuietly(client);
 -        }
 -
 -    }
+ 
+     @Test
+     public void testNoOrphanedNodes() throws Exception
+     {
+         final Timing timing = new Timing();
+         final ExecutorService executor = Executors.newFixedThreadPool(1);
+         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+         client.start();
+         try
+         {
+             final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+             Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+             Assert.assertNotNull(lease);
+             final List<String> childNodes = client.getChildren().forPath("/test/leases");
+             Assert.assertEquals(childNodes.size(), 1);
+ 
+             final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+             client.getChildren().usingWatcher(new CuratorWatcher() {
+                 @Override
+                 public void process(WatchedEvent event) throws Exception {
+                     if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+                         nodeCreatedLatch.countDown();
+                     }
+                 }
+             }).forPath("/test/leases");
+ 
+             final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+                 @Override
+                 public Lease call() throws Exception {
+                     return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
+                 }
+             });
+ 
+             // wait for second lease to create its node
+             timing.awaitLatch(nodeCreatedLatch);
+             String newNode = null;
+             for (String c : client.getChildren().forPath("/test/leases")) {
+                 if (!childNodes.contains(c)) {
+                     newNode = c;
+                 }
+             }
+             Assert.assertNotNull(newNode);
+ 
+             // delete the ephemeral node to trigger a retry
+             client.delete().forPath("/test/leases/" + newNode);
+ 
+             // release first lease so second one can be acquired
+             lease.close();
+             lease = leaseFuture.get();
+             Assert.assertNotNull(lease);
+             lease.close();
+             Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0);
+ 
+             // no more lease exist. must be possible to acquire a new one
+             Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+         }
+         finally
+         {
 -            client.close();
+             executor.shutdownNow();
++            TestCleanState.closeAndTestClean(client);
+         }
+     }
 -
  }