You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/05 13:27:54 UTC

git commit: More testing - there are problems with r/w lock and threading. More to come.

Repository: curator
Updated Branches:
  refs/heads/CURATOR-88 6477a2f08 -> 79e55cf88


More testing - there are problems with r/w lock and threading. More to come.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/79e55cf8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/79e55cf8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/79e55cf8

Branch: refs/heads/CURATOR-88
Commit: 79e55cf882df32bc21490af7511f5f52f4566c24
Parents: 6477a2f
Author: randgalt <ra...@apache.org>
Authored: Wed Mar 5 07:27:47 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Mar 5 07:27:47 2014 -0500

----------------------------------------------------------------------
 .../apache/curator/x/rest/api/TestLocks.java    | 100 +++++++++++++-
 .../x/rest/support/InterProcessLockBridge.java  |   8 +-
 .../InterProcessReadWriteLockBridge.java        | 132 +++++++++++++++++++
 3 files changed, 230 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 7aa4585..168329e 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -28,10 +28,12 @@ import org.apache.curator.x.rest.entities.Status;
 import org.apache.curator.x.rest.entities.StatusMessage;
 import org.apache.curator.x.rest.support.BaseClassForTests;
 import org.apache.curator.x.rest.support.InterProcessLockBridge;
+import org.apache.curator.x.rest.support.InterProcessReadWriteLockBridge;
 import org.apache.curator.x.rest.support.StatusListener;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorCompletionService;
