You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/07 13:39:32 UTC
git commit: more read/write tests
Repository: curator
Updated Branches:
refs/heads/CURATOR-88 ae4813512 -> 203b49d90
more read/write tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/203b49d9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/203b49d9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/203b49d9
Branch: refs/heads/CURATOR-88
Commit: 203b49d905b34a7c3b17ab7553baea0c5cb1f63a
Parents: ae48135
Author: randgalt <ra...@apache.org>
Authored: Fri Mar 7 07:39:19 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Mar 7 07:39:19 2014 -0500
----------------------------------------------------------------------
.../locks/TestInterProcessReadWriteLock.java | 161 +++++++++++++++
.../TestInterProcessReadWriteLockBase.java | 207 +++++++------------
.../TestInterProcessSemaphoreReadWrite.java | 26 +--
3 files changed, 243 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index 91e1ce0..49fbb55 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -20,9 +20,170 @@
package org.apache.curator.framework.recipes.locks;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
public class TestInterProcessReadWriteLock extends TestInterProcessReadWriteLockBase
{
+ @Test
+ public void testGetParticipantNodes() throws Exception
+ {
+ final int READERS = 20;
+ final int WRITERS = 8;
+
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
+ final CountDownLatch readLatch = new CountDownLatch(READERS);
+
+ ExecutorService service = Executors.newCachedThreadPool();
+ for ( int i = 0; i < READERS; ++i )
+ {
+ service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ lock.readLock().acquire();
+ latch.countDown();
+ readLatch.countDown();
+ return null;
+ }
+ });
+ }
+ for ( int i = 0; i < WRITERS; ++i )
+ {
+ service.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
+ latch.countDown(); // must be before as there can only be one writer
+ lock.writeLock().acquire();
+ return null;
+ }
+ });
+ }
+
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ new Timing().sleepABit();
+
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ Collection<String> readers = lock.readLock().getParticipantNodes();
+ Collection<String> writers = lock.writeLock().getParticipantNodes();
+
+ Assert.assertEquals(readers.size(), READERS);
+ Assert.assertEquals(writers.size(), WRITERS);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testThatUpgradingIsDisallowed() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ lock.readLock().acquire();
+ Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
+
+ lock.readLock().release();
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testThatDowngradingRespectsThreads() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ ExecutorService t1 = Executors.newSingleThreadExecutor();
+ ExecutorService t2 = Executors.newSingleThreadExecutor();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Future<Object> f1 = t1.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ lock.writeLock().acquire();
+ latch.countDown();
+ return null;
+ }
+ });
+
+ Future<Object> f2 = t2.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
+ return null;
+ }
+ });
+
+ f1.get();
+ f2.get();
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testDowngrading() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ lock.writeLock().acquire();
+ Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
+ lock.writeLock().release();
+
+ lock.readLock().release();
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
@Override
protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
{
http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
index 81a6b14..0bd36d8 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
@@ -19,7 +19,6 @@
package org.apache.curator.framework.recipes.locks;
-import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -28,11 +27,9 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.Collection;
-import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -42,150 +39,104 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTests
{
@Test
- public void testGetParticipantNodes() throws Exception
+ public void testWriterAgainstConstantReaders() throws Exception
{
- final int READERS = 20;
- final int WRITERS = 8;
+ final int CONCURRENCY = 8;
+ final int WRITER_ATTEMPTS = 10;
- final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- try
+ ExecutorService service = Executors.newCachedThreadPool();
+ ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(service);
+ for ( int i = 0; i < CONCURRENCY; ++i )
{
- client.start();
-
- final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
- final CountDownLatch readLatch = new CountDownLatch(READERS);
-
- ExecutorService service = Executors.newCachedThreadPool();
- for ( int i = 0; i < READERS; ++i )
+ completionService.submit(new Callable<Void>()
{
- service.submit(new Callable<Void>()
+ @Override
+ public Void call() throws Exception
{
- @Override
- public Void call() throws Exception
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ client.start();
+ try
{
InterProcessReadWriteLockBase lock = newLock(client, "/lock");
- lock.readLock().acquire();
- latch.countDown();
- readLatch.countDown();
- return null;
+ try
+ {
+ while ( !Thread.currentThread().isInterrupted() )
+ {
+ lock.readLock().acquire();
+ try
+ {
+ Thread.sleep(100);
+ }
+ finally
+ {
+ lock.readLock().release();
+ }
+ }
+ }
+ catch ( InterruptedException dummy )
+ {
+ Thread.currentThread().interrupt();
+ }
}
- });
- }
- for ( int i = 0; i < WRITERS; ++i )
- {
- service.submit(new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
+ finally
{
- InterProcessReadWriteLockBase lock = newLock(client, "/lock");
- Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
- latch.countDown(); // must be before as there can only be one writer
- lock.writeLock().acquire();
- return null;
+ CloseableUtils.closeQuietly(client);
}
- });
- }
-
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
- new Timing().sleepABit();
-
- InterProcessReadWriteLockBase lock = newLock(client, "/lock");
- Collection<String> readers = lock.readLock().getParticipantNodes();
- Collection<String> writers = lock.writeLock().getParticipantNodes();
-
- Assert.assertEquals(readers.size(), READERS);
- Assert.assertEquals(writers.size(), WRITERS);
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
+ return null;
+ }
+ });
}
- }
-
- @Test
- public void testThatUpgradingIsDisallowed() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- try
- {
- client.start();
- InterProcessReadWriteLockBase lock = newLock(client, "/lock");
- lock.readLock().acquire();
- Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
+ new Timing().sleepABit();
- lock.readLock().release();
- }
- finally
+ final AtomicInteger writerCount = new AtomicInteger();
+ Future<Void> writerThread = completionService.submit(new Callable<Void>()
{
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testThatDowngradingRespectsThreads() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- try
- {
- client.start();
-
- final InterProcessReadWriteLockBase lock = newLock(client, "/lock");
- ExecutorService t1 = Executors.newSingleThreadExecutor();
- ExecutorService t2 = Executors.newSingleThreadExecutor();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- Future<Object> f1 = t1.submit(new Callable<Object>()
+ @Override
+ public Void call() throws Exception
{
- @Override
- public Object call() throws Exception
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ client.start();
+ try
{
- lock.writeLock().acquire();
- latch.countDown();
- return null;
+ InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+ for ( int i = 0; i < WRITER_ATTEMPTS; ++i )
+ {
+ if ( !lock.writeLock().acquire(10, TimeUnit.SECONDS) )
+ {
+ throw new Exception("Could not get write lock");
+ }
+ try
+ {
+ writerCount.incrementAndGet();
+ Thread.sleep(100);
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ }
}
- });
-
- Future<Object> f2 = t2.submit(new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ finally
{
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
- Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
- return null;
+ CloseableUtils.closeQuietly(client);
}
- });
-
- f1.get();
- f2.get();
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- }
+ return null;
+ }
+ });
- @Test
- public void testDowngrading() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ writerThread.get();
try
{
- client.start();
-
- InterProcessReadWriteLockBase lock = newLock(client, "/lock");
- lock.writeLock().acquire();
- Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
- lock.writeLock().release();
-
- lock.readLock().release();
+ Assert.assertEquals(writerCount.get(), WRITER_ATTEMPTS);
}
finally
{
- CloseableUtils.closeQuietly(client);
+ service.shutdownNow();
+ for ( int i =0; i < CONCURRENCY; ++i )
+ {
+ completionService.take().get();
+ }
}
}
@@ -201,11 +152,11 @@ public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTest
final AtomicInteger writeCount = new AtomicInteger(0);
final AtomicInteger readCount = new AtomicInteger(0);
- List<Future<Void>> futures = Lists.newArrayList();
ExecutorService service = Executors.newCachedThreadPool();
+ ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(service);
for ( int i = 0; i < CONCURRENCY; ++i )
{
- Future<Void> future = service.submit(new Callable<Void>()
+ completionService.submit(new Callable<Void>()
{
@Override
public Void call() throws Exception
@@ -236,12 +187,10 @@ public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTest
return null;
}
});
- futures.add(future);
}
-
- for ( Future<Void> future : futures )
+ for ( int i =0; i < CONCURRENCY; ++i )
{
- future.get();
+ completionService.take().get();
}
System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get());
http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
index 7faa198..469cc2c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
@@ -20,30 +20,12 @@
package org.apache.curator.framework.recipes.locks;
import org.apache.curator.framework.CuratorFramework;
-import org.testng.annotations.Test;
-public class TestInterProcessSemaphoreReadWrite
+public class TestInterProcessSemaphoreReadWrite extends TestInterProcessReadWriteLockBase
{
- @Test
- public void testBasic() throws Exception
+ @Override
+ protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
{
- TestInterProcessReadWriteLockBase base = new TestInterProcessReadWriteLockBase()
- {
- @Override
- protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
- {
- return new InterProcessSemaphoreReadWrite(client, path);
- }
- };
-
- base.setup();
- try
- {
- base.testBasic();
- }
- finally
- {
- base.teardown();
- }
+ return new InterProcessSemaphoreReadWrite(client, path);
}
}