You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/07 01:59:31 UTC
[2/2] git commit: ReadWriteLockResource must use
InterProcessSemaphoreReadWrite
ReadWriteLockResource must use InterProcessSemaphoreReadWrite
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ae481351
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ae481351
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ae481351
Branch: refs/heads/CURATOR-88
Commit: ae4813512aaa8b64e226d02d08a1450efd310a05
Parents: 9f757b3
Author: randgalt <ra...@apache.org>
Authored: Thu Mar 6 19:59:17 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Mar 6 19:59:17 2014 -0500
----------------------------------------------------------------------
.../x/rest/api/ReadWriteLockResource.java | 14 +-
.../apache/curator/x/rest/api/TestLocks.java | 309 +++++++++----------
2 files changed, 151 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/ae481351/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
index b501d0e..8792323 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
@@ -18,8 +18,8 @@
*/
package org.apache.curator.x.rest.api;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreReadWrite;
import org.apache.curator.x.rest.CuratorRestContext;
import org.apache.curator.x.rest.entities.Id;
import org.apache.curator.x.rest.entities.LockSpec;
@@ -69,24 +69,24 @@ public class ReadWriteLockResource
@Path("{lock-id}")
public Response releaseLock(@PathParam("lock-id") String lockId) throws Exception
{
- InterProcessMutex lock = Constants.deleteThing(context.getSession(), lockId, InterProcessMutex.class);
+ InterProcessLock lock = Constants.deleteThing(context.getSession(), lockId, InterProcessLock.class);
lock.release();
return Response.ok().build();
}
private Response internalLock(final LockSpec lockSpec, boolean writeLock) throws Exception
{
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(context.getClient(), lockSpec.getPath());
- InterProcessMutex actualLock = writeLock ? lock.writeLock() : lock.readLock();
+ InterProcessSemaphoreReadWrite lock = new InterProcessSemaphoreReadWrite(context.getClient(), lockSpec.getPath());
+ InterProcessLock actualLock = writeLock ? lock.writeLock() : lock.readLock();
if ( !actualLock.acquire(lockSpec.getMaxWaitMs(), TimeUnit.MILLISECONDS) )
{
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
- Closer<InterProcessMutex> closer = new Closer<InterProcessMutex>()
+ Closer<InterProcessLock> closer = new Closer<InterProcessLock>()
{
@Override
- public void close(InterProcessMutex lock)
+ public void close(InterProcessLock lock)
{
if ( lock.isAcquiredInThisProcess() )
{
http://git-wip-us.apache.org/repos/asf/curator/blob/ae481351/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 168329e..7ea6245 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -62,62 +62,56 @@ public class TestLocks extends BaseClassForTests
final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
ExecutorService service = Executors.newCachedThreadPool();
- Future<Object> future1 = service.submit
- (
- new Callable<Object>()
+ Future<Object> future1 = service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ try
{
- @Override
- public Object call() throws Exception
+ if ( !mutexForClient1.acquire(10, TimeUnit.SECONDS) )
{
- try
- {
- if ( !mutexForClient1.acquire(10, TimeUnit.SECONDS) )
- {
- throw new Exception("mutexForClient1.acquire timed out");
- }
- acquiredLatchForClient1.countDown();
- if ( !latchForClient1.await(10, TimeUnit.SECONDS) )
- {
- throw new Exception("latchForClient1 timed out");
- }
- mutexForClient1.release();
- }
- catch ( Exception e )
- {
- exceptionRef.set(e);
- }
- return null;
+ throw new Exception("mutexForClient1.acquire timed out");
+ }
+ acquiredLatchForClient1.countDown();
+ if ( !latchForClient1.await(10, TimeUnit.SECONDS) )
+ {
+ throw new Exception("latchForClient1 timed out");
}
+ mutexForClient1.release();
}
- );
- Future<Object> future2 = service.submit
- (
- new Callable<Object>()
+ catch ( Exception e )
{
- @Override
- public Object call() throws Exception
+ exceptionRef.set(e);
+ }
+ return null;
+ }
+ });
+ Future<Object> future2 = service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ try
+ {
+ if ( !mutexForClient2.acquire(10, TimeUnit.SECONDS) )
{
- try
- {
- if ( !mutexForClient2.acquire(10, TimeUnit.SECONDS) )
- {
- throw new Exception("mutexForClient2.acquire timed out");
- }
- acquiredLatchForClient2.countDown();
- if ( !latchForClient2.await(10, TimeUnit.SECONDS) )
- {
- throw new Exception("latchForClient2 timed out");
- }
- mutexForClient2.release();
- }
- catch ( Exception e )
- {
- exceptionRef.set(e);
- }
- return null;
+ throw new Exception("mutexForClient2.acquire timed out");
+ }
+ acquiredLatchForClient2.countDown();
+ if ( !latchForClient2.await(10, TimeUnit.SECONDS) )
+ {
+ throw new Exception("latchForClient2 timed out");
}
+ mutexForClient2.release();
}
- );
+ catch ( Exception e )
+ {
+ exceptionRef.set(e);
+ }
+ return null;
+ }
+ });
while ( !mutexForClient1.isAcquiredInThisProcess() && !mutexForClient2.isAcquiredInThisProcess() )
{
@@ -183,41 +177,38 @@ public class TestLocks extends BaseClassForTests
ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
for ( int i = 0; i < 2; ++i )
{
- service.submit
- (
- new Callable<Object>()
+ service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
+ lock.acquire();
+ try
{
- @Override
- public Object call() throws Exception
+ if ( isFirst.compareAndSet(true, false) )
{
- InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
- lock.acquire();
- try
- {
- if ( isFirst.compareAndSet(true, false) )
- {
- timing.sleepABit();
-
- server.stop();
- Assert.assertTrue(timing.awaitLatch(latch));
- server = new TestingServer(server.getPort(), server.getTempDirectory());
- }
- }
- finally
- {
- try
- {
- lock.release();
- }
- catch ( Exception e )
- {
- // ignore
- }
- }
- return null;
+ timing.sleepABit();
+
+ server.stop();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
}
}
- );
+ finally
+ {
+ try
+ {
+ lock.release();
+ }
+ catch ( Exception e )
+ {
+ // ignore
+ }
+ }
+ return null;
+ }
+ });
}
for ( int i = 0; i < 2; ++i )
@@ -236,35 +227,29 @@ public class TestLocks extends BaseClassForTests
final Semaphore semaphore = new Semaphore(0);
ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
- service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- mutex1.acquire();
- semaphore.release();
- Thread.sleep(1000000);
- return null;
- }
- }
- );
+ service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ mutex1.acquire();
+ semaphore.release();
+ Thread.sleep(1000000);
+ return null;
+ }
+ });
- service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
- {
- mutex2.acquire();
- semaphore.release();
- Thread.sleep(1000000);
- return null;
- }
- }
- );
+ service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ mutex2.acquire();
+ semaphore.release();
+ Thread.sleep(1000000);
+ return null;
+ }
+ });
Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
KillSession.kill(getCuratorRestContext().getClient().getZookeeperClient().getZooKeeper(), server.getConnectString());
@@ -285,40 +270,37 @@ public class TestLocks extends BaseClassForTests
for ( int i = 0; i < THREAD_QTY; ++i )
{
final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
- Future<Object> t = service.submit
- (
- new Callable<Object>()
+ Future<Object> t = service.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ semaphore.acquire();
+ mutex.acquire();
+ Assert.assertTrue(hasLock.compareAndSet(false, true));
+ try
{
- @Override
- public Object call() throws Exception
+ if ( isFirst.compareAndSet(true, false) )
{
- semaphore.acquire();
- mutex.acquire();
- Assert.assertTrue(hasLock.compareAndSet(false, true));
- try
+ semaphore.release(THREAD_QTY - 1);
+ while ( semaphore.availablePermits() > 0 )
{
- if ( isFirst.compareAndSet(true, false) )
- {
- semaphore.release(THREAD_QTY - 1);
- while ( semaphore.availablePermits() > 0 )
- {
- Thread.sleep(100);
- }
- }
- else
- {
- Thread.sleep(100);
- }
+ Thread.sleep(100);
}
- finally
- {
- mutex.release();
- hasLock.set(false);
- }
- return null;
+ }
+ else
+ {
+ Thread.sleep(100);
}
}
- );
+ finally
+ {
+ mutex.release();
+ hasLock.set(false);
+ }
+ return null;
+ }
+ });
threads.add(t);
}
@@ -329,46 +311,43 @@ public class TestLocks extends BaseClassForTests
}
@Test
- public void testBasicReadWriteLock() throws Exception
+ public void testBasicReadWriteLock() throws Exception
{
- final int CONCURRENCY = 8;
- final int ITERATIONS = 100;
+ final int CONCURRENCY = 8;
+ final int ITERATIONS = 100;
final Random random = new Random();
final AtomicInteger concurrentCount = new AtomicInteger(0);
- final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
- final AtomicInteger writeCount = new AtomicInteger(0);
- final AtomicInteger readCount = new AtomicInteger(0);
+ final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+ final AtomicInteger writeCount = new AtomicInteger(0);
+ final AtomicInteger readCount = new AtomicInteger(0);
- List<Future<Void>> futures = Lists.newArrayList();
- ExecutorService service = Executors.newCachedThreadPool();
+ List<Future<Void>> futures = Lists.newArrayList();
+ ExecutorService service = Executors.newCachedThreadPool();
for ( int i = 0; i < CONCURRENCY; ++i )
{
- Future<Void> future = service.submit
- (
- new Callable<Void>()
+ Future<Void> future = service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
+ for ( int i = 0; i < ITERATIONS; ++i )
{
- @Override
- public Void call() throws Exception
+ if ( random.nextInt(100) < 10 )
{
- InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
- for ( int i = 0; i < ITERATIONS; ++i )
- {
- if ( random.nextInt(100) < 10 )
- {
- doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
- writeCount.incrementAndGet();
- }
- else
- {
- doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
- readCount.incrementAndGet();
- }
- }
- return null;
+ doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+ writeCount.incrementAndGet();
+ }
+ else
+ {
+ doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+ readCount.incrementAndGet();
}
}
- );
+ return null;
+ }
+ });
futures.add(future);
}
@@ -389,7 +368,7 @@ public class TestLocks extends BaseClassForTests
try
{
Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
- int localConcurrentCount;
+ int localConcurrentCount;
synchronized(this)
{
localConcurrentCount = concurrentCount.incrementAndGet();