You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/07 01:59:30 UTC
[1/2] git commit: work continues
Repository: curator
Updated Branches:
refs/heads/CURATOR-88 79e55cf88 -> ae4813512
work continues
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9f757b33
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9f757b33
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9f757b33
Branch: refs/heads/CURATOR-88
Commit: 9f757b337f8f9eb17bb10316b5baeccce62f6789
Parents: 79e55cf
Author: randgalt <ra...@apache.org>
Authored: Thu Mar 6 19:56:01 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Mar 6 19:56:01 2014 -0500
----------------------------------------------------------------------
.../recipes/locks/InterProcessLock.java | 11 +-
.../recipes/locks/InterProcessMultiLock.java | 15 +
.../locks/InterProcessReadWriteLock.java | 4 +-
.../locks/InterProcessReadWriteLockBase.java | 37 +++
.../locks/InterProcessSemaphoreMutex.java | 7 +
.../locks/InterProcessSemaphoreReadWrite.java | 208 +++++++++++++
.../recipes/locks/InterProcessSemaphoreV2.java | 37 ++-
.../locks/TestInterProcessMultiMutex.java | 13 +
.../locks/TestInterProcessReadWriteLock.java | 271 +----------------
.../TestInterProcessReadWriteLockBase.java | 289 +++++++++++++++++++
.../TestInterProcessSemaphoreReadWrite.java | 49 ++++
.../x/rest/support/InterProcessLockBridge.java | 7 +
.../InterProcessReadWriteLockBridge.java | 7 +
13 files changed, 678 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java
index 7192eae..63b7b14 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.framework.recipes.locks;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
public interface InterProcessLock
@@ -56,5 +57,13 @@ public interface InterProcessLock
*
* @return true/false
*/
- boolean isAcquiredInThisProcess();
+ public boolean isAcquiredInThisProcess();
+
+ /**
+ * Return a sorted list of all current nodes participating in the lock
+ *
+ * @return list of nodes
+ * @throws Exception ZK errors, interruptions, etc.
+ */
+ public Collection<String> getParticipantNodes() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java
index ca06b34..75ba3ae 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -173,6 +174,20 @@ public class InterProcessMultiLock implements InterProcessLock
}
@Override
+ public Collection<String> getParticipantNodes() throws Exception // gathers participant nodes from the child locks, in child-lock iteration order
+ {
+ List<String> participants = Lists.newArrayList();
+ for ( InterProcessLock lock : locks )
+ {
+ if ( !lock.isAcquiredInThisProcess() ) // NOTE(review): child locks held by this process are skipped - confirm this is intended; the interface doc says "all current nodes"
+ {
+ participants.addAll(lock.getParticipantNodes());
+ }
+ }
+ return participants; // NOTE(review): plain concatenation, no sort across child locks, although the interface doc promises a sorted list
+ }
+
+ @Override
public synchronized boolean isAcquiredInThisProcess()
{
// it's subjective what the correct meaning is here - I choose to return true
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
index fafde5f..4476e6f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
@@ -51,7 +51,7 @@ import java.util.List;
* lock to the write lock is not possible.
* </p>
*/
-public class InterProcessReadWriteLock
+public class InterProcessReadWriteLock implements InterProcessReadWriteLockBase
{
private final InterProcessMutex readMutex;
private final InterProcessMutex writeMutex;
@@ -145,6 +145,7 @@ public class InterProcessReadWriteLock
*
* @return read lock
*/
+ @Override
public InterProcessMutex readLock()
{
return readMutex;
@@ -155,6 +156,7 @@ public class InterProcessReadWriteLock
*
* @return write lock
*/
+ @Override
public InterProcessMutex writeLock()
{
return writeMutex;
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLockBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLockBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLockBase.java
new file mode 100644
index 0000000..47025d6
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLockBase.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.locks;
+
+public interface InterProcessReadWriteLockBase // shared contract of InterProcessReadWriteLock and InterProcessSemaphoreReadWrite
+{
+ /**
+ * Returns the lock used for reading.
+ * Implementations may narrow the return type (InterProcessReadWriteLock returns InterProcessMutex).
+ * @return read lock
+ */
+ InterProcessLock readLock();
+
+ /**
+ * Returns the lock used for writing.
+ * Implementations may narrow the return type (InterProcessReadWriteLock returns InterProcessMutex).
+ * @return write lock
+ */
+ InterProcessLock writeLock();
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
index 88b5f5d..ca33b5d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
@@ -78,4 +79,10 @@ public class InterProcessSemaphoreMutex implements InterProcessLock
{
return (lease != null);
}
+
+ @Override
+ public Collection<String> getParticipantNodes() throws Exception
+ {
+ return semaphore.getParticipantNodes(); // pass-through: the result comes straight from the wrapped semaphore
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreReadWrite.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreReadWrite.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreReadWrite.java
new file mode 100644
index 0000000..26912c0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreReadWrite.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.locks;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.curator.framework.CuratorFramework;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class InterProcessSemaphoreReadWrite implements InterProcessReadWriteLockBase
+{
+ private final InterProcessSemaphoreV2 lock;
+ private final AtomicReference<Lease> writeLease = new AtomicReference<Lease>();
+ private final List<Lease> readLeases = new CopyOnWriteArrayList<Lease>();
+
+ private static final LockInternalsSorter sorter = new LockInternalsSorter()
+ {
+ @Override
+ public String fixForSorting(String str, String lockName)
+ {
+ return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
+ }
+ };
+
+ private final InterProcessLock readLock = new InterProcessLock() // read-side lock returned by readLock(); every call is forwarded to the enclosing instance with isWriter == false
+ {
+ @Override
+ public void acquire() throws Exception
+ {
+ internalAcquire(-1, null, false); // a null unit makes the semaphore compute hasWait == false (no wait limit); the boolean result is ignored
+ }
+
+ @Override
+ public boolean acquire(long time, TimeUnit unit) throws Exception
+ {
+ return internalAcquire(time, unit, false); // true only if a read lease was obtained
+ }
+
+ @Override
+ public void release() throws Exception
+ {
+ internalRelease(false); // returns the earliest-added entry of readLeases; fails with IllegalStateException if there is none
+ }
+
+ @Override
+ public Collection<String> getParticipantNodes() throws Exception
+ {
+ return Collections2.filter(lock.getParticipantNodes(), new Predicate<String>() // Guava filter(): a filtered view of the semaphore's node list, not a copy
+ {
+ @Override
+ public boolean apply(String name)
+ {
+ return name.contains(READ_BASE_NAME); // keep only reader nodes ("read-lease-")
+ }
+ });
+ }
+
+ @Override
+ public boolean isAcquiredInThisProcess()
+ {
+ return readLeases.size() > 0; // at least one read lease obtained through this instance is still outstanding
+ }
+ };
+ private final InterProcessLock writeLock = new InterProcessLock() // write-side lock returned by writeLock(); every call is forwarded to the enclosing instance with isWriter == true
+ {
+ @Override
+ public void acquire() throws Exception
+ {
+ internalAcquire(-1, null, true); // a null unit makes the semaphore compute hasWait == false (no wait limit); the boolean result is ignored
+ }
+
+ @Override
+ public boolean acquire(long time, TimeUnit unit) throws Exception
+ {
+ return internalAcquire(time, unit, true); // true only if the write lease was obtained
+ }
+
+ @Override
+ public void release() throws Exception
+ {
+ internalRelease(true); // clears writeLease and returns it; fails with IllegalStateException if no write lease is held
+ }
+
+ @Override
+ public Collection<String> getParticipantNodes() throws Exception
+ {
+ return Collections2.filter(lock.getParticipantNodes(), new Predicate<String>() // Guava filter(): a filtered view of the semaphore's node list, not a copy
+ {
+ @Override
+ public boolean apply(String name)
+ {
+ return name.contains(WRITE_BASE_NAME); // keep only writer nodes ("write-lease-")
+ }
+ });
+ }
+
+ @Override
+ public boolean isAcquiredInThisProcess()
+ {
+ return writeLease.get() != null; // a write lease obtained through this instance is currently stored
+ }
+ };
+
+ private static final String SUFFIX = "-lease-";
+ private static final String READ_BASE_NAME = "read" + SUFFIX;
+ private static final String WRITE_BASE_NAME = "write" + SUFFIX;
+
+ public InterProcessSemaphoreReadWrite(CuratorFramework client, String path)
+ {
+ lock = new InterProcessSemaphoreV2(client, path, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public InterProcessLock readLock()
+ {
+ return readLock;
+ }
+
+ @Override
+ public InterProcessLock writeLock()
+ {
+ return writeLock;
+ }
+
+ private boolean internalShouldGetLease(String ourNodeName, List<String> children, boolean isWriter) // lease rule: a writer must be the first node; a reader must have no write node queued ahead of its own node
+ {
+ Preconditions.checkArgument(children.size() > 0, "Empty children list");
+
+ for ( String nodeName : LockInternals.getSortedChildren(SUFFIX, sorter, children) ) // walk the nodes in lock order
+ {
+ if ( nodeName.equals(ourNodeName) || isWriter || nodeName.contains(WRITE_BASE_NAME) ) // first node that settles the answer (for a writer: always the very first node)
+ {
+ return nodeName.equals(ourNodeName); // reached our own node before any blocker -> lease; otherwise a writer (or, for writers, anyone) is ahead of us
+ }
+ }
+
+ return true; // only reachable for a reader whose node is not listed and with no writer queued; same answer as before in that case
+ }
+
+ private boolean internalAcquire(long time, TimeUnit unit, final boolean isWriter) throws Exception // takes one read or write lease; returns false when none was obtained
+ {
+ Preconditions.checkState(!isWriter || (writeLease.get() == null), "Write lock already held by this InterProcessSemaphoreReadWrite"); // one write lease per instance; readers are not limited here
+
+ InterProcessSemaphoreV2.LeaseAcquirePredicate acquireFilter = new InterProcessSemaphoreV2.LeaseAcquirePredicate() // replaces the semaphore's counting rule with the read/write ordering rule
+ {
+ @Override
+ public boolean shouldGetLease(String ourNodeName, List<String> children, int maxLeases)
+ {
+ return internalShouldGetLease(ourNodeName, children, isWriter); // maxLeases is deliberately unused
+ }
+ };
+ Collection<Lease> leases = lock.internalAcquire(1, time, unit, isWriter ? WRITE_BASE_NAME : READ_BASE_NAME, acquireFilter); // the node name prefix records whether we are a reader or a writer
+ if ( (leases == null) || leases.isEmpty() ) // the semaphore's own timed acquire() null-checks this result, so null must mean "not acquired" rather than NPE
+ {
+ return false;
+ }
+
+ Lease lease = leases.iterator().next(); // exactly one lease was requested
+ if ( isWriter )
+ {
+ writeLease.set(lease);
+ }
+ else
+ {
+ readLeases.add(lease);
+ }
+
+ return true;
+ }
+
+ private void internalRelease(boolean isWriter) // hands one lease of the requested kind back to the semaphore; IllegalStateException if none is held
+ {
+ Lease lease;
+ if ( isWriter )
+ {
+ lease = writeLease.getAndSet(null); // take the lease atomically first, so two racing releases cannot both pass the check
+ Preconditions.checkState(lease != null, "Write lock not held by this InterProcessSemaphoreReadWrite");
+ }
+ else
+ {
+ Preconditions.checkState(readLeases.size() > 0, "A read lease is not held by this InterProcessSemaphoreReadWrite".replace("lease is", "lock is")); // NOTE(review): size check and remove(0) are still two steps
+ lease = readLeases.remove(0); // earliest-added read lease
+ }
+ lock.returnLease(lease);
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index fd27cbb..005727f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -96,6 +96,15 @@ public class InterProcessSemaphoreV2
private static final String LEASE_PARENT = "leases";
private static final String LEASE_BASE_NAME = "lease-";
+ private static final LeaseAcquirePredicate defaultLeaseAcquirePredicate = new LeaseAcquirePredicate() // rule used by the public acquire() overloads
+ {
+ @Override
+ public boolean shouldGetLease(String ourNodeName, List<String> children, int maxLeases)
+ {
+ return children.size() <= maxLeases; // same counting check the acquire loop previously had inline; ourNodeName is not consulted
+ }
+ };
+
/**
* @param client the client
* @param path path for the semaphore
@@ -203,7 +212,7 @@ public class InterProcessSemaphoreV2
*/
public Lease acquire() throws Exception
{
- Collection<Lease> leases = acquire(1, 0, null);
+ Collection<Lease> leases = internalAcquire(1, 0, null, LEASE_BASE_NAME, defaultLeaseAcquirePredicate);
return leases.iterator().next();
}
@@ -221,7 +230,7 @@ public class InterProcessSemaphoreV2
*/
public Collection<Lease> acquire(int qty) throws Exception
{
- return acquire(qty, 0, null);
+ return internalAcquire(qty, 0, null, LEASE_BASE_NAME, defaultLeaseAcquirePredicate);
}
/**
@@ -239,7 +248,7 @@ public class InterProcessSemaphoreV2
*/
public Lease acquire(long time, TimeUnit unit) throws Exception
{
- Collection<Lease> leases = acquire(1, time, unit);
+ Collection<Lease> leases = internalAcquire(1, time, unit, LEASE_BASE_NAME, defaultLeaseAcquirePredicate);
return (leases != null) ? leases.iterator().next() : null;
}
@@ -261,6 +270,16 @@ public class InterProcessSemaphoreV2
*/
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
{
+ return internalAcquire(qty, time, unit, LEASE_BASE_NAME, defaultLeaseAcquirePredicate);
+ }
+
+ interface LeaseAcquirePredicate // package-private hook consulted inside the acquire loop after each children fetch
+ {
+ boolean shouldGetLease(String ourNodeName, List<String> children, int maxLeases); // true = the node named ourNodeName may take its lease given the current children; false = keep waiting
+ }
+
+ protected Collection<Lease> internalAcquire(int qty, long time, TimeUnit unit, String leaseBaseName, LeaseAcquirePredicate acquireFilter) throws Exception
+ {
long startMs = System.currentTimeMillis();
boolean hasWait = (unit != null);
long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
@@ -278,7 +297,7 @@ public class InterProcessSemaphoreV2
boolean isDone = false;
while ( !isDone )
{
- switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
+ switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs, leaseBaseName, acquireFilter) )
{
case CONTINUE:
{
@@ -325,7 +344,7 @@ public class InterProcessSemaphoreV2
RETRY_DUE_TO_MISSING_NODE
}
- private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
+ private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs, String leaseBaseName, LeaseAcquirePredicate acquireFilter) throws Exception
{
if ( client.getState() != CuratorFrameworkState.STARTED )
{
@@ -347,8 +366,8 @@ public class InterProcessSemaphoreV2
try
{
PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
- String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
- String nodeName = ZKPaths.getNodeFromPath(path);
+ String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, leaseBaseName), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, leaseBaseName));
+ String ourNodeName = ZKPaths.getNodeFromPath(path);
builder.add(makeLease(path));
synchronized(this)
@@ -356,13 +375,13 @@ public class InterProcessSemaphoreV2
for(;;)
{
List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
- if ( !children.contains(nodeName) )
+ if ( !children.contains(ourNodeName) )
{
log.error("Sequential path not found: " + path);
return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
- if ( children.size() <= maxLeases )
+ if ( acquireFilter.shouldGetLease(ourNodeName, children, maxLeases) )
{
break;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
index b1631a0..b6a9f42 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
@@ -25,6 +25,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,6 +70,12 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
}
@Override
+ public Collection<String> getParticipantNodes() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public boolean isAcquiredInThisProcess()
{
return otherGoodLock.isAcquiredInThisProcess();
@@ -116,6 +123,12 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
}
@Override
+ public Collection<String> getParticipantNodes() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
throw new Exception("foo");
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index 5b44c7c..91e1ce0 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -16,277 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.locks;
-import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.BaseClassForTests;
-import org.apache.curator.retry.RetryOneTime;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-public class TestInterProcessReadWriteLock extends BaseClassForTests
+public class TestInterProcessReadWriteLock extends TestInterProcessReadWriteLockBase
{
- @Test
- public void testGetParticipantNodes() throws Exception
- {
- final int READERS = 20;
- final int WRITERS = 8;
-
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- try
- {
- client.start();
-
- final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
- final CountDownLatch readLatch = new CountDownLatch(READERS);
- final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
-
- ExecutorService service = Executors.newCachedThreadPool();
- for ( int i = 0; i < READERS; ++i )
- {
- service.submit
- (
- new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
- {
- lock.readLock().acquire();
- latch.countDown();
- readLatch.countDown();
- return null;
- }
- }
- );
- }
- for ( int i = 0; i < WRITERS; ++i )
- {
- service.submit
- (
- new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
- {
- Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
- latch.countDown(); // must be before as there can only be one writer
- lock.writeLock().acquire();
- return null;
- }
- }
- );
- }
-
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-
- Collection<String> readers = lock.readLock().getParticipantNodes();
- Collection<String> writers = lock.writeLock().getParticipantNodes();
-
- Assert.assertEquals(readers.size(), READERS);
- Assert.assertEquals(writers.size(), WRITERS);
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testThatUpgradingIsDisallowed() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- try
- {
- client.start();
-
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- lock.readLock().acquire();
- Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
-
- lock.readLock().release();
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testThatDowngradingRespectsThreads() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- try
- {
- client.start();
-
- final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- ExecutorService t1 = Executors.newSingleThreadExecutor();
- ExecutorService t2 = Executors.newSingleThreadExecutor();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- Future<Object> f1 = t1.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- lock.writeLock().acquire();
- latch.countDown();
- return null;
- }
- }
- );
-
- Future<Object> f2 = t2.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
- Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
- return null;
- }
- }
- );
-
- f1.get();
- f2.get();
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testDowngrading() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- try
- {
- client.start();
-
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- lock.writeLock().acquire();
- Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
- lock.writeLock().release();
-
- lock.readLock().release();
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testBasic() throws Exception
+ @Override
+ protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
{
- final int CONCURRENCY = 8;
- final int ITERATIONS = 100;
-
- final Random random = new Random();
- final AtomicInteger concurrentCount = new AtomicInteger(0);
- final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
- final AtomicInteger writeCount = new AtomicInteger(0);
- final AtomicInteger readCount = new AtomicInteger(0);
-
- List<Future<Void>> futures = Lists.newArrayList();
- ExecutorService service = Executors.newCachedThreadPool();
- for ( int i = 0; i < CONCURRENCY; ++i )
- {
- Future<Void> future = service.submit
- (
- new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- client.start();
- try
- {
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- for ( int i = 0; i < ITERATIONS; ++i )
- {
- if ( random.nextInt(100) < 10 )
- {
- doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
- writeCount.incrementAndGet();
- }
- else
- {
- doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
- readCount.incrementAndGet();
- }
- }
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- return null;
- }
- }
- );
- futures.add(future);
- }
-
- for ( Future<Void> future : futures )
- {
- future.get();
- }
-
- System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get());
-
- Assert.assertTrue(writeCount.get() > 0);
- Assert.assertTrue(readCount.get() > 0);
- Assert.assertTrue(maxConcurrentCount.get() > 1);
- }
-
- private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, AtomicInteger maxConcurrentCount, Random random, int maxAllowed) throws Exception
- {
- try
- {
- Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
- int localConcurrentCount;
- synchronized(this)
- {
- localConcurrentCount = concurrentCount.incrementAndGet();
- if ( localConcurrentCount > maxConcurrentCount.get() )
- {
- maxConcurrentCount.set(localConcurrentCount);
- }
- }
-
- Assert.assertTrue(localConcurrentCount <= maxAllowed, "" + localConcurrentCount);
-
- Thread.sleep(random.nextInt(9) + 1);
- }
- finally
- {
- synchronized(this)
- {
- concurrentCount.decrementAndGet();
- lock.release();
- }
- }
+ return new InterProcessReadWriteLock(client, path);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
new file mode 100644
index 0000000..81a6b14
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.locks;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTests
+{
+ @Test
+ public void testGetParticipantNodes() throws Exception // expects getParticipantNodes() to report READERS read-lock and WRITERS write-lock participants
+ {
+ final int READERS = 20;
+ final int WRITERS = 8;
+
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final CountDownLatch latch = new CountDownLatch(READERS + WRITERS); // one count per reader task and per writer task
+ final CountDownLatch readLatch = new CountDownLatch(READERS); // reaches zero once every reader task has acquired its read lock
+
+ ExecutorService service = Executors.newCachedThreadPool(); // NOTE(review): never shut down in this method
+ for ( int i = 0; i < READERS; ++i )
+ {
+ service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ lock.readLock().acquire(); // this task never releases the read lock
+ latch.countDown();
+ readLatch.countDown();
+ return null;
+ }
+ });
+ }
+ for ( int i = 0; i < WRITERS; ++i )
+ {
+ service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS)); // wait until all readers have acquired; the Future is discarded, so a failure here is not reported to the test
+ latch.countDown(); // must be before as there can only be one writer
+ lock.writeLock().acquire(); // no release follows in this task
+ return null;
+ }
+ });
+ }
+
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ new Timing().sleepABit(); // presumably lets the writers, which count down before acquiring, register as participants — TODO confirm
+
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock"); // fresh lock instance on the same path, used only for querying
+ Collection<String> readers = lock.readLock().getParticipantNodes();
+ Collection<String> writers = lock.writeLock().getParticipantNodes();
+
+ Assert.assertEquals(readers.size(), READERS);
+ Assert.assertEquals(writers.size(), WRITERS);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client); // NOTE(review): the only cleanup in this test; presumably closing the client is what frees the held locks — confirm
+ }
+ }
+
+ @Test
+ public void testThatUpgradingIsDisallowed() throws Exception // holding the read lock, the same thread must fail to obtain the write lock
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ lock.readLock().acquire();
+ Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS)); // timed write acquire must return false while the read lock is held
+
+ lock.readLock().release(); // skipped if the assertion above fails; the client is still closed in finally
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testThatDowngradingRespectsThreads() throws Exception // while one thread holds the write lock, another thread's timed read acquire must return false
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final InterProcessReadWriteLockBase lock = newLock(client, "/lock"); // one lock instance shared by both tasks
+ ExecutorService t1 = Executors.newSingleThreadExecutor(); // separate single-thread executors so the two tasks run on different threads
+ ExecutorService t2 = Executors.newSingleThreadExecutor(); // NOTE(review): neither t1 nor t2 is shut down in this method
+
+ final CountDownLatch latch = new CountDownLatch(1); // opened once the write lock has been acquired
+
+ Future<Object> f1 = t1.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ lock.writeLock().acquire(); // never released by this task
+ latch.countDown();
+ return null;
+ }
+ });
+
+ Future<Object> f2 = t2.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); // wait for the writer task to hold the lock
+ Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS)); // read acquire from this other thread must time out
+ return null;
+ }
+ });
+
+ f1.get(); // get() rethrows a task failure (e.g. a failed assertion) wrapped in ExecutionException
+ f2.get();
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testDowngrading() throws Exception // the thread holding the write lock must also be able to take the read lock, then drop the write lock first
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ lock.writeLock().acquire();
+ Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS)); // same thread already holds the write lock
+ lock.writeLock().release(); // write lock is released while the read lock is still held
+
+ lock.readLock().release();
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testBasic() throws Exception // CONCURRENCY tasks each run ITERATIONS acquire/release cycles, mixing write and read locks on the same path
+ {
+ final int CONCURRENCY = 8;
+ final int ITERATIONS = 100;
+
+ final Random random = new Random(); // shared by all tasks
+ final AtomicInteger concurrentCount = new AtomicInteger(0); // passed to doLocking to count current lock holders
+ final AtomicInteger maxConcurrentCount = new AtomicInteger(0); // passed to doLocking to record the highest holder count seen
+ final AtomicInteger writeCount = new AtomicInteger(0);
+ final AtomicInteger readCount = new AtomicInteger(0);
+
+ List<Future<Void>> futures = Lists.newArrayList();
+ ExecutorService service = Executors.newCachedThreadPool(); // NOTE(review): never shut down in this method
+ for ( int i = 0; i < CONCURRENCY; ++i )
+ {
+ Future<Void> future = service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); // each task uses its own client, closed in the finally below
+ client.start();
+ try
+ {
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ for ( int i = 0; i < ITERATIONS; ++i )
+ {
+ if ( random.nextInt(100) < 10 ) // values 0-9 out of 0-99: about one iteration in ten takes the write lock
+ {
+ doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1); // maxAllowed 1: a writer must be the only holder
+ writeCount.incrementAndGet();
+ }
+ else
+ {
+ doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE); // no upper bound on concurrent readers
+ readCount.incrementAndGet();
+ }
+ }
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ return null;
+ }
+ });
+ futures.add(future);
+ }
+
+ for ( Future<Void> future : futures )
+ {
+ future.get(); // waits for the task; a task failure is rethrown wrapped in ExecutionException
+ }
+
+ System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get());
+
+ Assert.assertTrue(writeCount.get() > 0);
+ Assert.assertTrue(readCount.get() > 0);
+ Assert.assertTrue(maxConcurrentCount.get() > 1); // at least two holders must have overlapped at some point
+ }
+
+ protected abstract InterProcessReadWriteLockBase newLock(CuratorFramework client, String path); // subclasses return the read/write lock implementation the tests above exercise, for the given client and path
+
+ private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, AtomicInteger maxConcurrentCount, Random random, int maxAllowed) throws Exception // acquire (10s timeout), count holders, assert the count is <= maxAllowed, sleep 1-9 ms, release
+ {
+ boolean hasTheLock = false; // set only after a successful acquire so the finally block never releases an unheld lock
+ try
+ {
+ Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
+ hasTheLock = true;
+ int localConcurrentCount;
+ synchronized(this) // the increment and the max update happen as one step under this instance's monitor
+ {
+ localConcurrentCount = concurrentCount.incrementAndGet();
+ if ( localConcurrentCount > maxConcurrentCount.get() )
+ {
+ maxConcurrentCount.set(localConcurrentCount);
+ }
+ }
+
+ Assert.assertTrue(localConcurrentCount <= maxAllowed, "" + localConcurrentCount); // failure message is the observed holder count
+
+ Thread.sleep(random.nextInt(9) + 1); // hold the lock for 1-9 ms
+ }
+ finally
+ {
+ if ( hasTheLock )
+ {
+ synchronized(this)
+ {
+ concurrentCount.decrementAndGet(); // decremented before the release, inside the same monitor
+ lock.release(); // note: the release call runs while this instance's monitor is held
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
new file mode 100644
index 0000000..7faa198
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.locks;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.testng.annotations.Test;
+
+public class TestInterProcessSemaphoreReadWrite // runs the base class's testBasic() scenario against InterProcessSemaphoreReadWrite
+{
+ @Test
+ public void testBasic() throws Exception
+ {
+ TestInterProcessReadWriteLockBase base = new TestInterProcessReadWriteLockBase() // anonymous subclass that supplies the lock implementation
+ {
+ @Override
+ protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
+ {
+ return new InterProcessSemaphoreReadWrite(client, path);
+ }
+ };
+
+ base.setup(); // NOTE(review): setup()/teardown() presumably come from BaseClassForTests — confirm
+ try
+ {
+ base.testBasic(); // only this base test is invoked; this class does not extend the base, so its other @Test methods are not called from here
+ }
+ finally
+ {
+ base.teardown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
index 1da27ba..8d273f1 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
@@ -29,6 +29,7 @@ import org.apache.curator.x.rest.entities.LockSpec;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.URI;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
public class InterProcessLockBridge implements InterProcessLock
@@ -92,6 +93,12 @@ public class InterProcessLockBridge implements InterProcessLock
}
@Override
+ public Collection<String> getParticipantNodes() throws Exception
+ {
+ throw new UnsupportedOperationException(); // not implemented by this bridge: every call throws
+ }
+
+ @Override
public boolean isAcquiredInThisProcess()
{
return (id != null);
http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
index ea21f98..2f08aa1 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
@@ -28,6 +28,7 @@ import org.apache.curator.x.rest.entities.LockSpec;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.URI;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
public class InterProcessReadWriteLockBridge
@@ -108,6 +109,12 @@ public class InterProcessReadWriteLockBridge
}
@Override
+ public Collection<String> getParticipantNodes() throws Exception
+ {
+ throw new UnsupportedOperationException(); // not implemented by this bridge: every call throws
+ }
+
+ @Override
public void release() throws Exception
{
String localId = id.get();
[2/2] git commit: ReadWriteLockResource must use
InterProcessSemaphoreReadWrite
Posted by ra...@apache.org.
ReadWriteLockResource must use InterProcessSemaphoreReadWrite
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ae481351
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ae481351
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ae481351
Branch: refs/heads/CURATOR-88
Commit: ae4813512aaa8b64e226d02d08a1450efd310a05
Parents: 9f757b3
Author: randgalt <ra...@apache.org>
Authored: Thu Mar 6 19:59:17 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Mar 6 19:59:17 2014 -0500
----------------------------------------------------------------------
.../x/rest/api/ReadWriteLockResource.java | 14 +-
.../apache/curator/x/rest/api/TestLocks.java | 309 +++++++++----------
2 files changed, 151 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/ae481351/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
index b501d0e..8792323 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
@@ -18,8 +18,8 @@
*/
package org.apache.curator.x.rest.api;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreReadWrite;
import org.apache.curator.x.rest.CuratorRestContext;
import org.apache.curator.x.rest.entities.Id;
import org.apache.curator.x.rest.entities.LockSpec;
@@ -69,24 +69,24 @@ public class ReadWriteLockResource
@Path("{lock-id}")
public Response releaseLock(@PathParam("lock-id") String lockId) throws Exception
{
- InterProcessMutex lock = Constants.deleteThing(context.getSession(), lockId, InterProcessMutex.class);
+ InterProcessLock lock = Constants.deleteThing(context.getSession(), lockId, InterProcessLock.class);
lock.release();
return Response.ok().build();
}
private Response internalLock(final LockSpec lockSpec, boolean writeLock) throws Exception
{
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(context.getClient(), lockSpec.getPath());
- InterProcessMutex actualLock = writeLock ? lock.writeLock() : lock.readLock();
+ InterProcessSemaphoreReadWrite lock = new InterProcessSemaphoreReadWrite(context.getClient(), lockSpec.getPath());
+ InterProcessLock actualLock = writeLock ? lock.writeLock() : lock.readLock();
if ( !actualLock.acquire(lockSpec.getMaxWaitMs(), TimeUnit.MILLISECONDS) )
{
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
- Closer<InterProcessMutex> closer = new Closer<InterProcessMutex>()
+ Closer<InterProcessLock> closer = new Closer<InterProcessLock>()
{
@Override
- public void close(InterProcessMutex lock)
+ public void close(InterProcessLock lock)
{
if ( lock.isAcquiredInThisProcess() )
{
http://git-wip-us.apache.org/repos/asf/curator/blob/ae481351/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 168329e..7ea6245 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -62,62 +62,56 @@ public class TestLocks extends BaseClassForTests
final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
ExecutorService service = Executors.newCachedThreadPool();
- Future<Object> future1 = service.submit
- (
- new Callable<Object>()
+ Future<Object> future1 = service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ try
{
- @Override
- public Object call() throws Exception
+ if ( !mutexForClient1.acquire(10, TimeUnit.SECONDS) )
{
- try
- {
- if ( !mutexForClient1.acquire(10, TimeUnit.SECONDS) )
- {
- throw new Exception("mutexForClient1.acquire timed out");
- }
- acquiredLatchForClient1.countDown();
- if ( !latchForClient1.await(10, TimeUnit.SECONDS) )
- {
- throw new Exception("latchForClient1 timed out");
- }
- mutexForClient1.release();
- }
- catch ( Exception e )
- {
- exceptionRef.set(e);
- }
- return null;
+ throw new Exception("mutexForClient1.acquire timed out");
+ }
+ acquiredLatchForClient1.countDown();
+ if ( !latchForClient1.await(10, TimeUnit.SECONDS) )
+ {
+ throw new Exception("latchForClient1 timed out");
}
+ mutexForClient1.release();
}
- );
- Future<Object> future2 = service.submit
- (
- new Callable<Object>()
+ catch ( Exception e )
{
- @Override
- public Object call() throws Exception
+ exceptionRef.set(e);
+ }
+ return null;
+ }
+ });
+ Future<Object> future2 = service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ try
+ {
+ if ( !mutexForClient2.acquire(10, TimeUnit.SECONDS) )
{
- try
- {
- if ( !mutexForClient2.acquire(10, TimeUnit.SECONDS) )
- {
- throw new Exception("mutexForClient2.acquire timed out");
- }
- acquiredLatchForClient2.countDown();
- if ( !latchForClient2.await(10, TimeUnit.SECONDS) )
- {
- throw new Exception("latchForClient2 timed out");
- }
- mutexForClient2.release();
- }
- catch ( Exception e )
- {
- exceptionRef.set(e);
- }
- return null;
+ throw new Exception("mutexForClient2.acquire timed out");
+ }
+ acquiredLatchForClient2.countDown();
+ if ( !latchForClient2.await(10, TimeUnit.SECONDS) )
+ {
+ throw new Exception("latchForClient2 timed out");
}
+ mutexForClient2.release();
}
- );
+ catch ( Exception e )
+ {
+ exceptionRef.set(e);
+ }
+ return null;
+ }
+ });
while ( !mutexForClient1.isAcquiredInThisProcess() && !mutexForClient2.isAcquiredInThisProcess() )
{
@@ -183,41 +177,38 @@ public class TestLocks extends BaseClassForTests
ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
for ( int i = 0; i < 2; ++i )
{
- service.submit
- (
- new Callable<Object>()
+ service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
+ lock.acquire();
+ try
{
- @Override
- public Object call() throws Exception
+ if ( isFirst.compareAndSet(true, false) )
{
- InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
- lock.acquire();
- try
- {
- if ( isFirst.compareAndSet(true, false) )
- {
- timing.sleepABit();
-
- server.stop();
- Assert.assertTrue(timing.awaitLatch(latch));
- server = new TestingServer(server.getPort(), server.getTempDirectory());
- }
- }
- finally
- {
- try
- {
- lock.release();
- }
- catch ( Exception e )
- {
- // ignore
- }
- }
- return null;
+ timing.sleepABit();
+
+ server.stop();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
}
}
- );
+ finally
+ {
+ try
+ {
+ lock.release();
+ }
+ catch ( Exception e )
+ {
+ // ignore
+ }
+ }
+ return null;
+ }
+ });
}
for ( int i = 0; i < 2; ++i )
@@ -236,35 +227,29 @@ public class TestLocks extends BaseClassForTests
final Semaphore semaphore = new Semaphore(0);
ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
- service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- mutex1.acquire();
- semaphore.release();
- Thread.sleep(1000000);
- return null;
- }
- }
- );
+ service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ mutex1.acquire();
+ semaphore.release();
+ Thread.sleep(1000000);
+ return null;
+ }
+ });
- service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- mutex2.acquire();
- semaphore.release();
- Thread.sleep(1000000);
- return null;
- }
- }
- );
+ service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ mutex2.acquire();
+ semaphore.release();
+ Thread.sleep(1000000);
+ return null;
+ }
+ });
Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
KillSession.kill(getCuratorRestContext().getClient().getZookeeperClient().getZooKeeper(), server.getConnectString());
@@ -285,40 +270,37 @@ public class TestLocks extends BaseClassForTests
for ( int i = 0; i < THREAD_QTY; ++i )
{
final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
- Future<Object> t = service.submit
- (
- new Callable<Object>()
+ Future<Object> t = service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ semaphore.acquire();
+ mutex.acquire();
+ Assert.assertTrue(hasLock.compareAndSet(false, true));
+ try
{
- @Override
- public Object call() throws Exception
+ if ( isFirst.compareAndSet(true, false) )
{
- semaphore.acquire();
- mutex.acquire();
- Assert.assertTrue(hasLock.compareAndSet(false, true));
- try
+ semaphore.release(THREAD_QTY - 1);
+ while ( semaphore.availablePermits() > 0 )
{
- if ( isFirst.compareAndSet(true, false) )
- {
- semaphore.release(THREAD_QTY - 1);
- while ( semaphore.availablePermits() > 0 )
- {
- Thread.sleep(100);
- }
- }
- else
- {
- Thread.sleep(100);
- }
+ Thread.sleep(100);
}
- finally
- {
- mutex.release();
- hasLock.set(false);
- }
- return null;
+ }
+ else
+ {
+ Thread.sleep(100);
}
}
- );
+ finally
+ {
+ mutex.release();
+ hasLock.set(false);
+ }
+ return null;
+ }
+ });
threads.add(t);
}
@@ -329,46 +311,43 @@ public class TestLocks extends BaseClassForTests
}
@Test
- public void testBasicReadWriteLock() throws Exception
+ public void testBasicReadWriteLock() throws Exception
{
- final int CONCURRENCY = 8;
- final int ITERATIONS = 100;
+ final int CONCURRENCY = 8;
+ final int ITERATIONS = 100;
final Random random = new Random();
final AtomicInteger concurrentCount = new AtomicInteger(0);
- final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
- final AtomicInteger writeCount = new AtomicInteger(0);
- final AtomicInteger readCount = new AtomicInteger(0);
+ final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+ final AtomicInteger writeCount = new AtomicInteger(0);
+ final AtomicInteger readCount = new AtomicInteger(0);
- List<Future<Void>> futures = Lists.newArrayList();
- ExecutorService service = Executors.newCachedThreadPool();
+ List<Future<Void>> futures = Lists.newArrayList();
+ ExecutorService service = Executors.newCachedThreadPool();
for ( int i = 0; i < CONCURRENCY; ++i )
{
- Future<Void> future = service.submit
- (
- new Callable<Void>()
+ Future<Void> future = service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
+ for ( int i = 0; i < ITERATIONS; ++i )
{
- @Override
- public Void call() throws Exception
+ if ( random.nextInt(100) < 10 )
{
- InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
- for ( int i = 0; i < ITERATIONS; ++i )
- {
- if ( random.nextInt(100) < 10 )
- {
- doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
- writeCount.incrementAndGet();
- }
- else
- {
- doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
- readCount.incrementAndGet();
- }
- }
- return null;
+ doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+ writeCount.incrementAndGet();
+ }
+ else
+ {
+ doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+ readCount.incrementAndGet();
}
}
- );
+ return null;
+ }
+ });
futures.add(future);
}
@@ -389,7 +368,7 @@ public class TestLocks extends BaseClassForTests
try
{
Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
- int localConcurrentCount;
+ int localConcurrentCount;
synchronized(this)
{
localConcurrentCount = concurrentCount.incrementAndGet();