You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/07 13:39:32 UTC

git commit: more read/write tests

Repository: curator
Updated Branches:
  refs/heads/CURATOR-88 ae4813512 -> 203b49d90


more read/write tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/203b49d9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/203b49d9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/203b49d9

Branch: refs/heads/CURATOR-88
Commit: 203b49d905b34a7c3b17ab7553baea0c5cb1f63a
Parents: ae48135
Author: randgalt <ra...@apache.org>
Authored: Fri Mar 7 07:39:19 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Mar 7 07:39:19 2014 -0500

----------------------------------------------------------------------
 .../locks/TestInterProcessReadWriteLock.java    | 161 +++++++++++++++
 .../TestInterProcessReadWriteLockBase.java      | 207 +++++++------------
 .../TestInterProcessSemaphoreReadWrite.java     |  26 +--
 3 files changed, 243 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index 91e1ce0..49fbb55 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -20,9 +20,170 @@
 package org.apache.curator.framework.recipes.locks;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class TestInterProcessReadWriteLock extends TestInterProcessReadWriteLockBase
 {
+    @Test
+    public void testGetParticipantNodes() throws Exception
+    {
+        final int READERS = 20;
+        final int WRITERS = 8;
+
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
+            final CountDownLatch readLatch = new CountDownLatch(READERS);
+
+            ExecutorService service = Executors.newCachedThreadPool();
+            for ( int i = 0; i < READERS; ++i )
+            {
+                service.submit(new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+                        lock.readLock().acquire();
+                        latch.countDown();
+                        readLatch.countDown();
+                        return null;
+                    }
+                });
+            }
+            for ( int i = 0; i < WRITERS; ++i )
+            {
+                service.submit(new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+                        Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
+                        latch.countDown();  // must be before as there can only be one writer
+                        lock.writeLock().acquire();
+                        return null;
+                    }
+                });
+            }
+
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+            new Timing().sleepABit();
+
+            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+            Collection<String> readers = lock.readLock().getParticipantNodes();
+            Collection<String> writers = lock.writeLock().getParticipantNodes();
+
+            Assert.assertEquals(readers.size(), READERS);
+            Assert.assertEquals(writers.size(), WRITERS);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testThatUpgradingIsDisallowed() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+            lock.readLock().acquire();
+            Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
+
+            lock.readLock().release();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testThatDowngradingRespectsThreads() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+            ExecutorService t1 = Executors.newSingleThreadExecutor();
+            ExecutorService t2 = Executors.newSingleThreadExecutor();
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            Future<Object> f1 = t1.submit(new Callable<Object>()
+            {
+                @Override
+                public Object call() throws Exception
+                {
+                    lock.writeLock().acquire();
+                    latch.countDown();
+                    return null;
+                }
+            });
+
+            Future<Object> f2 = t2.submit(new Callable<Object>()
+            {
+                @Override
+                public Object call() throws Exception
+                {
+                    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+                    Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
+                    return null;
+                }
+            });
+
+            f1.get();
+            f2.get();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testDowngrading() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+            lock.writeLock().acquire();
+            Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
+            lock.writeLock().release();
+
+            lock.readLock().release();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
     @Override
     protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
index 81a6b14..0bd36d8 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
@@ -19,7 +19,6 @@
 
 package org.apache.curator.framework.recipes.locks;
 
-import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -28,11 +27,9 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import java.util.Collection;
-import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -42,150 +39,104 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTests
 {
     @Test
-    public void testGetParticipantNodes() throws Exception
+    public void testWriterAgainstConstantReaders() throws Exception
     {
-        final int READERS = 20;
-        final int WRITERS = 8;
+        final int CONCURRENCY = 8;
+        final int WRITER_ATTEMPTS = 10;
 
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-        try
+        ExecutorService service = Executors.newCachedThreadPool();
+        ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(service);
+        for ( int i = 0; i < CONCURRENCY; ++i )
         {
-            client.start();
-
-            final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
-            final CountDownLatch readLatch = new CountDownLatch(READERS);
-
-            ExecutorService service = Executors.newCachedThreadPool();
-            for ( int i = 0; i < READERS; ++i )
+            completionService.submit(new Callable<Void>()
             {
-                service.submit(new Callable<Void>()
+                @Override
+                public Void call() throws Exception
                 {
-                    @Override
-                    public Void call() throws Exception
+                    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+                    client.start();
+                    try
                     {
                         InterProcessReadWriteLockBase lock = newLock(client, "/lock");
-                        lock.readLock().acquire();
-                        latch.countDown();
-                        readLatch.countDown();
-                        return null;
+                        try
+                        {
+                            while ( !Thread.currentThread().isInterrupted() )
+                            {
+                                lock.readLock().acquire();
+                                try
+                                {
+                                    Thread.sleep(100);
+                                }
+                                finally
+                                {
+                                    lock.readLock().release();
+                                }
+                            }
+                        }
+                        catch ( InterruptedException dummy )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
                     }
-                });
-            }
-            for ( int i = 0; i < WRITERS; ++i )
-            {
-                service.submit(new Callable<Void>()
-                {
-                    @Override
-                    public Void call() throws Exception
+                    finally
                     {
-                        InterProcessReadWriteLockBase lock = newLock(client, "/lock");
-                        Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
-                        latch.countDown();  // must be before as there can only be one writer
-                        lock.writeLock().acquire();
-                        return null;
+                        CloseableUtils.closeQuietly(client);
                     }
-                });
-            }
-
-            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-            new Timing().sleepABit();
-
-            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
-            Collection<String> readers = lock.readLock().getParticipantNodes();
-            Collection<String> writers = lock.writeLock().getParticipantNodes();
-
-            Assert.assertEquals(readers.size(), READERS);
-            Assert.assertEquals(writers.size(), WRITERS);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
+                    return null;
+                }
+            });
         }
