You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/05/08 18:44:28 UTC
[1/2] curator git commit: Return leases before retrying acquire
Repository: curator
Updated Branches:
refs/heads/master 0ef5e454a -> 168dfd734
Return leases before retrying acquire
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/613a51be
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/613a51be
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/613a51be
Branch: refs/heads/master
Commit: 613a51be4535544e990d31e485674e237bd7e2da
Parents: 73cd00a
Author: Ulrich Geilmann <ug...@googlemail.com>
Authored: Mon Apr 11 15:32:29 2016 +0200
Committer: Ulrich Geilmann <ug...@googlemail.com>
Committed: Mon Apr 11 23:02:28 2016 +0200
----------------------------------------------------------------------
.../recipes/locks/InterProcessSemaphoreV2.java | 8 ++-
.../locks/TestInterProcessSemaphore.java | 66 ++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 8524075..3d96be2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -346,12 +346,15 @@ public class InterProcessSemaphoreV2
{
lock.acquire();
}
+
+ Lease lease = null;
+
try
{
PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
String nodeName = ZKPaths.getNodeFromPath(path);
- builder.add(makeLease(path));
+ lease = makeLease(path);
synchronized(this)
{
@@ -361,6 +364,7 @@ public class InterProcessSemaphoreV2
if ( !children.contains(nodeName) )
{
log.error("Sequential path not found: " + path);
+ returnLease(lease);
return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
@@ -373,6 +377,7 @@ public class InterProcessSemaphoreV2
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if ( thisWaitMs <= 0 )
{
+ returnLease(lease);
return InternalAcquireResult.RETURN_NULL;
}
wait(thisWaitMs);
@@ -388,6 +393,7 @@ public class InterProcessSemaphoreV2
{
lock.release();
}
+ builder.add(Preconditions.checkNotNull(lease));
return InternalAcquireResult.CONTINUE;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 631b7c7..ad45d90 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,6 +20,7 @@
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
+import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -27,6 +28,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.Timing;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
@@ -571,4 +574,67 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
}
+
+ @Test
+ public void testNoOrphanedNodes() throws Exception
+ {
+ final Timing timing = new Timing();
+ final ExecutorService executor = Executors.newFixedThreadPool(1);
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+ Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+ Assert.assertNotNull(lease);
+ final List<String> childNodes = client.getChildren().forPath("/test/leases");
+ Assert.assertEquals(childNodes.size(), 1);
+
+ final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+ client.getChildren().usingWatcher(new CuratorWatcher() {
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+ nodeCreatedLatch.countDown();
+ }
+ }
+ }).forPath("/test/leases");
+
+ final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+ @Override
+ public Lease call() throws Exception {
+ return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
+ }
+ });
+
+ // wait for second lease to create its node
+ timing.awaitLatch(nodeCreatedLatch);
+ String newNode = null;
+ for (String c : client.getChildren().forPath("/test/leases")) {
+ if (!childNodes.contains(c)) {
+ newNode = c;
+ }
+ }
+ Assert.assertNotNull(newNode);
+
+ // delete the ephemeral node to trigger a retry
+ client.delete().forPath("/test/leases/" + newNode);
+
+ // release first lease so second one can be acquired
+ lease.close();
+ lease = leaseFuture.get();
+ Assert.assertNotNull(lease);
+ lease.close();
+ Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0);
+
+ // no more lease exist. must be possible to acquire a new one
+ Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ }
+ finally
+ {
+ client.close();
+ executor.shutdownNow();
+ }
+ }
+
}
[2/2] curator git commit: Merge branch 'CURATOR-315' of github.com:ulle/curator into CURATOR-315
Posted by ra...@apache.org.
Merge branch 'CURATOR-315' of github.com:ulle/curator into CURATOR-315
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/168dfd73
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/168dfd73
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/168dfd73
Branch: refs/heads/master
Commit: 168dfd7348cd5a66f411fcfdf51376cc8a08a515
Parents: 0ef5e45 613a51b
Author: randgalt <ra...@apache.org>
Authored: Sun May 8 13:33:43 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun May 8 13:33:43 2016 -0500
----------------------------------------------------------------------
.../recipes/locks/InterProcessSemaphoreV2.java | 8 ++-
.../locks/TestInterProcessSemaphore.java | 66 ++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------