You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/07 01:59:30 UTC

[1/2] git commit: work continues

Repository: curator
Updated Branches:
  refs/heads/CURATOR-88 79e55cf88 -> ae4813512


work continues


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9f757b33
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9f757b33
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9f757b33

Branch: refs/heads/CURATOR-88
Commit: 9f757b337f8f9eb17bb10316b5baeccce62f6789
Parents: 79e55cf
Author: randgalt <ra...@apache.org>
Authored: Thu Mar 6 19:56:01 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Mar 6 19:56:01 2014 -0500

----------------------------------------------------------------------
 .../recipes/locks/InterProcessLock.java         |  11 +-
 .../recipes/locks/InterProcessMultiLock.java    |  15 +
 .../locks/InterProcessReadWriteLock.java        |   4 +-
 .../locks/InterProcessReadWriteLockBase.java    |  37 +++
 .../locks/InterProcessSemaphoreMutex.java       |   7 +
 .../locks/InterProcessSemaphoreReadWrite.java   | 208 +++++++++++++
 .../recipes/locks/InterProcessSemaphoreV2.java  |  37 ++-
 .../locks/TestInterProcessMultiMutex.java       |  13 +
 .../locks/TestInterProcessReadWriteLock.java    | 271 +----------------
 .../TestInterProcessReadWriteLockBase.java      | 289 +++++++++++++++++++
 .../TestInterProcessSemaphoreReadWrite.java     |  49 ++++
 .../x/rest/support/InterProcessLockBridge.java  |   7 +
 .../InterProcessReadWriteLockBridge.java        |   7 +
 13 files changed, 678 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java
index 7192eae..63b7b14 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessLock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.locks;
 
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public interface InterProcessLock
@@ -56,5 +57,13 @@ public interface InterProcessLock
      *
      * @return true/false
      */
-    boolean isAcquiredInThisProcess();
+    public boolean isAcquiredInThisProcess();
+
+    /**
+     * Return the nodes currently participating in the lock. NOTE(review): ordering is not
+     * enforced by the Collection return type - confirm what each implementation returns.
+     * @return participant node names
+     * @throws Exception ZK errors, interruptions, etc.
+     */
+    public Collection<String> getParticipantNodes() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java
index ca06b34..75ba3ae 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessMultiLock.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -173,6 +174,20 @@ public class InterProcessMultiLock implements InterProcessLock
     }
 
     @Override