-    }
-
-    @Test
-    public void testThatUpgradingIsDisallowed() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-        try
-        {
-            client.start();
 
-            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
-            lock.readLock().acquire();
-            Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
+        new Timing().sleepABit();
 
-            lock.readLock().release();
-        }
-        finally
+        final AtomicInteger writerCount = new AtomicInteger();
+        Future<Void> writerThread = completionService.submit(new Callable<Void>()
         {
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testThatDowngradingRespectsThreads() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            final InterProcessReadWriteLockBase lock = newLock(client, "/lock");
-            ExecutorService t1 = Executors.newSingleThreadExecutor();
-            ExecutorService t2 = Executors.newSingleThreadExecutor();
-
-            final CountDownLatch latch = new CountDownLatch(1);
-
-            Future<Object> f1 = t1.submit(new Callable<Object>()
+            @Override
+            public Void call() throws Exception
             {
-                @Override
-                public Object call() throws Exception
+                CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+                client.start();
+                try
                 {
-                    lock.writeLock().acquire();
-                    latch.countDown();
-                    return null;
+                    InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+                    for ( int i = 0; i < WRITER_ATTEMPTS; ++i )
+                    {
+                        if ( !lock.writeLock().acquire(10, TimeUnit.SECONDS) )
+                        {
+                            throw new Exception("Could not get write lock");
+                        }
+                        try
+                        {
+                            writerCount.incrementAndGet();
+                            Thread.sleep(100);
+                        }
+                        finally
+                        {
+                            lock.writeLock().release();
+                        }
+                    }
                 }
-            });
-
-            Future<Object> f2 = t2.submit(new Callable<Object>()
-            {
-                @Override
-                public Object call() throws Exception
+                finally
                 {
-                    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-                    Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
-                    return null;
+                    CloseableUtils.closeQuietly(client);
                 }
-            });
-
-            f1.get();
-            f2.get();
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
-        }
-    }
+                return null;
+            }
+        });
 
-    @Test
-    public void testDowngrading() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        writerThread.get();
         try
         {
-            client.start();
-
-            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
-            lock.writeLock().acquire();
-            Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
-            lock.writeLock().release();
-
-            lock.readLock().release();
+            Assert.assertEquals(writerCount.get(), WRITER_ATTEMPTS);
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            service.shutdownNow();
+            for ( int i =0; i < CONCURRENCY; ++i )
+            {
+                completionService.take().get();
+            }
         }
     }
 
@@ -201,11 +152,11 @@ public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTest
         final AtomicInteger writeCount = new AtomicInteger(0);
         final AtomicInteger readCount = new AtomicInteger(0);
 
-        List<Future<Void>> futures = Lists.newArrayList();
         ExecutorService service = Executors.newCachedThreadPool();
+        ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(service);
         for ( int i = 0; i < CONCURRENCY; ++i )
         {
-            Future<Void> future = service.submit(new Callable<Void>()
+            completionService.submit(new Callable<Void>()
             {
                 @Override
                 public Void call() throws Exception
@@ -236,12 +187,10 @@ public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTest
                     return null;
                 }
             });
-            futures.add(future);
         }
-
-        for ( Future<Void> future : futures )
+        for ( int i =0; i < CONCURRENCY; ++i )
         {
-            future.get();
+            completionService.take().get();
         }
 
         System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get());

http://git-wip-us.apache.org/repos/asf/curator/blob/203b49d9/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
index 7faa198..469cc2c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
@@ -20,30 +20,12 @@
 package org.apache.curator.framework.recipes.locks;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.testng.annotations.Test;
 
-public class TestInterProcessSemaphoreReadWrite
+public class TestInterProcessSemaphoreReadWrite extends TestInterProcessReadWriteLockBase
 {
-    @Test
-    public void testBasic() throws Exception
+    @Override
+    protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
     {
-        TestInterProcessReadWriteLockBase base = new TestInterProcessReadWriteLockBase()
-        {
-            @Override
-            protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
-            {
-                return new InterProcessSemaphoreReadWrite(client, path);
-            }
-        };
-
-        base.setup();
-        try
-        {
-            base.testBasic();
-        }
-        finally
-        {
-            base.teardown();
-        }
+        return new InterProcessSemaphoreReadWrite(client, path);
     }
 }