You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/07 01:59:31 UTC

[2/2] git commit: ReadWriteLockResource must use InterProcessSemaphoreReadWrite

ReadWriteLockResource must use InterProcessSemaphoreReadWrite


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ae481351
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ae481351
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ae481351

Branch: refs/heads/CURATOR-88
Commit: ae4813512aaa8b64e226d02d08a1450efd310a05
Parents: 9f757b3
Author: randgalt <ra...@apache.org>
Authored: Thu Mar 6 19:59:17 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Mar 6 19:59:17 2014 -0500

----------------------------------------------------------------------
 .../x/rest/api/ReadWriteLockResource.java       |  14 +-
 .../apache/curator/x/rest/api/TestLocks.java    | 309 +++++++++----------
 2 files changed, 151 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ae481351/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
index b501d0e..8792323 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
@@ -18,8 +18,8 @@
  */
 package org.apache.curator.x.rest.api;
 
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreReadWrite;
 import org.apache.curator.x.rest.CuratorRestContext;
 import org.apache.curator.x.rest.entities.Id;
 import org.apache.curator.x.rest.entities.LockSpec;
@@ -69,24 +69,24 @@ public class ReadWriteLockResource
     @Path("{lock-id}")
     public Response releaseLock(@PathParam("lock-id") String lockId) throws Exception
     {
-        InterProcessMutex lock = Constants.deleteThing(context.getSession(), lockId, InterProcessMutex.class);
+        InterProcessLock lock = Constants.deleteThing(context.getSession(), lockId, InterProcessLock.class);
         lock.release();
         return Response.ok().build();
     }
 
     private Response internalLock(final LockSpec lockSpec, boolean writeLock) throws Exception
     {
-        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(context.getClient(), lockSpec.getPath());
-        InterProcessMutex actualLock = writeLock ? lock.writeLock() : lock.readLock();
+        InterProcessSemaphoreReadWrite lock = new InterProcessSemaphoreReadWrite(context.getClient(), lockSpec.getPath());
+        InterProcessLock actualLock = writeLock ? lock.writeLock() : lock.readLock();
         if ( !actualLock.acquire(lockSpec.getMaxWaitMs(), TimeUnit.MILLISECONDS) )
         {
             return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
         }
 
-        Closer<InterProcessMutex> closer = new Closer<InterProcessMutex>()
+        Closer<InterProcessLock> closer = new Closer<InterProcessLock>()
         {
             @Override
-            public void close(InterProcessMutex lock)
+            public void close(InterProcessLock lock)
             {
                 if ( lock.isAcquiredInThisProcess() )
                 {

http://git-wip-us.apache.org/repos/asf/curator/blob/ae481351/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 168329e..7ea6245 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -62,62 +62,56 @@ public class TestLocks extends BaseClassForTests
         final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
 
         ExecutorService service = Executors.newCachedThreadPool();
-        Future<Object> future1 = service.submit
-            (
-                new Callable<Object>()
+        Future<Object> future1 = service.submit(new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                try
                 {
-                    @Override
-                    public Object call() throws Exception
+                    if ( !mutexForClient1.acquire(10, TimeUnit.SECONDS) )
                     {
-                        try
-                        {
-                            if ( !mutexForClient1.acquire(10, TimeUnit.SECONDS) )
-                            {
-                                throw new Exception("mutexForClient1.acquire timed out");
-                            }
-                            acquiredLatchForClient1.countDown();
-                            if ( !latchForClient1.await(10, TimeUnit.SECONDS) )
-                            {
-                                throw new Exception("latchForClient1 timed out");
-                            }
-                            mutexForClient1.release();
-                        }
-                        catch ( Exception e )
-                        {
-                            exceptionRef.set(e);
-                        }
-                        return null;
+                        throw new Exception("mutexForClient1.acquire timed out");
+                    }
+                    acquiredLatchForClient1.countDown();
+                    if ( !latchForClient1.await(10, TimeUnit.SECONDS) )
+                    {
+                        throw new Exception("latchForClient1 timed out");
                     }
+                    mutexForClient1.release();
                 }
-            );
-        Future<Object> future2 = service.submit
-            (
-                new Callable<Object>()
+                catch ( Exception e )
                 {
-                    @Override
-                    public Object call() throws Exception
+                    exceptionRef.set(e);
+                }
+                return null;
+            }
+        });
+        Future<Object> future2 = service.submit(new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                try
+                {
+                    if ( !mutexForClient2.acquire(10, TimeUnit.SECONDS) )
                     {
-                        try
-                        {
-                            if ( !mutexForClient2.acquire(10, TimeUnit.SECONDS) )
-                            {
-                                throw new Exception("mutexForClient2.acquire timed out");
-                            }
-                            acquiredLatchForClient2.countDown();
-                            if ( !latchForClient2.await(10, TimeUnit.SECONDS) )
-                            {
-                                throw new Exception("latchForClient2 timed out");
-                            }
-                            mutexForClient2.release();
-                        }
-                        catch ( Exception e )
-                        {
-                            exceptionRef.set(e);
-                        }
-                        return null;
+                        throw new Exception("mutexForClient2.acquire timed out");
+                    }
+                    acquiredLatchForClient2.countDown();
+                    if ( !latchForClient2.await(10, TimeUnit.SECONDS) )
+                    {
+                        throw new Exception("latchForClient2 timed out");
                     }
+                    mutexForClient2.release();
                 }
-            );
+                catch ( Exception e )
+                {
+                    exceptionRef.set(e);
+                }
+                return null;
+            }
+        });
 
         while ( !mutexForClient1.isAcquiredInThisProcess() && !mutexForClient2.isAcquiredInThisProcess() )
         {
@@ -183,41 +177,38 @@ public class TestLocks extends BaseClassForTests
         ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
         for ( int i = 0; i < 2; ++i )
         {
-            service.submit
-                (
-                    new Callable<Object>()
+            service.submit(new Callable<Object>()
+            {
+                @Override
+                public Object call() throws Exception
+                {
+                    InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
+                    lock.acquire();
+                    try
                     {
-                        @Override
-                        public Object call() throws Exception
+                        if ( isFirst.compareAndSet(true, false) )
                         {
-                            InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
-                            lock.acquire();
-                            try
-                            {
-                                if ( isFirst.compareAndSet(true, false) )
-                                {
-                                    timing.sleepABit();
-
-                                    server.stop();
-                                    Assert.assertTrue(timing.awaitLatch(latch));
-                                    server = new TestingServer(server.getPort(), server.getTempDirectory());
-                                }
-                            }
-                            finally
-                            {
-                                try
-                                {
-                                    lock.release();
-                                }
-                                catch ( Exception e )
-                                {
-                                    // ignore
-                                }
-                            }
-                            return null;
+                            timing.sleepABit();
+
+                            server.stop();
+                            Assert.assertTrue(timing.awaitLatch(latch));
+                            server = new TestingServer(server.getPort(), server.getTempDirectory());
                         }
                     }
-                );
+                    finally
+                    {
+                        try
+                        {
+                            lock.release();
+                        }
+                        catch ( Exception e )
+                        {
+                            // ignore
+                        }
+                    }
+                    return null;
+                }
+            });
         }
 
         for ( int i = 0; i < 2; ++i )
@@ -236,35 +227,29 @@ public class TestLocks extends BaseClassForTests
 
         final Semaphore semaphore = new Semaphore(0);
         ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
-        service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
-                    {
-                        mutex1.acquire();
-                        semaphore.release();
-                        Thread.sleep(1000000);
-                        return null;
-                    }
-                }
-            );
+        service.submit(new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                mutex1.acquire();
+                semaphore.release();
+                Thread.sleep(1000000);
+                return null;
+            }
+        });
 
