You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/05 13:27:54 UTC
git commit: More testing - there are problems with r/w lock and
threading. More to come.
Repository: curator
Updated Branches:
refs/heads/CURATOR-88 6477a2f08 -> 79e55cf88
More testing - there are problems with r/w lock and threading. More to come.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/79e55cf8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/79e55cf8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/79e55cf8
Branch: refs/heads/CURATOR-88
Commit: 79e55cf882df32bc21490af7511f5f52f4566c24
Parents: 6477a2f
Author: randgalt <ra...@apache.org>
Authored: Wed Mar 5 07:27:47 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Mar 5 07:27:47 2014 -0500
----------------------------------------------------------------------
.../apache/curator/x/rest/api/TestLocks.java | 100 +++++++++++++-
.../x/rest/support/InterProcessLockBridge.java | 8 +-
.../InterProcessReadWriteLockBridge.java | 132 +++++++++++++++++++
3 files changed, 230 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 7aa4585..168329e 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -28,10 +28,12 @@ import org.apache.curator.x.rest.entities.Status;
import org.apache.curator.x.rest.entities.StatusMessage;
import org.apache.curator.x.rest.support.BaseClassForTests;
import org.apache.curator.x.rest.support.InterProcessLockBridge;
+import org.apache.curator.x.rest.support.InterProcessReadWriteLockBridge;
import org.apache.curator.x.rest.support.StatusListener;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
@@ -41,6 +43,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class TestLocks extends BaseClassForTests
@@ -48,8 +51,8 @@ public class TestLocks extends BaseClassForTests
@Test
public void test2Clients() throws Exception
{
- final InterProcessLock mutexForClient1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
- final InterProcessLock mutexForClient2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+ final InterProcessLock mutexForClient1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
+ final InterProcessLock mutexForClient2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
final CountDownLatch latchForClient1 = new CountDownLatch(1);
final CountDownLatch latchForClient2 = new CountDownLatch(1);
@@ -187,7 +190,7 @@ public class TestLocks extends BaseClassForTests
@Override
public Object call() throws Exception
{
- InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+ InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
lock.acquire();
try
{
@@ -228,8 +231,8 @@ public class TestLocks extends BaseClassForTests
{
final Timing timing = new Timing();
- final InterProcessLock mutex1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
- final InterProcessLock mutex2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+ final InterProcessLock mutex1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
+ final InterProcessLock mutex2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
final Semaphore semaphore = new Semaphore(0);
ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
@@ -281,7 +284,7 @@ public class TestLocks extends BaseClassForTests
ExecutorService service = Executors.newCachedThreadPool();
for ( int i = 0; i < THREAD_QTY; ++i )
{
- final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+ final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
Future<Object> t = service.submit
(
new Callable<Object>()
@@ -324,4 +327,89 @@ public class TestLocks extends BaseClassForTests
t.get();
}
}
+
+ @Test
+ public void testBasicReadWriteLock() throws Exception
+ {
+ final int CONCURRENCY = 8;
+ final int ITERATIONS = 100;
+
+ final Random random = new Random();
+ final AtomicInteger concurrentCount = new AtomicInteger(0);
+ final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+ final AtomicInteger writeCount = new AtomicInteger(0);
+ final AtomicInteger readCount = new AtomicInteger(0);
+
+ List<Future<Void>> futures = Lists.newArrayList();
+ ExecutorService service = Executors.newCachedThreadPool();
+ for ( int i = 0; i < CONCURRENCY; ++i )
+ {
+ Future<Void> future = service.submit
+ (
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
+ for ( int i = 0; i < ITERATIONS; ++i )
+ {
+ if ( random.nextInt(100) < 10 )
+ {
+ doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+ writeCount.incrementAndGet();
+ }
+ else
+ {
+ doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+ readCount.incrementAndGet();
+ }
+ }
+ return null;
+ }
+ }
+ );
+ futures.add(future);
+ }
+
+ for ( Future<Void> future : futures )
+ {
+ future.get();
+ }
+
+ System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get());
+
+ Assert.assertTrue(writeCount.get() > 0);
+ Assert.assertTrue(readCount.get() > 0);
+ Assert.assertTrue(maxConcurrentCount.get() > 1);
+ }
+
+ private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, AtomicInteger maxConcurrentCount, Random random, int maxAllowed) throws Exception
+ {
+ try
+ {
+ Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
+ int localConcurrentCount;
+ synchronized(this)
+ {
+ localConcurrentCount = concurrentCount.incrementAndGet();
+ if ( localConcurrentCount > maxConcurrentCount.get() )
+ {
+ maxConcurrentCount.set(localConcurrentCount);
+ }
+ }
+
+ Assert.assertTrue(localConcurrentCount <= maxAllowed, "" + localConcurrentCount);
+
+ Thread.sleep(random.nextInt(9) + 1);
+ }
+ finally
+ {
+ synchronized(this)
+ {
+ concurrentCount.decrementAndGet();
+ lock.release();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
index 15a7a5c..1da27ba 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
@@ -36,16 +36,16 @@ public class InterProcessLockBridge implements InterProcessLock
private final Client restClient;
private final SessionManager sessionManager;
private final UriMaker uriMaker;
+ private final String path;
private volatile String id = null;
- private static final String PATH = "/lock";
-
- public InterProcessLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker)
+ public InterProcessLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker, String path)
{
this.restClient = restClient;
this.sessionManager = sessionManager;
this.uriMaker = uriMaker;
+ this.path = path;
}
@Override
@@ -61,7 +61,7 @@ public class InterProcessLockBridge implements InterProcessLock
URI uri = uriMaker.getMethodUri(LockResource.class, null);
LockSpec lockSpec = new LockSpec();
- lockSpec.setPath(PATH);
+ lockSpec.setPath(path);
lockSpec.setMaxWaitMs((int)unit.toMillis(time));
try
{
http://git-wip-us.apache.org/repos/asf/curator/blob/79e55cf8/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
new file mode 100644
index 0000000..ea21f98
--- /dev/null
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.support;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.x.rest.api.ReadWriteLockResource;
+import org.apache.curator.x.rest.entities.Id;
+import org.apache.curator.x.rest.entities.LockSpec;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+public class InterProcessReadWriteLockBridge
+{
+ private final Client restClient;
+ private final SessionManager sessionManager;
+ private final UriMaker uriMaker;
+ private final String path;
+
+ public InterProcessReadWriteLockBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker, String path)
+ {
+
+ this.restClient = restClient;
+ this.sessionManager = sessionManager;
+ this.uriMaker = uriMaker;
+ this.path = path;
+ }
+
+ public InterProcessLock writeLock()
+ {
+ return new InternalLock(true);
+ }
+
+ public InterProcessLock readLock()
+ {
+ return new InternalLock(false);
+ }
+
+ private class InternalLock implements InterProcessLock
+ {
+ private final boolean writeLock;
+ private final ThreadLocal<String> id = new ThreadLocal<String>();
+
+ public InternalLock(boolean writeLock)
+ {
+ this.writeLock = writeLock;
+ }
+
+ @Override
+ public void acquire() throws Exception
+ {
+ if ( !acquire(Integer.MAX_VALUE, TimeUnit.MILLISECONDS) )
+ {
+ throw new Exception("Could not acquire");
+ }
+ }
+
+ @Override
+ public boolean acquire(long time, TimeUnit unit) throws Exception
+ {
+ if ( id.get() != null )
+ {
+ throw new Exception("Already acquired in this thread");
+ }
+
+ URI uri = uriMaker.getMethodUri(ReadWriteLockResource.class, writeLock ? "acquireWriteLock" : "acquireReadLock");
+ LockSpec lockSpec = new LockSpec();
+ lockSpec.setPath(path);
+ lockSpec.setMaxWaitMs((int)unit.toMillis(time));
+
+ String id;
+ try
+ {
+ id = restClient.resource(uri).type(MediaType.APPLICATION_JSON_TYPE).post(Id.class, lockSpec).getId();
+ }
+ catch ( UniformInterfaceException e )
+ {
+ if ( e.getResponse().getStatus() == Response.Status.SERVICE_UNAVAILABLE.getStatusCode() )
+ {
+ return false;
+ }
+ throw e;
+ }
+
+ this.id.set(id);
+ sessionManager.addEntry(uriMaker.getLocalhost(), id, null);
+ return true;
+ }
+
+ @Override
+ public void release() throws Exception
+ {
+ String localId = id.get();
+ if ( localId == null )
+ {
+ throw new Exception("Not acquired in this thread");
+ }
+
+ URI uri = uriMaker.getMethodUri(ReadWriteLockResource.class, null);
+ restClient.resource(uri).path(localId).delete();
+
+ sessionManager.removeEntry(uriMaker.getLocalhost(), localId);
+ id.set(null);
+ }
+
+ @Override
+ public boolean isAcquiredInThisProcess()
+ {
+ return id.get() != null;
+ }
+ }
+}