+    public Collection<String> getParticipantNodes() throws Exception
+    {
+        List<String> participants = Lists.newArrayList();   // per-lock results appended in 'locks' iteration order, no re-sorting
+        for ( InterProcessLock lock : locks )
+        {
+            if ( !lock.isAcquiredInThisProcess() )   // NOTE(review): only locks NOT held by this process contribute - confirm the negation is intended
+            {
+                participants.addAll(lock.getParticipantNodes());
+            }
+        }
+        return participants;
+    }
+
+    @Override
     public synchronized boolean isAcquiredInThisProcess()
     {
         // it's subjective what the correct meaning is here - I choose to return true

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
index fafde5f..4476e6f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
@@ -51,7 +51,7 @@ import java.util.List;
  *    lock to the write lock is not possible.
  * </p>
  */
-public class InterProcessReadWriteLock
+public class InterProcessReadWriteLock implements InterProcessReadWriteLockBase
 {
     private final InterProcessMutex readMutex;
     private final InterProcessMutex writeMutex;
@@ -145,6 +145,7 @@ public class InterProcessReadWriteLock
      *
      * @return read lock
      */
+    @Override
     public InterProcessMutex     readLock()
     {
         return readMutex;
@@ -155,6 +156,7 @@ public class InterProcessReadWriteLock
      *
      * @return write lock
      */
+    @Override
     public InterProcessMutex     writeLock()
     {
         return writeMutex;

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLockBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLockBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLockBase.java
new file mode 100644
index 0000000..47025d6
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLockBase.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.locks;
+
+public interface InterProcessReadWriteLockBase
+{
+    /**
+     * Returns the lock used for reading.
+     * Implementations may declare a more specific return type.
+     * @return read lock
+     */
+    InterProcessLock     readLock();
+
+    /**
+     * Returns the lock used for writing.
+     * Implementations may declare a more specific return type.
+     * @return write lock
+     */
+    InterProcessLock     writeLock();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
index 88b5f5d..ca33b5d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -78,4 +79,10 @@ public class InterProcessSemaphoreMutex implements InterProcessLock
     {
         return (lease != null);
     }
+
+    @Override
+    public Collection<String> getParticipantNodes() throws Exception
+    {
+        return semaphore.getParticipantNodes();
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreReadWrite.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreReadWrite.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreReadWrite.java
new file mode 100644
index 0000000..26912c0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreReadWrite.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.locks;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.curator.framework.CuratorFramework;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class InterProcessSemaphoreReadWrite implements InterProcessReadWriteLockBase
+{
+    private final InterProcessSemaphoreV2 lock;
+    private final AtomicReference<Lease> writeLease = new AtomicReference<Lease>();
+    private final List<Lease> readLeases = new CopyOnWriteArrayList<Lease>();
+
+    private static final LockInternalsSorter sorter = new LockInternalsSorter()
+    {
+        @Override
+        public String fixForSorting(String str, String lockName)
+        {
+            return StandardLockInternalsDriver.standardFixForSorting(str, lockName);
+        }
+    };
+
+    private final InterProcessLock readLock = new InterProcessLock()   // read side: every call delegates with isWriter == false
+    {
+        @Override
+        public void acquire() throws Exception
+        {
+            internalAcquire(-1, null, false);   // null unit: no wait limit is passed on (InterProcessSemaphoreV2 derives hasWait from unit != null); result is discarded
+        }
+        // Timed variant: returns whatever internalAcquire() reports for a read request.
+        @Override
+        public boolean acquire(long time, TimeUnit unit) throws Exception
+        {
+            return internalAcquire(time, unit, false);
+        }
+        // Hands one tracked read lease back to the semaphore via internalRelease().
+        @Override
+        public void release() throws Exception
+        {
+            internalRelease(false);
+        }
+        // Filtered view of the semaphore's participant nodes: only names containing READ_BASE_NAME.
+        @Override
+        public Collection<String> getParticipantNodes() throws Exception
+        {
+            return Collections2.filter(lock.getParticipantNodes(), new Predicate<String>()
+            {
+                @Override
+                public boolean apply(String name)
+                {
+                    return name.contains(READ_BASE_NAME);
+                }
+            });
+        }
+        // True while this InterProcessSemaphoreReadWrite instance tracks at least one read lease.
+        @Override
+        public boolean isAcquiredInThisProcess()
+        {
+            return readLeases.size() > 0;
+        }
+    };
+    private final InterProcessLock writeLock = new InterProcessLock()   // write side: every call delegates with isWriter == true
+    {
+        @Override
+        public void acquire() throws Exception
+        {
+            internalAcquire(-1, null, true);   // null unit: no wait limit is passed on (InterProcessSemaphoreV2 derives hasWait from unit != null); result is discarded
+        }
+        // Timed variant: returns whatever internalAcquire() reports for a write request.
+        @Override
+        public boolean acquire(long time, TimeUnit unit) throws Exception
+        {
+            return internalAcquire(time, unit, true);
+        }
+        // Hands the tracked write lease back to the semaphore via internalRelease().
+        @Override
+        public void release() throws Exception
+        {
+            internalRelease(true);
+        }
+        // Filtered view of the semaphore's participant nodes: only names containing WRITE_BASE_NAME.
+        @Override
+        public Collection<String> getParticipantNodes() throws Exception
+        {
+            return Collections2.filter(lock.getParticipantNodes(), new Predicate<String>()
+            {
+                @Override
+                public boolean apply(String name)
+                {
+                    return name.contains(WRITE_BASE_NAME);
+                }
+            });
+        }
+        // True while this InterProcessSemaphoreReadWrite instance holds a write lease reference.
+        @Override
+        public boolean isAcquiredInThisProcess()
+        {
+            return writeLease.get() != null;
+        }
+    };
+
+    private static final String SUFFIX = "-lease-";
+    private static final String READ_BASE_NAME = "read" + SUFFIX;
+    private static final String WRITE_BASE_NAME = "write" + SUFFIX;
+
+    public InterProcessSemaphoreReadWrite(CuratorFramework client, String path)
+    {
+        lock = new InterProcessSemaphoreV2(client, path, Integer.MAX_VALUE);   // lease cap set to Integer.MAX_VALUE; this class's acquire predicate ignores maxLeases and uses internalShouldGetLease()
+    }
+
+    @Override
+    public InterProcessLock     readLock()
+    {
+        return readLock;
+    }
+
+    @Override
+    public InterProcessLock     writeLock()
+    {
+        return writeLock;
+    }
+
+    private boolean internalShouldGetLease(String ourNodeName, List<String> children, boolean isWriter)
+    {
+        Preconditions.checkArgument(children.size() > 0, "Empty children list");
+        // Admission is decided solely by the node at the head of the sorted lease list.
+        children = LockInternals.getSortedChildren(SUFFIX, sorter, children);
+        String firstNodeName = children.get(0);
+        // Writer: granted only when its own node is the head of the list.
+        if ( isWriter )
+        {
+            return firstNodeName.equals(ourNodeName);
+        }
+        // Reader: granted unless the head node is a write lease (its name contains WRITE_BASE_NAME).
+        return !firstNodeName.contains(WRITE_BASE_NAME);
+    }
+
+    private boolean internalAcquire(long time, TimeUnit unit, final boolean isWriter) throws Exception
+    {
+        Preconditions.checkState(!isWriter || (writeLease.get() == null), "Write lock already held by this InterProcessSemaphoreReadWrite");
+        // The predicate below ignores maxLeases; admission is delegated to internalShouldGetLease().
+        InterProcessSemaphoreV2.LeaseAcquirePredicate acquireFilter = new InterProcessSemaphoreV2.LeaseAcquirePredicate()
+        {
+            @Override
+            public boolean shouldGetLease(String ourNodeName, List<String> children, int maxLeases)
+            {
+                return internalShouldGetLease(ourNodeName, children, isWriter);
+            }
+        };
+        Collection<Lease> leases = lock.internalAcquire(1, time, unit, isWriter ? WRITE_BASE_NAME : READ_BASE_NAME, acquireFilter);
+        if ( (leases == null) || leases.isEmpty() )   // the semaphore can return null (its timed acquire() null-checks this same call): report failure instead of throwing NullPointerException
+        {
+            return false;
+        }
+        // Exactly one lease was requested, so the first element is ours; remember it for the matching release.
+        Lease lease = leases.iterator().next();
+        if ( isWriter )
+        {
+            writeLease.set(lease);
+        }
+        else
+        {
+            readLeases.add(lease);
+        }
+
+        return true;
+    }
+
+    private void internalRelease(boolean isWriter)
+    {
+        Lease lease;
+        if ( isWriter )
+        {
+            lease = writeLease.getAndSet(null);   // claim the lease atomically so two concurrent release() calls cannot both pass the check
+            Preconditions.checkState(lease != null, "Write lock not held by this InterProcessSemaphoreReadWrite");
+        }
+        else
+        {
+            Preconditions.checkState(readLeases.size() > 0, "A read lock is not held by this InterProcessSemaphoreReadWrite");
+            lease = readLeases.remove(0);   // oldest tracked read lease; NOTE(review): the size check and remove(0) are not one atomic step
+        }
+        lock.returnLease(lease);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index fd27cbb..005727f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -96,6 +96,15 @@ public class InterProcessSemaphoreV2
     private static final String LEASE_PARENT = "leases";
     private static final String LEASE_BASE_NAME = "lease-";
 
+    private static final LeaseAcquirePredicate defaultLeaseAcquirePredicate = new LeaseAcquirePredicate()
+    {
+        @Override
+        public boolean shouldGetLease(String ourNodeName, List<String> children, int maxLeases)
+        {
+            return children.size() <= maxLeases;   // same test the acquire loop applied inline before it accepted a predicate
+        }
+    };
+
     /**
      * @param client    the client
      * @param path      path for the semaphore
@@ -203,7 +212,7 @@ public class InterProcessSemaphoreV2
      */
     public Lease acquire() throws Exception
     {
-        Collection<Lease> leases = acquire(1, 0, null);
+        Collection<Lease> leases = internalAcquire(1, 0, null, LEASE_BASE_NAME, defaultLeaseAcquirePredicate);
         return leases.iterator().next();
     }
 
@@ -221,7 +230,7 @@ public class InterProcessSemaphoreV2
      */
     public Collection<Lease> acquire(int qty) throws Exception
     {
-        return acquire(qty, 0, null);
+        return internalAcquire(qty, 0, null, LEASE_BASE_NAME, defaultLeaseAcquirePredicate);
     }
 
     /**
@@ -239,7 +248,7 @@ public class InterProcessSemaphoreV2
      */
     public Lease acquire(long time, TimeUnit unit) throws Exception
     {
-        Collection<Lease> leases = acquire(1, time, unit);
+        Collection<Lease> leases = internalAcquire(1, time, unit, LEASE_BASE_NAME, defaultLeaseAcquirePredicate);
         return (leases != null) ? leases.iterator().next() : null;
     }
 
@@ -261,6 +270,16 @@ public class InterProcessSemaphoreV2
      */
     public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
     {
+        return internalAcquire(qty, time, unit, LEASE_BASE_NAME, defaultLeaseAcquirePredicate);
+    }
+
+    interface LeaseAcquirePredicate
+    {
+        boolean shouldGetLease(String ourNodeName, List<String> children, int maxLeases);   // asked after each getChildren() in the acquire loop; returning true breaks out of the wait loop
+    }
+
+    protected Collection<Lease> internalAcquire(int qty, long time, TimeUnit unit, String leaseBaseName, LeaseAcquirePredicate acquireFilter) throws Exception
+    {
         long startMs = System.currentTimeMillis();
         boolean hasWait = (unit != null);
         long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
@@ -278,7 +297,7 @@ public class InterProcessSemaphoreV2
                 boolean isDone = false;
                 while ( !isDone )
                 {
-                    switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
+                    switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs, leaseBaseName, acquireFilter) )
                     {
                         case CONTINUE:
                         {
@@ -325,7 +344,7 @@ public class InterProcessSemaphoreV2
         RETRY_DUE_TO_MISSING_NODE
     }
 
-    private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
+    private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs, String leaseBaseName, LeaseAcquirePredicate acquireFilter) throws Exception
     {
         if ( client.getState() != CuratorFrameworkState.STARTED )
         {
@@ -347,8 +366,8 @@ public class InterProcessSemaphoreV2
         try
         {
             PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
-            String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
-            String nodeName = ZKPaths.getNodeFromPath(path);
+            String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, leaseBaseName), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, leaseBaseName));
+            String ourNodeName = ZKPaths.getNodeFromPath(path);
             builder.add(makeLease(path));
 
             synchronized(this)
@@ -356,13 +375,13 @@ public class InterProcessSemaphoreV2
                 for(;;)
                 {
                     List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
-                    if ( !children.contains(nodeName) )
+                    if ( !children.contains(ourNodeName) )
                     {
                         log.error("Sequential path not found: " + path);
                         return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                     }
 
-                    if ( children.size() <= maxLeases )
+                    if ( acquireFilter.shouldGetLease(ourNodeName, children, maxLeases) )
                     {
                         break;
                     }

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
index b1631a0..b6a9f42 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
@@ -25,6 +25,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -69,6 +70,12 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
                 }
 
                 @Override
+                public Collection<String> getParticipantNodes() throws Exception
+                {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
                 public boolean isAcquiredInThisProcess()
                 {
                     return otherGoodLock.isAcquiredInThisProcess();
@@ -116,6 +123,12 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
                 }
 
                 @Override
+                public Collection<String> getParticipantNodes() throws Exception
+                {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
                 public boolean acquire(long time, TimeUnit unit) throws Exception
                 {
                     throw new Exception("foo");

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index 5b44c7c..91e1ce0 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -16,277 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.locks;
 
-import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.BaseClassForTests;
-import org.apache.curator.retry.RetryOneTime;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
-public class TestInterProcessReadWriteLock extends BaseClassForTests
+public class TestInterProcessReadWriteLock extends TestInterProcessReadWriteLockBase
 {
-    @Test
-    public void     testGetParticipantNodes() throws Exception
-    {
-        final int               READERS = 20;
-        final int               WRITERS = 8;
-
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            final CountDownLatch              latch = new CountDownLatch(READERS + WRITERS);
-            final CountDownLatch              readLatch = new CountDownLatch(READERS);
-            final InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-
-            ExecutorService                   service = Executors.newCachedThreadPool();
-            for ( int i = 0; i < READERS; ++i )
-            {
-                service.submit
-                (
-                    new Callable<Void>()
-                    {
-                        @Override
-                        public Void call() throws Exception
-                        {
-                            lock.readLock().acquire();
-                            latch.countDown();
-                            readLatch.countDown();
-                            return null;
-                        }
-                    }
-                );
-            }
-            for ( int i = 0; i < WRITERS; ++i )
-            {
-                service.submit
-                (
-                    new Callable<Void>()
-                    {
-                        @Override
-                        public Void call() throws Exception
-                        {
-                            Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
-                            latch.countDown();  // must be before as there can only be one writer
-                            lock.writeLock().acquire();
-                            return null;
-                        }
-                    }
-                );
-            }
-
-            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-
-            Collection<String> readers = lock.readLock().getParticipantNodes();
-            Collection<String> writers = lock.writeLock().getParticipantNodes();
-
-            Assert.assertEquals(readers.size(), READERS);
-            Assert.assertEquals(writers.size(), WRITERS);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void     testThatUpgradingIsDisallowed() throws Exception
-    {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-            lock.readLock().acquire();
-            Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
-
-            lock.readLock().release();
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void     testThatDowngradingRespectsThreads() throws Exception
-    {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            final InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-            ExecutorService                   t1 = Executors.newSingleThreadExecutor();
-            ExecutorService                   t2 = Executors.newSingleThreadExecutor();
-
-            final CountDownLatch              latch = new CountDownLatch(1);
-
-            Future<Object>                    f1 = t1.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
-                    {
-                        lock.writeLock().acquire();
-                        latch.countDown();
-                        return null;
-                    }
-                }
-            );
-
-            Future<Object>                    f2 = t2.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
-                    {
-                        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-                        Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
-                        return null;
-                    }
-                }
-            );
-
-            f1.get();
-            f2.get();
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void     testDowngrading() throws Exception
-    {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-            lock.writeLock().acquire();
-            Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
-            lock.writeLock().release();
-
-            lock.readLock().release();
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void     testBasic() throws Exception
+    @Override
+    protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
     {
-        final int               CONCURRENCY = 8;
-        final int               ITERATIONS = 100;
-
-        final Random            random = new Random();
-        final AtomicInteger     concurrentCount = new AtomicInteger(0);
-        final AtomicInteger     maxConcurrentCount = new AtomicInteger(0);
-        final AtomicInteger     writeCount = new AtomicInteger(0);
-        final AtomicInteger     readCount = new AtomicInteger(0);
-
-        List<Future<Void>>  futures = Lists.newArrayList();
-        ExecutorService     service = Executors.newCachedThreadPool();
-        for ( int i = 0; i < CONCURRENCY; ++i )
-        {
-            Future<Void>    future = service.submit
-            (
-                new Callable<Void>()
-                {
-                    @Override
-                    public Void call() throws Exception
-                    {
-                        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-                        client.start();
-                        try
-                        {
-                            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-                            for ( int i = 0; i < ITERATIONS; ++i )
-                            {
-                                if ( random.nextInt(100) < 10 )
-                                {
-                                    doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
-                                    writeCount.incrementAndGet();
-                                }
-                                else
-                                {
-                                    doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
-                                    readCount.incrementAndGet();
-                                }
-                            }
-                        }
-                        finally
-                        {
-                            CloseableUtils.closeQuietly(client);
-                        }
-                        return null;
-                    }
-                }
-            );
-            futures.add(future);
-        }
-
-        for ( Future<Void> future : futures )
-        {
-            future.get();
-        }
-
-        System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get());
-
-        Assert.assertTrue(writeCount.get() > 0);
-        Assert.assertTrue(readCount.get() > 0);
-        Assert.assertTrue(maxConcurrentCount.get() > 1);
-    }
-
-    private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, AtomicInteger maxConcurrentCount, Random random, int maxAllowed) throws Exception
-    {
-        try
-        {
-            Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
-            int     localConcurrentCount;
-            synchronized(this)
-            {
-                localConcurrentCount = concurrentCount.incrementAndGet();
-                if ( localConcurrentCount > maxConcurrentCount.get() )
-                {
-                    maxConcurrentCount.set(localConcurrentCount);
-                }
-            }
-
-            Assert.assertTrue(localConcurrentCount <= maxAllowed, "" + localConcurrentCount);
-
-            Thread.sleep(random.nextInt(9) + 1);
-        }
-        finally
-        {
-            synchronized(this)
-            {
-                concurrentCount.decrementAndGet();
-                lock.release();
-            }
-        }
+        return new InterProcessReadWriteLock(client, path);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
new file mode 100644
index 0000000..81a6b14
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLockBase.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.locks;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class TestInterProcessReadWriteLockBase extends BaseClassForTests
+{
+    @Test
+    public void testGetParticipantNodes() throws Exception
+    {
+        final int READERS = 20;
+        final int WRITERS = 8;
+
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
+            final CountDownLatch readLatch = new CountDownLatch(READERS);
+
+            ExecutorService service = Executors.newCachedThreadPool();
+            for ( int i = 0; i < READERS; ++i )
+            {
+                service.submit(new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+                        lock.readLock().acquire();
+                        latch.countDown();
+                        readLatch.countDown();
+                        return null;
+                    }
+                });
+            }
+            for ( int i = 0; i < WRITERS; ++i )
+            {
+                service.submit(new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+                        Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
+                        latch.countDown();  // count down before acquire(): at most one writer can hold the lock, so acquire() may block
+                        lock.writeLock().acquire();
+                        return null;
+                    }
+                });
+            }
+
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+            new Timing().sleepABit();
+
+            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+            Collection<String> readers = lock.readLock().getParticipantNodes();
+            Collection<String> writers = lock.writeLock().getParticipantNodes();
+
+            Assert.assertEquals(readers.size(), READERS);
+            Assert.assertEquals(writers.size(), WRITERS);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testThatUpgradingIsDisallowed() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+            lock.readLock().acquire();
+            Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
+
+            lock.readLock().release();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testThatDowngradingRespectsThreads() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+            ExecutorService t1 = Executors.newSingleThreadExecutor();
+            ExecutorService t2 = Executors.newSingleThreadExecutor();
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            Future<Object> f1 = t1.submit(new Callable<Object>()
+            {
+                @Override
+                public Object call() throws Exception
+                {
+                    lock.writeLock().acquire();
+                    latch.countDown();
+                    return null;
+                }
+            });
+
+            Future<Object> f2 = t2.submit(new Callable<Object>()
+            {
+                @Override
+                public Object call() throws Exception
+                {
+                    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+                    Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
+                    return null;
+                }
+            });
+
+            f1.get();
+            f2.get();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testDowngrading() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+            lock.writeLock().acquire();
+            Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
+            lock.writeLock().release();
+
+            lock.readLock().release();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        final int CONCURRENCY = 8;
+        final int ITERATIONS = 100;
+
+        final Random random = new Random();
+        final AtomicInteger concurrentCount = new AtomicInteger(0);
+        final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+        final AtomicInteger writeCount = new AtomicInteger(0);
+        final AtomicInteger readCount = new AtomicInteger(0);
+
+        List<Future<Void>> futures = Lists.newArrayList();
+        ExecutorService service = Executors.newCachedThreadPool();
+        for ( int i = 0; i < CONCURRENCY; ++i )
+        {
+            Future<Void> future = service.submit(new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+                    client.start();
+                    try
+                    {
+                        InterProcessReadWriteLockBase lock = newLock(client, "/lock");
+                        for ( int i = 0; i < ITERATIONS; ++i )
+                        {
+                            if ( random.nextInt(100) < 10 )
+                            {
+                                doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+                                writeCount.incrementAndGet();
+                            }
+                            else
+                            {
+                                doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+                                readCount.incrementAndGet();
+                            }
+                        }
+                    }
+                    finally
+                    {
+                        CloseableUtils.closeQuietly(client);
+                    }
+                    return null;
+                }
+            });
+            futures.add(future);
+        }
+
+        for ( Future<Void> future : futures )
+        {
+            future.get();
+        }
+
+        System.out.println("Writes: " + writeCount.get() + " - Reads: " + readCount.get() + " - Max Reads: " + maxConcurrentCount.get());
+
+        Assert.assertTrue(writeCount.get() > 0);
+        Assert.assertTrue(readCount.get() > 0);
+        Assert.assertTrue(maxConcurrentCount.get() > 1);
+    }
+
+    protected abstract InterProcessReadWriteLockBase newLock(CuratorFramework client, String path);
+
+    private void doLocking(InterProcessLock lock, AtomicInteger concurrentCount, AtomicInteger maxConcurrentCount, Random random, int maxAllowed) throws Exception
+    {
+        boolean hasTheLock = false;
+        try
+        {
+            Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
+            hasTheLock = true;
+            int localConcurrentCount;
+            synchronized(this)
+            {
+                localConcurrentCount = concurrentCount.incrementAndGet();
+                if ( localConcurrentCount > maxConcurrentCount.get() )
+                {
+                    maxConcurrentCount.set(localConcurrentCount);
+                }
+            }
+
+            Assert.assertTrue(localConcurrentCount <= maxAllowed, "" + localConcurrentCount);
+
+            Thread.sleep(random.nextInt(9) + 1);
+        }
+        finally
+        {
+            if ( hasTheLock )
+            {
+                synchronized(this)
+                {
+                    concurrentCount.decrementAndGet();
+                    lock.release();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
new file mode 100644
index 0000000..7faa198
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreReadWrite.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.locks;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.testng.annotations.Test;
+
+public class TestInterProcessSemaphoreReadWrite
+{
+    @Test
+    public void testBasic() throws Exception
+    {
+        TestInterProcessReadWriteLockBase base = new TestInterProcessReadWriteLockBase()
+        {
+            @Override
+            protected InterProcessReadWriteLockBase newLock(CuratorFramework client, String path)
+            {
+                return new InterProcessSemaphoreReadWrite(client, path);
+            }
+        };
+
+        base.setup();
+        try
+        {
+            base.testBasic();
+        }
+        finally
+        {
+            base.teardown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
index 1da27ba..8d273f1 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessLockBridge.java
@@ -29,6 +29,7 @@ import org.apache.curator.x.rest.entities.LockSpec;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.net.URI;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class InterProcessLockBridge implements InterProcessLock
@@ -92,6 +93,12 @@ public class InterProcessLockBridge implements InterProcessLock
     }
 
     @Override
+    public Collection<String> getParticipantNodes() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public boolean isAcquiredInThisProcess()
     {
         return (id != null);

http://git-wip-us.apache.org/repos/asf/curator/blob/9f757b33/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
index ea21f98..2f08aa1 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/InterProcessReadWriteLockBridge.java
@@ -28,6 +28,7 @@ import org.apache.curator.x.rest.entities.LockSpec;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.net.URI;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class InterProcessReadWriteLockBridge
@@ -108,6 +109,12 @@ public class InterProcessReadWriteLockBridge
         }
 
         @Override
+        public Collection<String> getParticipantNodes() throws Exception
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
         public void release() throws Exception
         {
             String localId = id.get();


[2/2] git commit: ReadWriteLockResource must use InterProcessSemaphoreReadWrite

Posted by ra...@apache.org.
ReadWriteLockResource must use InterProcessSemaphoreReadWrite


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ae481351
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ae481351
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ae481351

Branch: refs/heads/CURATOR-88
Commit: ae4813512aaa8b64e226d02d08a1450efd310a05
Parents: 9f757b3
Author: randgalt <ra...@apache.org>
Authored: Thu Mar 6 19:59:17 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Mar 6 19:59:17 2014 -0500

----------------------------------------------------------------------
 .../x/rest/api/ReadWriteLockResource.java       |  14 +-
 .../apache/curator/x/rest/api/TestLocks.java    | 309 +++++++++----------
 2 files changed, 151 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ae481351/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
index b501d0e..8792323 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ReadWriteLockResource.java
@@ -18,8 +18,8 @@
  */
 package org.apache.curator.x.rest.api;
 
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreReadWrite;
 import org.apache.curator.x.rest.CuratorRestContext;
 import org.apache.curator.x.rest.entities.Id;
 import org.apache.curator.x.rest.entities.LockSpec;
@@ -69,24 +69,24 @@ public class ReadWriteLockResource
     @Path("{lock-id}")
     public Response releaseLock(@PathParam("lock-id") String lockId) throws Exception
     {
-        InterProcessMutex lock = Constants.deleteThing(context.getSession(), lockId, InterProcessMutex.class);
+        InterProcessLock lock = Constants.deleteThing(context.getSession(), lockId, InterProcessLock.class);
         lock.release();
         return Response.ok().build();
     }
 
     private Response internalLock(final LockSpec lockSpec, boolean writeLock) throws Exception
     {
-        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(context.getClient(), lockSpec.getPath());
-        InterProcessMutex actualLock = writeLock ? lock.writeLock() : lock.readLock();
+        InterProcessSemaphoreReadWrite lock = new InterProcessSemaphoreReadWrite(context.getClient(), lockSpec.getPath());
+        InterProcessLock actualLock = writeLock ? lock.writeLock() : lock.readLock();
         if ( !actualLock.acquire(lockSpec.getMaxWaitMs(), TimeUnit.MILLISECONDS) )
         {
             return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
         }
 
-        Closer<InterProcessMutex> closer = new Closer<InterProcessMutex>()
+        Closer<InterProcessLock> closer = new Closer<InterProcessLock>()
         {
             @Override
-            public void close(InterProcessMutex lock)
+            public void close(InterProcessLock lock)
             {
                 if ( lock.isAcquiredInThisProcess() )
                 {

http://git-wip-us.apache.org/repos/asf/curator/blob/ae481351/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 168329e..7ea6245 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -62,62 +62,56 @@ public class TestLocks extends BaseClassForTests
         final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
 
         ExecutorService service = Executors.newCachedThreadPool();
-        Future<Object> future1 = service.submit
-            (
-                new Callable<Object>()
+        Future<Object> future1 = service.submit(new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                try
                 {
-                    @Override
-                    public Object call() throws Exception
+                    if ( !mutexForClient1.acquire(10, TimeUnit.SECONDS) )
                     {
-                        try
-                        {
-                            if ( !mutexForClient1.acquire(10, TimeUnit.SECONDS) )
-                            {
-                                throw new Exception("mutexForClient1.acquire timed out");
-                            }
-                            acquiredLatchForClient1.countDown();
-                            if ( !latchForClient1.await(10, TimeUnit.SECONDS) )
-                            {
-                                throw new Exception("latchForClient1 timed out");
-                            }
-                            mutexForClient1.release();
-                        }
-                        catch ( Exception e )
-                        {
-                            exceptionRef.set(e);
-                        }
-                        return null;
+                        throw new Exception("mutexForClient1.acquire timed out");
+                    }
+                    acquiredLatchForClient1.countDown();
+                    if ( !latchForClient1.await(10, TimeUnit.SECONDS) )
+                    {
+                        throw new Exception("latchForClient1 timed out");
                     }
+                    mutexForClient1.release();
                 }
-            );
-        Future<Object> future2 = service.submit
-            (
-                new Callable<Object>()
+                catch ( Exception e )
                 {
-                    @Override
-                    public Object call() throws Exception
+                    exceptionRef.set(e);
+                }
+                return null;
+            }
+        });
+        Future<Object> future2 = service.submit(new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                try
+                {
+                    if ( !mutexForClient2.acquire(10, TimeUnit.SECONDS) )
                     {
-                        try
-                        {
-                            if ( !mutexForClient2.acquire(10, TimeUnit.SECONDS) )
-                            {
-                                throw new Exception("mutexForClient2.acquire timed out");
-                            }
-                            acquiredLatchForClient2.countDown();
-                            if ( !latchForClient2.await(10, TimeUnit.SECONDS) )
-                            {
-                                throw new Exception("latchForClient2 timed out");
-                            }
-                            mutexForClient2.release();
-                        }
-                        catch ( Exception e )
-                        {
-                            exceptionRef.set(e);
-                        }
-                        return null;
+                        throw new Exception("mutexForClient2.acquire timed out");
+                    }
+                    acquiredLatchForClient2.countDown();
+                    if ( !latchForClient2.await(10, TimeUnit.SECONDS) )
+                    {
+                        throw new Exception("latchForClient2 timed out");
                     }
+                    mutexForClient2.release();
                 }
-            );
+                catch ( Exception e )
+                {
+                    exceptionRef.set(e);
+                }
+                return null;
+            }
+        });
 
         while ( !mutexForClient1.isAcquiredInThisProcess() && !mutexForClient2.isAcquiredInThisProcess() )
         {
@@ -183,41 +177,38 @@ public class TestLocks extends BaseClassForTests
         ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
         for ( int i = 0; i < 2; ++i )
         {
-            service.submit
-                (
-                    new Callable<Object>()
+            service.submit(new Callable<Object>()
+            {
+                @Override
+                public Object call() throws Exception
+                {
+                    InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
+                    lock.acquire();
+                    try
                     {
-                        @Override
-                        public Object call() throws Exception
+                        if ( isFirst.compareAndSet(true, false) )
                         {
-                            InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
-                            lock.acquire();
-                            try
-                            {
-                                if ( isFirst.compareAndSet(true, false) )
-                                {
-                                    timing.sleepABit();
-
-                                    server.stop();
-                                    Assert.assertTrue(timing.awaitLatch(latch));
-                                    server = new TestingServer(server.getPort(), server.getTempDirectory());
-                                }
-                            }
-                            finally
-                            {
-                                try
-                                {
-                                    lock.release();
-                                }
-                                catch ( Exception e )
-                                {
-                                    // ignore
-                                }
-                            }
-                            return null;
+                            timing.sleepABit();
+
+                            server.stop();
+                            Assert.assertTrue(timing.awaitLatch(latch));
+                            server = new TestingServer(server.getPort(), server.getTempDirectory());
                         }
                     }
-                );
+                    finally
+                    {
+                        try
+                        {
+                            lock.release();
+                        }
+                        catch ( Exception e )
+                        {
+                            // ignore
+                        }
+                    }
+                    return null;
+                }
+            });
         }
 
         for ( int i = 0; i < 2; ++i )
@@ -236,35 +227,29 @@ public class TestLocks extends BaseClassForTests
 
         final Semaphore semaphore = new Semaphore(0);
         ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
-        service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
-                    {
-                        mutex1.acquire();
-                        semaphore.release();
-                        Thread.sleep(1000000);
-                        return null;
-                    }
-                }
-            );
+        service.submit(new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                mutex1.acquire();
+                semaphore.release();
+                Thread.sleep(1000000);
+                return null;
+            }
+        });
 
-        service.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
-                    {
-                        mutex2.acquire();
-                        semaphore.release();
-                        Thread.sleep(1000000);
-                        return null;
-                    }
-                }
-            );
+        service.submit(new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                mutex2.acquire();
+                semaphore.release();
+                Thread.sleep(1000000);
+                return null;
+            }
+        });
 
         Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
         KillSession.kill(getCuratorRestContext().getClient().getZookeeperClient().getZooKeeper(), server.getConnectString());
@@ -285,40 +270,37 @@ public class TestLocks extends BaseClassForTests
         for ( int i = 0; i < THREAD_QTY; ++i )
         {
             final InterProcessLock mutex = new InterProcessLockBridge(restClient, sessionManager, uriMaker, "/lock");
-            Future<Object> t = service.submit
-                (
-                    new Callable<Object>()
+            Future<Object> t = service.submit(new Callable<Object>()
+            {
+                @Override
+                public Object call() throws Exception
+                {
+                    semaphore.acquire();
+                    mutex.acquire();
+                    Assert.assertTrue(hasLock.compareAndSet(false, true));
+                    try
                     {
-                        @Override
-                        public Object call() throws Exception
+                        if ( isFirst.compareAndSet(true, false) )
                         {
-                            semaphore.acquire();
-                            mutex.acquire();
-                            Assert.assertTrue(hasLock.compareAndSet(false, true));
-                            try
+                            semaphore.release(THREAD_QTY - 1);
+                            while ( semaphore.availablePermits() > 0 )
                             {
-                                if ( isFirst.compareAndSet(true, false) )
-                                {
-                                    semaphore.release(THREAD_QTY - 1);
-                                    while ( semaphore.availablePermits() > 0 )
-                                    {
-                                        Thread.sleep(100);
-                                    }
-                                }
-                                else
-                                {
-                                    Thread.sleep(100);
-                                }
+                                Thread.sleep(100);
                             }
-                            finally
-                            {
-                                mutex.release();
-                                hasLock.set(false);
-                            }
-                            return null;
+                        }
+                        else
+                        {
+                            Thread.sleep(100);
                         }
                     }
-                );
+                    finally
+                    {
+                        mutex.release();
+                        hasLock.set(false);
+                    }
+                    return null;
+                }
+            });
             threads.add(t);
         }
 
@@ -329,46 +311,43 @@ public class TestLocks extends BaseClassForTests
     }
 
     @Test
-    public void     testBasicReadWriteLock() throws Exception
+    public void testBasicReadWriteLock() throws Exception
     {
-        final int               CONCURRENCY = 8;
-        final int               ITERATIONS = 100;
+        final int CONCURRENCY = 8;
+        final int ITERATIONS = 100;
 
         final Random random = new Random();
         final AtomicInteger concurrentCount = new AtomicInteger(0);
-        final AtomicInteger     maxConcurrentCount = new AtomicInteger(0);
-        final AtomicInteger     writeCount = new AtomicInteger(0);
-        final AtomicInteger     readCount = new AtomicInteger(0);
+        final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+        final AtomicInteger writeCount = new AtomicInteger(0);
+        final AtomicInteger readCount = new AtomicInteger(0);
 
-        List<Future<Void>>  futures = Lists.newArrayList();
-        ExecutorService     service = Executors.newCachedThreadPool();
+        List<Future<Void>> futures = Lists.newArrayList();
+        ExecutorService service = Executors.newCachedThreadPool();
         for ( int i = 0; i < CONCURRENCY; ++i )
         {
-            Future<Void>    future = service.submit
-                (
-                    new Callable<Void>()
+            Future<Void> future = service.submit(new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
+                    for ( int i = 0; i < ITERATIONS; ++i )
                     {
-                        @Override
-                        public Void call() throws Exception
+                        if ( random.nextInt(100) < 10 )
                         {
-                            InterProcessReadWriteLockBridge lock = new InterProcessReadWriteLockBridge(restClient, sessionManager, uriMaker, "/lock");
-                            for ( int i = 0; i < ITERATIONS; ++i )
-                            {
-                                if ( random.nextInt(100) < 10 )
-                                {
-                                    doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
-                                    writeCount.incrementAndGet();
-                                }
-                                else
-                                {
-                                    doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
-                                    readCount.incrementAndGet();
-                                }
-                            }
-                            return null;
+                            doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+                            writeCount.incrementAndGet();
+                        }
+                        else
+                        {
+                            doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+                            readCount.incrementAndGet();
                         }
                     }
-                );
+                    return null;
+                }
+            });
             futures.add(future);
         }
 
@@ -389,7 +368,7 @@ public class TestLocks extends BaseClassForTests
         try
         {
             Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
-            int     localConcurrentCount;
+            int localConcurrentCount;
             synchronized(this)
             {
                 localConcurrentCount = concurrentCount.incrementAndGet();