-        service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
-                    {
-                        mutex2.acquire();
-                        semaphore.release();
-                        Thread.sleep(1000000);
-                        return null;
-                    }
-                }
-            );
+        service.submit(new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                mutex2.acquire();
+                semaphore.release();
+                Thread.sleep(1000000);
+                return null;
+            }
+        });
 
         Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
         KillSession.kill(getCuratorRestContext().getClient().getZookeeperClient().getZooKeeper(), server.getConnectString());
@@ -285,40 +270,37 @@ public class TestLocks extends BaseClassForTests
         for ( int i = 0; i < THREAD_QTY; ++i )
         {
             final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
-            Future<Object> t = service.submit
-                (
-                    new Callable<Object>()
+            Future<Object> t = service.submit(new Callable<Object>()
+            {
+                @Override
+                public Object call() throws Exception
+                {
+                    semaphore.acquire();
+                    mutex.acquire();
+                    Assert.assertTrue(hasLock.compareAndSet(false, true));
+                    try
                     {
-                        @Override
-                        public Object call() throws Exception
+                        if ( isFirst.compareAndSet(true, false) )
                         {
-                            semaphore.acquire();
-                            mutex.acquire();
-                            Assert.assertTrue(hasLock.compareAndSet(false, true));
-                            try
+                            semaphore.release(THREAD_QTY - 1);
+                            while ( semaphore.availablePermits() > 0 )
                             {
-                                if ( isFirst.compareAndSet(true, false) )
-                                {
-                                    semaphore.release(THREAD_QTY - 1);
-                                    while ( semaphore.availablePermits() > 0 )
-                                    {
-                                        Thread.sleep(100);
-                                    }
-                                }
-                                else
-                                {
-                                    Thread.sleep(100);
-                                }
+                                Thread.sleep(100);
                             }
-                            finally
-                            {
-                                mutex.release();
-                                hasLock.set(false);
-                            }
-                            return null;
+                        }
+                        else
+                        {
+                            Thread.sleep(100);
                         }
                     }
-                );
+                    finally
+                    {
+                        mutex.release();
+                        hasLock.set(false);
+                    }
+                    return null;
+                }
+            });
             threads.add(t);
         }
 