@@ -41,6 +43,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class TestLocks extends BaseClassForTests
@@ -48,8 +51,8 @@ public class TestLocks extends BaseClassForTests
     @Test
     public void test2Clients() throws Exception
     {
-        final InterProcessLock mutexForClient1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
-        final InterProcessLock mutexForClient2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+        final InterProcessLock mutexForClient1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
+        final InterProcessLock mutexForClient2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
 
         final CountDownLatch latchForClient1 = new CountDownLatch(1);
         final CountDownLatch latchForClient2 = new CountDownLatch(1);
@@ -187,7 +190,7 @@ public class TestLocks extends BaseClassForTests
                         @Override
                         public Object call() throws Exception
                         {
-                            InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+                            InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
                             lock.acquire();
                             try
                             {
@@ -228,8 +231,8 @@ public class TestLocks extends BaseClassForTests
     {
         final Timing timing = new Timing();
 
-        final InterProcessLock mutex1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
-        final InterProcessLock mutex2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+        final InterProcessLock mutex1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
+        final InterProcessLock mutex2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
 
         final Semaphore semaphore = new Semaphore(0);
         ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
@@ -281,7 +284,7 @@ public class TestLocks extends BaseClassForTests
         ExecutorService service = Executors.newCachedThreadPool();
         for ( int i = 0; i < THREAD_QTY; ++i )
         {
-            final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+            final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
             Future<Object> t = service.submit
                 (
                     new Callable<Object>()
@@ -324,4 +327,89 @@ public class TestLocks extends BaseClassForTests
             t.get();
         }
     }
+
+    @Test
+    public void     testBasicReadWriteLock() throws Exception
+    {
+        final int               CONCURRENCY = 8;
+        final int               ITERATIONS = 100;
+
+        final Random random = new Random();
+        final AtomicInteger concurrentCount = new AtomicInteger(0);
+        final AtomicInteger     maxConcurrentCount = new AtomicInteger(0);
+        final AtomicInteger     writeCount = new AtomicInteger(0);
+        final AtomicInteger     readCount = new AtomicInteger(0);
+
+        List<Future<Void>>  futures = Lists.newArrayList();
+        ExecutorService     service = Executors.newCachedThreadPool();
+        for ( int i = 0; i < CONCURRENCY; ++i )
+        {
+            Future<Void>    future = service.submit
+                (
+                    new Callable<Void>()
+                    {
+                        @Override
+                        public Void call() throws Exception
+                        {
+                            InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
+                            for ( int i = 0; i < ITERATIONS; ++i )
+                            {
+                                if ( random.nextInt(100) < 10 )
+                                {
+                                    doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+                                    writeCount.incrementAndGet();
+                                }
+                                else
+                                {
+                                    doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+                                    readCount.incrementAndGet();
+                                }
+                            }
+                            return null;
+                        }
+                    }
+                );
+            futures.add(future);
+        }
+
+        for ( Future<Void> future : futures )
+        {
+            future.get();
+        }
+
+        System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get());
+
+        Assert.assertTrue(writeCount.get() > 0);
+        Assert.assertTrue(readCount.get() > 0);
+        Assert.assertTrue(maxConcurrentCount.get() > 1);
+    }
+
+    private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, AtomicInteger maxConcurrentCount, Random random, int maxAllowed) throws Exception
+    {
+        try
+        {
+            Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
+            int     localConcurrentCount;
+            synchronized(this)
+            {
+                localConcurrentCount = concurrentCount.incrementAndGet();
+                if ( localConcurrentCount > maxConcurrentCount.get() )
+                {
+                    maxConcurrentCount.set(localConcurrentCount);
+                }
+            }
+
+            Assert.assertTrue(localConcurrentCount <= maxAllowed, "" + localConcurrentCount);
+
+            Thread.sleep(random.nextInt(9) + 1);
+        }
+        finally
+        {
+            synchronized(this)
+            {
+                concurrentCount.decrementAndGet();
+                lock.release();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
index 15a7a5c..1da27ba 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
@@ -36,16 +36,16 @@ public class InterProcessLockBridge implements InterProcessLock
     private final Client restClient;
     private final SessionManager sessionManager;
     private final UriMaker uriMaker;
+    private final String path;
 
     private volatile String id = null;
 
-    private static final String PATH = "/lock";
-
-    public InterProcessLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker)
+    public InterProcessLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker, String path)
     {
         this.restClient = restClient;
         this.sessionManager = sessionManager;
         this.uriMaker = uriMaker;
+        this.path = path;
     }
 
     @Override
@@ -61,7 +61,7 @@ public class InterProcessLockBridge implements InterProcessLock
 
         URI uri = uriMaker.getMethodUri(LockResource.class, null);
         LockSpec lockSpec = new LockSpec();
-        lockSpec.setPath(PATH);
+        lockSpec.setPath(path);
         lockSpec.setMaxWaitMs((int)unit.toMillis(time));
         try
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
new file mode 100644
index 0000000..ea21f98
--- /dev/null
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.support;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.x.rest.api.ReadWriteLockResource;
+import org.apache.curator.x.rest.entities.Id;
+import org.apache.curator.x.rest.entities.LockSpec;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+public class InterProcessReadWriteLockBridge
+{
+    private final Client restClient;
+    private final SessionManager sessionManager;
+    private final UriMaker uriMaker;
+    private final String path;
+
+    public InterProcessReadWriteLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker, String path)
+    {
+
+        this.restClient = restClient;
+        this.sessionManager = sessionManager;
+        this.uriMaker = uriMaker;
+        this.path = path;
+    }
+
+    public InterProcessLock writeLock()
+    {
+        return new InternalLock(true);
+    }
+
+    public InterProcessLock readLock()
+    {
+        return new InternalLock(false);
+    }
+
+    private class InternalLock implements InterProcessLock
+    {
+        private final boolean writeLock;
+        private final ThreadLocal<String> id = new ThreadLocal<String>();
+
+        public InternalLock(boolean writeLock)
+        {
+            this.writeLock = writeLock;
+        }
+
+        @Override
+        public void acquire() throws Exception
+        {
+            if ( !acquire(Integer.MAX_VALUE, TimeUnit.MILLISECONDS) )
+            {
+                throw new Exception("Could not acquire");
+            }
+        }
+
+        @Override
+        public boolean acquire(long time, TimeUnit unit) throws Exception
+        {
+            if ( id.get() != null )
+            {
+                throw new Exception("Already acquired in this thread");
+            }
+
+            URI uri = uriMaker.getMethodUri(ReadWriteLockResource.class, writeLock ? "acquireWriteLock" : "acquireReadLock");
+            LockSpec lockSpec = new LockSpec();
+            lockSpec.setPath(path);
+            lockSpec.setMaxWaitMs((int)unit.toMillis(time));
+
+            String id;
+            try
+            {
+                id = restClient.resource(uri).type(MediaType.APPLICATION_JSON_TYPE).post(Id.class, lockSpec).getId();
+            }
+            catch ( UniformInterfaceException e )
+            {
+                if ( e.getResponse().getStatus() == Response.Status.SERVICE_UNAVAILABLE.getStatusCode() )
+                {
+                    return false;
+                }
+                throw e;
+            }
+
+            this.id.set(id);
+            sessionManager.addEntry(uriMaker.getLocalhost(), id, null);
+            return true;
+        }
+
+        @Override
+        public void release() throws Exception
+        {
+            String localId = id.get();
+            if ( localId == null )
+            {
+                throw new Exception("Not acquired in this thread");
+            }
+
+            URI uri = uriMaker.getMethodUri(ReadWriteLockResource.class, null);
+            restClient.resource(uri).path(localId).delete();
+
+            sessionManager.removeEntry(uriMaker.getLocalhost(), localId);
+            id.set(null);
+        }
+
+        @Override
+        public boolean isAcquiredInThisProcess()
+        {
+            return id.get() != null;
+        }
+    }
+}