You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/09/06 08:06:41 UTC
git commit: InterProcessSemaphoreMutex changed to use
InterProcessSemaphoreV2. In the process,
a bug in InterProcessSemaphoreV2 was noticed that was been long fixed in the
older InterProcessSemaphore. If the session drops in the process of creating
the l
Updated Branches:
refs/heads/CURATOR-48 [created] 86e442a1d
InterProcessSemaphoreMutex changed to use InterProcessSemaphoreV2. In the process, a bug in InterProcessSemaphoreV2 was
noticed that was been long fixed in the older InterProcessSemaphore. If the session drops in the process of creating the lock
node, the acquire fails. The old InterProcessSemaphore worked around this by using a retry loop to try to recreate the lock
node. InterProcessSemaphoreV2 now does this as well.
Also, as part of this checkin, testWaitingProcessKilledServer has been tightened and made to be more reliable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/86e442a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/86e442a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/86e442a1
Branch: refs/heads/CURATOR-48
Commit: 86e442a1d9a0907c12f21787ba26bc6be9b80e7b
Parents: aa17424
Author: randgalt <ra...@apache.org>
Authored: Thu Sep 5 23:05:11 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Thu Sep 5 23:05:11 2013 -0700
----------------------------------------------------------------------
.../locks/InterProcessSemaphoreMutex.java | 4 +-
.../recipes/locks/InterProcessSemaphoreV2.java | 268 ++++++++++--------
.../locks/TestInterProcessMutexBase.java | 272 ++++++++++---------
3 files changed, 296 insertions(+), 248 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86e442a1/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
index 29416d5..88b5f5d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
*/
public class InterProcessSemaphoreMutex implements InterProcessLock
{
- private final InterProcessSemaphore semaphore;
+ private final InterProcessSemaphoreV2 semaphore;
private volatile Lease lease;
/**
@@ -37,7 +37,7 @@ public class InterProcessSemaphoreMutex implements InterProcessLock
*/
public InterProcessSemaphoreMutex(CuratorFramework client, String path)
{
- this.semaphore = new InterProcessSemaphore(client, path, 1);
+ this.semaphore = new InterProcessSemaphoreV2(client, path, 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86e442a1/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 5e5acc4..e24b019 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -16,13 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.locks;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closeables;
+import org.apache.curator.RetryLoop;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
@@ -41,43 +44,43 @@ import java.util.concurrent.TimeUnit;
/**
* <p>
- * A counting semaphore that works across JVMs. All processes
- * in all JVMs that use the same lock path will achieve an inter-process limited set of leases.
- * Further, this semaphore is mostly "fair" - each user will get a lease in the order requested
- * (from ZK's point of view).
+ * A counting semaphore that works across JVMs. All processes
+ * in all JVMs that use the same lock path will achieve an inter-process limited set of leases.
+ * Further, this semaphore is mostly "fair" - each user will get a lease in the order requested
+ * (from ZK's point of view).
* </p>
- *
+ * <p/>
* <p>
- * There are two modes for determining the max leases for the semaphore. In the first mode the
- * max leases is a convention maintained by the users of a given path. In the second mode a
- * {@link SharedCountReader} is used as the method for semaphores of a given path to determine
- * the max leases.
+ * There are two modes for determining the max leases for the semaphore. In the first mode the
+ * max leases is a convention maintained by the users of a given path. In the second mode a
+ * {@link SharedCountReader} is used as the method for semaphores of a given path to determine
+ * the max leases.
* </p>
- *
+ * <p/>
* <p>
- * If a {@link SharedCountReader} is <b>not</b> used, no internal checks are done to prevent
- * Process A acting as if there are 10 leases and Process B acting as if there are 20. Therefore,
- * make sure that all instances in all processes use the same numberOfLeases value.
+ * If a {@link SharedCountReader} is <b>not</b> used, no internal checks are done to prevent
+ * Process A acting as if there are 10 leases and Process B acting as if there are 20. Therefore,
+ * make sure that all instances in all processes use the same numberOfLeases value.
* </p>
- *
+ * <p/>
* <p>
- * The various acquire methods return {@link Lease} objects that represent acquired leases. Clients
- * must take care to close lease objects (ideally in a <code>finally</code>
- * block) else the lease will be lost. However, if the client session drops (crash, etc.),
- * any leases held by the client are automatically closed and made available to other clients.
+ * The various acquire methods return {@link Lease} objects that represent acquired leases. Clients
+ * must take care to close lease objects (ideally in a <code>finally</code>
+ * block) else the lease will be lost. However, if the client session drops (crash, etc.),
+ * any leases held by the client are automatically closed and made available to other clients.
* </p>
- *
+ * <p/>
* <p>
- * Thanks to Ben Bangert (ben@groovie.org) for the algorithm used.
+ * Thanks to Ben Bangert (ben@groovie.org) for the algorithm used.
* </p>
*/
public class InterProcessSemaphoreV2
{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final InterProcessMutex lock;
- private final CuratorFramework client;
- private final String leasesPath;
- private final Watcher watcher = new Watcher()
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final InterProcessMutex lock;
+ private final CuratorFramework client;
+ private final String leasesPath;
+ private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
@@ -86,16 +89,16 @@ public class InterProcessSemaphoreV2
}
};
- private volatile byte[] nodeData;
- private volatile int maxLeases;
+ private volatile byte[] nodeData;
+ private volatile int maxLeases;
- private static final String LOCK_PARENT = "locks";
- private static final String LEASE_PARENT = "leases";
- private static final String LEASE_BASE_NAME = "lease-";
+ private static final String LOCK_PARENT = "locks";
+ private static final String LEASE_PARENT = "leases";
+ private static final String LEASE_BASE_NAME = "lease-";
/**
- * @param client the client
- * @param path path for the semaphore
+ * @param client the client
+ * @param path path for the semaphore
* @param maxLeases the max number of leases to allow for this instance
*/
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases)
@@ -105,8 +108,8 @@ public class InterProcessSemaphoreV2
/**
* @param client the client
- * @param path path for the semaphore
- * @param count the shared count to use for the max leases
+ * @param path path for the semaphore
+ * @param count the shared count to use for the max leases
*/
public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count)
{
@@ -123,22 +126,22 @@ public class InterProcessSemaphoreV2
if ( count != null )
{
count.addListener
- (
- new SharedCountListener()
- {
- @Override
- public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ (
+ new SharedCountListener()
{
- InterProcessSemaphoreV2.this.maxLeases = newCount;
- }
+ @Override
+ public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
+ {
+ InterProcessSemaphoreV2.this.maxLeases = newCount;
+ }
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- // no need to handle this here - clients should set their own connection state listener
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ // no need to handle this here - clients should set their own connection state listener
+ }
}
- }
- );
+ );
}
}
@@ -148,7 +151,7 @@ public class InterProcessSemaphoreV2
*
* @param nodeData node data
*/
- public void setNodeData(byte[] nodeData)
+ public void setNodeData(byte[] nodeData)
{
this.nodeData = (nodeData != null) ? Arrays.copyOf(nodeData, nodeData.length) : null;
}
@@ -159,7 +162,7 @@ public class InterProcessSemaphoreV2
* @return list of nodes
* @throws Exception ZK errors, interruptions, etc.
*/
- public Collection<String> getParticipantNodes() throws Exception
+ public Collection<String> getParticipantNodes() throws Exception
{
return client.getChildren().forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
}
@@ -169,7 +172,7 @@ public class InterProcessSemaphoreV2
*
* @param leases leases to close
*/
- public void returnAll(Collection<Lease> leases)
+ public void returnAll(Collection<Lease> leases)
{
for ( Lease l : leases )
{
@@ -182,7 +185,7 @@ public class InterProcessSemaphoreV2
*
* @param lease lease to close
*/
- public void returnLease(Lease lease)
+ public void returnLease(Lease lease)
{
Closeables.closeQuietly(lease);
}
@@ -190,7 +193,7 @@ public class InterProcessSemaphoreV2
/**
* <p>Acquire a lease. If no leases are available, this method blocks until either the maximum
* number of leases is increased or another client/process closes a lease.</p>
- *
+ * <p/>
* <p>The client must close the lease when it is done with it. You should do this in a
* <code>finally</code> block.</p>
*
@@ -207,7 +210,7 @@ public class InterProcessSemaphoreV2
* <p>Acquire <code>qty</code> leases. If there are not enough leases available, this method
* blocks until either the maximum number of leases is increased enough or other clients/processes
* close enough leases.</p>
- *
+ * <p/>
* <p>The client must close the leases when it is done with them. You should do this in a
* <code>finally</code> block. NOTE: You can use {@link #returnAll(Collection)} for this.</p>
*
@@ -224,7 +227,7 @@ public class InterProcessSemaphoreV2
* <p>Acquire a lease. If no leases are available, this method blocks until either the maximum
* number of leases is increased or another client/process closes a lease. However, this method
* will only block to a maximum of the time parameters given.</p>
- *
+ * <p/>
* <p>The client must close the lease when it is done with it. You should do this in a
* <code>finally</code> block.</p>
*
@@ -245,11 +248,11 @@ public class InterProcessSemaphoreV2
* close enough leases. However, this method will only block to a maximum of the time
* parameters given. If time expires before all leases are acquired, the subset of acquired
* leases are automatically closed.</p>
- *
+ * <p/>
* <p>The client must close the leases when it is done with them. You should do this in a
* <code>finally</code> block. NOTE: You can use {@link #returnAll(Collection)} for this.</p>
*
- * @param qty number of leases to acquire
+ * @param qty number of leases to acquire
* @param time time to wait
* @param unit time unit
* @return the new leases or null if time ran out
@@ -257,77 +260,49 @@ public class InterProcessSemaphoreV2
*/
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
{
- long startMs = System.currentTimeMillis();
- boolean hasWait = (unit != null);
- long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
+ long startMs = System.currentTimeMillis();
+ boolean hasWait = (unit != null);
+ long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
Preconditions.checkArgument(qty > 0, "qty cannot be 0");
- ImmutableList.Builder<Lease> builder = ImmutableList.builder();
- boolean success = false;
+ ImmutableList.Builder<Lease> builder = ImmutableList.builder();
+ boolean success = false;
try
{
while ( qty-- > 0 )
{
- if ( !client.isStarted() )
+ int retryCount = 0;
+ long startMillis = System.currentTimeMillis();
+ boolean isDone = false;
+ while ( !isDone )
{
- return null;
- }
-
- if ( hasWait )
- {
- long thisWaitMs = getThisWaitMs(startMs, waitMs);
- if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
+ switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
{
- return null;
- }
- }
- else
- {
- lock.acquire();
- }
- try
- {
- PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
- String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
- String nodeName = ZKPaths.getNodeFromPath(path);
- builder.add(makeLease(path));
+ case CONTINUE:
+ {
+ isDone = true;
+ break;
+ }
- synchronized(this)
- {
- for(;;)
+ case RETURN_NULL:
{
- List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
- if ( !children.contains(nodeName) )
- {
- log.error("Sequential path not found: " + path);
- throw new KeeperException.NoNodeException("Sequential path not found: " + path);
- }
+ return null;
+ }
- if ( children.size() <= maxLeases )
- {
- break;
- }
- if ( hasWait )
- {
- long thisWaitMs = getThisWaitMs(startMs, waitMs);
- if ( thisWaitMs <= 0 )
- {
- return null;
- }
- wait(thisWaitMs);
- }
- else
+ case RETRY_DUE_TO_MISSING_NODE:
+ {
+ // gets thrown by internalAcquire1Lease when it can't find the lock node
+ // this can happen when the session expires, etc. So, if the retry allows, just try it all again
+ if ( !client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
- wait();
+ throw new KeeperException.NoNodeException("Sequential path not found - possible session loss");
}
+ // try again
+ break;
}
}
}
- finally
- {
- lock.release();
- }
}
success = true;
}
@@ -342,9 +317,80 @@ public class InterProcessSemaphoreV2
return builder.build();
}
+ private enum InternalAcquireResult
+ {
+ CONTINUE,
+ RETURN_NULL,
+ RETRY_DUE_TO_MISSING_NODE
+ }
+
+ private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
+ {
+ if ( client.getState() != CuratorFrameworkState.STARTED )
+ {
+ return InternalAcquireResult.RETURN_NULL;
+ }
+
+ if ( hasWait )
+ {
+ long thisWaitMs = getThisWaitMs(startMs, waitMs);
+ if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
+ {
+ return InternalAcquireResult.RETURN_NULL;
+ }
+ }
+ else
+ {
+ lock.acquire();
+ }
+ try
+ {
+ PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
+ String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
+ String nodeName = ZKPaths.getNodeFromPath(path);
+ builder.add(makeLease(path));
+
+ synchronized(this)
+ {
+ for(;;)
+ {
+ List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+ if ( !children.contains(nodeName) )
+ {
+ log.error("Sequential path not found: " + path);
+ return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
+ }
+
+ if ( children.size() <= maxLeases )
+ {
+ break;
+ }
+ if ( hasWait )
+ {
+ long thisWaitMs = getThisWaitMs(startMs, waitMs);
+ if ( thisWaitMs <= 0 )
+ {
+ return InternalAcquireResult.RETURN_NULL;
+ }
+ wait(thisWaitMs);
+ }
+ else
+ {
+ wait();
+ }
+ }
+ }
+ }
+ finally
+ {
+ lock.release();
+ }
+ return InternalAcquireResult.CONTINUE;
+ }
+
private long getThisWaitMs(long startMs, long waitMs)
{
- long elapsedMs = System.currentTimeMillis() - startMs;
+ long elapsedMs = System.currentTimeMillis() - startMs;
return waitMs - elapsedMs;
}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86e442a1/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index c321607..73b530c 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
@@ -25,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.BaseClassForTests;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingServer;
@@ -45,22 +47,22 @@ import java.util.concurrent.atomic.AtomicReference;
public abstract class TestInterProcessMutexBase extends BaseClassForTests
{
- private volatile CountDownLatch waitLatchForBar = null;
- private volatile CountDownLatch countLatchForBar = null;
+ private volatile CountDownLatch waitLatchForBar = null;
+ private volatile CountDownLatch countLatchForBar = null;
- protected abstract InterProcessLock makeLock(CuratorFramework client);
+ protected abstract InterProcessLock makeLock(CuratorFramework client);
@Test
- public void testWaitingProcessKilledServer() throws Exception
+ public void testWaitingProcessKilledServer() throws Exception
{
- final Timing timing = new Timing();
- final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ final Timing timing = new Timing();
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
try
{
client.start();
- final CountDownLatch latch = new CountDownLatch(1);
- ConnectionStateListener listener = new ConnectionStateListener()
+ final CountDownLatch latch = new CountDownLatch(1);
+ ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -73,8 +75,8 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
};
client.getConnectionStateListenable().addListener(listener);
- final AtomicBoolean isFirst = new AtomicBoolean(true);
- ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
+ final AtomicBoolean isFirst = new AtomicBoolean(true);
+ ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
for ( int i = 0; i < 2; ++i )
{
service.submit
@@ -126,9 +128,9 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
@Test
- public void testKilledSession() throws Exception
+ public void testKilledSession() throws Exception
{
- final Timing timing = new Timing();
+ final Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
@@ -140,34 +142,34 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
final Semaphore semaphore = new Semaphore(0);
ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ (
+ new Callable<Object>()
{
- mutex1.acquire();
- semaphore.release();
- Thread.sleep(1000000);
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ mutex1.acquire();
+ semaphore.release();
+ Thread.sleep(1000000);
+ return null;
+ }
}
- }
- );
+ );
service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ (
+ new Callable<Object>()
{
- mutex2.acquire();
- semaphore.release();
- Thread.sleep(1000000);
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ mutex2.acquire();
+ semaphore.release();
+ Thread.sleep(1000000);
+ return null;
+ }
}
- }
- );
+ );
Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
@@ -180,7 +182,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
@Test
- public void testWithNamespace() throws Exception
+ public void testWithNamespace() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.builder().
connectString(server.getConnectString()).
@@ -202,57 +204,57 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
@Test
- public void testReentrantSingleLock() throws Exception
+ public void testReentrantSingleLock() throws Exception
{
- final int THREAD_QTY = 10;
-
+ final int THREAD_QTY = 10;
+
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
try
{
- final AtomicBoolean hasLock = new AtomicBoolean(false);
- final AtomicBoolean isFirst = new AtomicBoolean(true);
- final Semaphore semaphore = new Semaphore(1);
- final InterProcessLock mutex = makeLock(client);
+ final AtomicBoolean hasLock = new AtomicBoolean(false);
+ final AtomicBoolean isFirst = new AtomicBoolean(true);
+ final Semaphore semaphore = new Semaphore(1);
+ final InterProcessLock mutex = makeLock(client);
- List<Future<Object>> threads = Lists.newArrayList();
- ExecutorService service = Executors.newCachedThreadPool();
+ List<Future<Object>> threads = Lists.newArrayList();
+ ExecutorService service = Executors.newCachedThreadPool();
for ( int i = 0; i < THREAD_QTY; ++i )
{
- Future<Object> t = service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ Future<Object> t = service.submit
+ (
+ new Callable<Object>()
{
- semaphore.acquire();
- mutex.acquire();
- Assert.assertTrue(hasLock.compareAndSet(false, true));
- try
+ @Override
+ public Object call() throws Exception
{
- if ( isFirst.compareAndSet(true, false) )
+ semaphore.acquire();
+ mutex.acquire();
+ Assert.assertTrue(hasLock.compareAndSet(false, true));
+ try
{
- semaphore.release(THREAD_QTY - 1);
- while ( semaphore.availablePermits() > 0 )
+ if ( isFirst.compareAndSet(true, false) )
+ {
+ semaphore.release(THREAD_QTY - 1);
+ while ( semaphore.availablePermits() > 0 )
+ {
+ Thread.sleep(100);
+ }
+ }
+ else
{
Thread.sleep(100);
}
}
- else
+ finally
{
- Thread.sleep(100);
+ mutex.release();
+ hasLock.set(false);
}
+ return null;
}
- finally
- {
- mutex.release();
- hasLock.set(false);
- }
- return null;
}
- }
- );
+ );
threads.add(t);
}
@@ -268,7 +270,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
@Test
- public void testReentrant2Threads() throws Exception
+ public void testReentrant2Threads() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
@@ -279,30 +281,30 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
final InterProcessLock mutex = makeLock(client);
Executors.newSingleThreadExecutor().submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ (
+ new Callable<Object>()
{
- Assert.assertTrue(countLatchForBar.await(10, TimeUnit.SECONDS));
- try
- {
- mutex.acquire(10, TimeUnit.SECONDS);
- Assert.fail();
- }
- catch ( Exception e )
- {
- // correct
- }
- finally
+ @Override
+ public Object call() throws Exception
{
- waitLatchForBar.countDown();
+ Assert.assertTrue(countLatchForBar.await(10, TimeUnit.SECONDS));
+ try
+ {
+ mutex.acquire(10, TimeUnit.SECONDS);
+ Assert.fail();
+ }
+ catch ( Exception e )
+ {
+ // correct
+ }
+ finally
+ {
+ waitLatchForBar.countDown();
+ }
+ return null;
}
- return null;
}
- }
- );
+ );
foo(mutex);
Assert.assertFalse(mutex.isAcquiredInThisProcess());
@@ -314,7 +316,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
@Test
- public void testReentrant() throws Exception
+ public void testReentrant() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
client.start();
@@ -330,7 +332,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
}
- private void foo(InterProcessLock mutex) throws Exception
+ private void foo(InterProcessLock mutex) throws Exception
{
mutex.acquire(10, TimeUnit.SECONDS);
Assert.assertTrue(mutex.isAcquiredInThisProcess());
@@ -339,7 +341,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
mutex.release();
}
- private void bar(InterProcessLock mutex) throws Exception
+ private void bar(InterProcessLock mutex) throws Exception
{
mutex.acquire(10, TimeUnit.SECONDS);
Assert.assertTrue(mutex.isAcquiredInThisProcess());
@@ -353,7 +355,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
mutex.release();
}
- private void snafu(InterProcessLock mutex) throws Exception
+ private void snafu(InterProcessLock mutex) throws Exception
{
mutex.acquire(10, TimeUnit.SECONDS);
Assert.assertTrue(mutex.isAcquiredInThisProcess());
@@ -362,7 +364,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
@Test
- public void test2Clients() throws Exception
+ public void test2Clients() throws Exception
{
CuratorFramework client1 = null;
CuratorFramework client2 = null;
@@ -376,58 +378,58 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
final InterProcessLock mutexForClient1 = makeLock(client1);
final InterProcessLock mutexForClient2 = makeLock(client2);
- final CountDownLatch latchForClient1 = new CountDownLatch(1);
- final CountDownLatch latchForClient2 = new CountDownLatch(1);
- final CountDownLatch acquiredLatchForClient1 = new CountDownLatch(1);
- final CountDownLatch acquiredLatchForClient2 = new CountDownLatch(1);
+ final CountDownLatch latchForClient1 = new CountDownLatch(1);
+ final CountDownLatch latchForClient2 = new CountDownLatch(1);
+ final CountDownLatch acquiredLatchForClient1 = new CountDownLatch(1);
+ final CountDownLatch acquiredLatchForClient2 = new CountDownLatch(1);
- final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
+ final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
- ExecutorService service = Executors.newCachedThreadPool();
- Future<Object> future1 = service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ ExecutorService service = Executors.newCachedThreadPool();
+ Future<Object> future1 = service.submit
+ (
+ new Callable<Object>()
{
- try
- {
- mutexForClient1.acquire(10, TimeUnit.SECONDS);
- acquiredLatchForClient1.countDown();
- latchForClient1.await(10, TimeUnit.SECONDS);
- mutexForClient1.release();
- }
- catch ( Exception e )
+ @Override
+ public Object call() throws Exception
{
- exceptionRef.set(e);
+ try
+ {
+ mutexForClient1.acquire(10, TimeUnit.SECONDS);
+ acquiredLatchForClient1.countDown();
+ latchForClient1.await(10, TimeUnit.SECONDS);
+ mutexForClient1.release();
+ }
+ catch ( Exception e )
+ {
+ exceptionRef.set(e);
+ }
+ return null;
}
- return null;
}
- }
- );
- Future<Object> future2 = service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ );
+ Future<Object> future2 = service.submit
+ (
+ new Callable<Object>()
{
- try
- {
- mutexForClient2.acquire(10, TimeUnit.SECONDS);
- acquiredLatchForClient2.countDown();
- latchForClient2.await(10, TimeUnit.SECONDS);
- mutexForClient2.release();
- }
- catch ( Exception e )
+ @Override
+ public Object call() throws Exception
{
- exceptionRef.set(e);
+ try
+ {
+ mutexForClient2.acquire(10, TimeUnit.SECONDS);
+ acquiredLatchForClient2.countDown();
+ latchForClient2.await(10, TimeUnit.SECONDS);
+ mutexForClient2.release();
+ }
+ catch ( Exception e )
+ {
+ exceptionRef.set(e);
+ }
+ return null;
}
- return null;
}
- }
- );
+ );
while ( !mutexForClient1.isAcquiredInThisProcess() && !mutexForClient2.isAcquiredInThisProcess() )
{