@@ -329,46 +311,43 @@ public class TestLocks extends BaseClassForTests
     }
 
     @Test
-    public void     testBasicReadWriteLock() throws Exception
+    public void testBasicReadWriteLock() throws Exception
     {
-        final int               CONCURRENCY = 8;
-        final int               ITERATIONS = 100;
+        final int CONCURRENCY = 8;
+        final int ITERATIONS = 100;
 
         final Random random = new Random();
         final AtomicInteger concurrentCount = new AtomicInteger(0);
-        final AtomicInteger     maxConcurrentCount = new AtomicInteger(0);
-        final AtomicInteger     writeCount = new AtomicInteger(0);
-        final AtomicInteger     readCount = new AtomicInteger(0);
+        final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+        final AtomicInteger writeCount = new AtomicInteger(0);
+        final AtomicInteger readCount = new AtomicInteger(0);
 
-        List<Future<Void>>  futures = Lists.newArrayList();
-        ExecutorService     service = Executors.newCachedThreadPool();
+        List<Future<Void>> futures = Lists.newArrayList();
+        ExecutorService service = Executors.newCachedThreadPool();
         for ( int i = 0; i < CONCURRENCY; ++i )
         {
-            Future<Void>    future = service.submit
-                (
-                    new Callable<Void>()
+            Future<Void> future = service.submit(new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
+                    for ( int i = 0; i < ITERATIONS; ++i )
                     {
-                        @Override
-                        public Void call() throws Exception
+                        if ( random.nextInt(100) < 10 )
                         {
-                            InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
-                            for ( int i = 0; i < ITERATIONS; ++i )
-                            {
-                                if ( random.nextInt(100) < 10 )
-                                {
-                                    doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
-                                    writeCount.incrementAndGet();
-                                }
-                                else
-                                {
-                                    doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
-                                    readCount.incrementAndGet();
-                                }
-                            }
-                            return null;
+                            doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+                            writeCount.incrementAndGet();
+                        }
+                        else
+                        {
+                            doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+                            readCount.incrementAndGet();
                         }
                     }
-                );
+                    return null;
+                }
+            });
             futures.add(future);
         }
 
@@ -389,7 +368,7 @@ public class TestLocks extends BaseClassForTests
         try
         {
             Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
-            int     localConcurrentCount;
+            int localConcurrentCount;
             synchronized(this)
             {
                 localConcurrentCount = concurrentCount.incrementAndGet();