You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/05/08 18:53:26 UTC
[1/4] curator git commit: Return leases before retrying acquire
Repository: curator
Updated Branches:
refs/heads/CURATOR-3.0 e76eb590f -> 29906f1eb
Return leases before retrying acquire
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/613a51be
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/613a51be
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/613a51be
Branch: refs/heads/CURATOR-3.0
Commit: 613a51be4535544e990d31e485674e237bd7e2da
Parents: 73cd00a
Author: Ulrich Geilmann <ug...@googlemail.com>
Authored: Mon Apr 11 15:32:29 2016 +0200
Committer: Ulrich Geilmann <ug...@googlemail.com>
Committed: Mon Apr 11 23:02:28 2016 +0200
----------------------------------------------------------------------
.../recipes/locks/InterProcessSemaphoreV2.java | 8 ++-
.../locks/TestInterProcessSemaphore.java | 66 ++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 8524075..3d96be2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -346,12 +346,15 @@ public class InterProcessSemaphoreV2
{
lock.acquire();
}
+
+ Lease lease = null;
+
try
{
PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
String nodeName = ZKPaths.getNodeFromPath(path);
- builder.add(makeLease(path));
+ lease = makeLease(path);
synchronized(this)
{
@@ -361,6 +364,7 @@ public class InterProcessSemaphoreV2
if ( !children.contains(nodeName) )
{
log.error("Sequential path not found: " + path);
+ returnLease(lease);
return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
@@ -373,6 +377,7 @@ public class InterProcessSemaphoreV2
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if ( thisWaitMs <= 0 )
{
+ returnLease(lease);
return InternalAcquireResult.RETURN_NULL;
}
wait(thisWaitMs);
@@ -388,6 +393,7 @@ public class InterProcessSemaphoreV2
{
lock.release();
}
+ builder.add(Preconditions.checkNotNull(lease));
return InternalAcquireResult.CONTINUE;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/613a51be/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 631b7c7..ad45d90 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,6 +20,7 @@
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
+import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -27,6 +28,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.Timing;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
@@ -571,4 +574,67 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
}
+
+ @Test
+ public void testNoOrphanedNodes() throws Exception
+ {
+ final Timing timing = new Timing();
+ final ExecutorService executor = Executors.newFixedThreadPool(1);
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+ Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+ Assert.assertNotNull(lease);
+ final List<String> childNodes = client.getChildren().forPath("/test/leases");
+ Assert.assertEquals(childNodes.size(), 1);
+
+ final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+ client.getChildren().usingWatcher(new CuratorWatcher() {
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+ nodeCreatedLatch.countDown();
+ }
+ }
+ }).forPath("/test/leases");
+
+ final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+ @Override
+ public Lease call() throws Exception {
+ return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
+ }
+ });
+
+ // wait for second lease to create its node
+ timing.awaitLatch(nodeCreatedLatch);
+ String newNode = null;
+ for (String c : client.getChildren().forPath("/test/leases")) {
+ if (!childNodes.contains(c)) {
+ newNode = c;
+ }
+ }
+ Assert.assertNotNull(newNode);
+
+ // delete the ephemeral node to trigger a retry
+ client.delete().forPath("/test/leases/" + newNode);
+
+ // release first lease so second one can be acquired
+ lease.close();
+ lease = leaseFuture.get();
+ Assert.assertNotNull(lease);
+ lease.close();
+ Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0);
+
+ // no more lease exist. must be possible to acquire a new one
+ Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ }
+ finally
+ {
+ client.close();
+ executor.shutdownNow();
+ }
+ }
+
}
[3/4] curator git commit: Merge branch 'CURATOR-315' of github.com:ulle/curator into CURATOR-315
Posted by ra...@apache.org.
Merge branch 'CURATOR-315' of github.com:ulle/curator into CURATOR-315
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/168dfd73
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/168dfd73
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/168dfd73
Branch: refs/heads/CURATOR-3.0
Commit: 168dfd7348cd5a66f411fcfdf51376cc8a08a515
Parents: 0ef5e45 613a51b
Author: randgalt <ra...@apache.org>
Authored: Sun May 8 13:33:43 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun May 8 13:33:43 2016 -0500
----------------------------------------------------------------------
.../recipes/locks/InterProcessSemaphoreV2.java | 8 ++-
.../locks/TestInterProcessSemaphore.java | 66 ++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[2/4] curator git commit: changed a deprecation comment to provide a better equivalent
Posted by ra...@apache.org.
changed a deprecation comment to provide a better equivalent
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0ef5e454
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0ef5e454
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0ef5e454
Branch: refs/heads/CURATOR-3.0
Commit: 0ef5e454a7d72a65a3c14b47bf3c15f84ff9d4d6
Parents: 73cd00a
Author: Jorge Montero <jx...@stripe.com>
Authored: Wed Apr 20 07:21:51 2016 -0700
Committer: Jorge Montero <jx...@stripe.com>
Committed: Wed Apr 20 07:21:51 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/curator/framework/api/CreateBuilder.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/0ef5e454/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
index 0db2094..e507f68 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
@@ -46,7 +46,7 @@ public interface CreateBuilder extends
/**
* @deprecated this has been generalized to support all create modes. Instead, use:
* <pre>
- * client.create().withProtection().withMode(CreateMode.PERSISTENT_SEQUENTIAL)...
+ * client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)...
* </pre>
* @return this
*/
[4/4] curator git commit: Finished merge of CURATOR-315
Posted by ra...@apache.org.
Finished merge of CURATOR-315
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/29906f1e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/29906f1e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/29906f1e
Branch: refs/heads/CURATOR-3.0
Commit: 29906f1ebb9130b91ec1d7034de9885af9f15d07
Parents: e76eb59 168dfd7
Author: randgalt <ra...@apache.org>
Authored: Sun May 8 13:53:20 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun May 8 13:53:20 2016 -0500
----------------------------------------------------------------------
.../recipes/locks/InterProcessSemaphoreV2.java | 8 ++-
.../locks/TestInterProcessSemaphore.java | 66 ++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/29906f1e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 36dbff4,3d96be2..d967b98
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@@ -352,38 -354,37 +355,40 @@@ public class InterProcessSemaphoreV
PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
String nodeName = ZKPaths.getNodeFromPath(path);
- builder.add(makeLease(path));
+ lease = makeLease(path);
- synchronized(this)
+ try
{
- for(;;)
+ synchronized(this)
{
- List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
- if ( !children.contains(nodeName) )
+ for(;;)
{
- log.error("Sequential path not found: " + path);
- returnLease(lease);
- return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
- }
-
- if ( children.size() <= maxLeases )
- {
- break;
- }
- if ( hasWait )
- {
- long thisWaitMs = getThisWaitMs(startMs, waitMs);
- if ( thisWaitMs <= 0 )
+ List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+ if ( !children.contains(nodeName) )
{
+ log.error("Sequential path not found: " + path);
+ returnLease(lease);
- return InternalAcquireResult.RETURN_NULL;
+ return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
+ }
+
+ if ( children.size() <= maxLeases )
+ {
+ break;
+ }
+ if ( hasWait )
+ {
+ long thisWaitMs = getThisWaitMs(startMs, waitMs);
+ if ( thisWaitMs <= 0 )
+ {
++ returnLease(lease);
+ return InternalAcquireResult.RETURN_NULL;
+ }
+ wait(thisWaitMs);
+ }
+ else
+ {
+ wait();
}
- wait(thisWaitMs);
- }
- else
- {
- wait();
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/29906f1e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 2797b5f,ad45d90..802290e
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@@ -20,14 -20,16 +20,17 @@@
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
++import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
@@@ -531,7 -531,110 +534,70 @@@ public class TestInterProcessSemaphore
{
CloseableUtils.closeQuietly(l);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
+
- @Test
- public void testChildReaperCleansUpLockNodes() throws Exception
- {
- Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
-
- ChildReaper childReaper = null;
- try
- {
- InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1);
- semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
-
- Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
-
- childReaper = new ChildReaper(
- client,
- "/test",
- Reaper.Mode.REAP_UNTIL_GONE,
- ChildReaper.newExecutorService(),
- 1,
- "/test-leader",
- InterProcessSemaphoreV2.LOCK_SCHEMA
- );
- childReaper.start();
-
- timing.forWaiting().sleepABit();
-
- List<String> children = client.getChildren().forPath("/test");
-
- Assert.assertEquals(children.size(), 0, "All children of /test should have been reaped");
- }
- finally
- {
- CloseableUtils.closeQuietly(childReaper);
- CloseableUtils.closeQuietly(client);
- }
-
- }
+
+ @Test
+ public void testNoOrphanedNodes() throws Exception
+ {
+ final Timing timing = new Timing();
+ final ExecutorService executor = Executors.newFixedThreadPool(1);
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test", 1);
+ Lease lease = semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+ Assert.assertNotNull(lease);
+ final List<String> childNodes = client.getChildren().forPath("/test/leases");
+ Assert.assertEquals(childNodes.size(), 1);
+
+ final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+ client.getChildren().usingWatcher(new CuratorWatcher() {
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+ nodeCreatedLatch.countDown();
+ }
+ }
+ }).forPath("/test/leases");
+
+ final Future<Lease> leaseFuture = executor.submit(new Callable<Lease>() {
+ @Override
+ public Lease call() throws Exception {
+ return semaphore.acquire(timing.forWaiting().multiple(2).seconds(), TimeUnit.SECONDS);
+ }
+ });
+
+ // wait for second lease to create its node
+ timing.awaitLatch(nodeCreatedLatch);
+ String newNode = null;
+ for (String c : client.getChildren().forPath("/test/leases")) {
+ if (!childNodes.contains(c)) {
+ newNode = c;
+ }
+ }
+ Assert.assertNotNull(newNode);
+
+ // delete the ephemeral node to trigger a retry
+ client.delete().forPath("/test/leases/" + newNode);
+
+ // release first lease so second one can be acquired
+ lease.close();
+ lease = leaseFuture.get();
+ Assert.assertNotNull(lease);
+ lease.close();
+ Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 0);
+
+ // no more lease exist. must be possible to acquire a new one
+ Assert.assertNotNull(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+ }
+ finally
+ {
- client.close();
+ executor.shutdownNow();
++ TestCleanState.closeAndTestClean(client);
+ }
+ }
-
}