You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/19 01:18:45 UTC
[29/31] curator git commit: Added WatcherRemoveCuratorFramework to locks and updated tests to check for cleanliness
Added WatcherRemoveCuratorFramework to locks and updated tests to check for cleanliness
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f0a09db4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f0a09db4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f0a09db4
Branch: refs/heads/CURATOR-3.0
Commit: f0a09db4423f06455ed93c20778c65aaf7e8b06e
Parents: 2c921d6
Author: randgalt <ra...@apache.org>
Authored: Tue May 19 22:42:14 2015 -0700
Committer: randgalt <ra...@apache.org>
Committed: Tue May 19 22:42:14 2015 -0700
----------------------------------------------------------------------
.../framework/imps/WatcherRemovalFacade.java | 3 +-
.../locks/InterProcessSemaphoreMutex.java | 6 +-
.../recipes/locks/InterProcessSemaphoreV2.java | 54 +++--
.../framework/recipes/locks/LockInternals.java | 9 +-
.../locks/TestInterProcessMultiMutex.java | 7 +-
.../recipes/locks/TestInterProcessMutex.java | 5 +-
.../locks/TestInterProcessMutexBase.java | 23 +-
.../locks/TestInterProcessReadWriteLock.java | 223 +++++++++++--------
.../locks/TestInterProcessSemaphore.java | 27 ++-
.../locks/TestInterProcessSemaphoreCluster.java | 3 +-
.../framework/recipes/locks/TestLockACLs.java | 3 +-
.../locks/TestLockCleanlinessWithFaults.java | 3 +-
12 files changed, 213 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index eee423f..156341e 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import org.apache.curator.CuratorZookeeperClient;
@@ -45,7 +46,7 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove
@Override
public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
{
- throw new UnsupportedOperationException();
+ return client.newWatcherRemoveCuratorFramework();
}
WatcherRemovalManager getRemovalManager()
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
index 88b5f5d..444b10d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import java.util.concurrent.TimeUnit;
/**
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
public class InterProcessSemaphoreMutex implements InterProcessLock
{
private final InterProcessSemaphoreV2 semaphore;
+ private final WatcherRemoveCuratorFramework watcherRemoveClient;
private volatile Lease lease;
/**
@@ -37,7 +39,8 @@ public class InterProcessSemaphoreMutex implements InterProcessLock
*/
public InterProcessSemaphoreMutex(CuratorFramework client, String path)
{
- this.semaphore = new InterProcessSemaphoreV2(client, path, 1);
+ watcherRemoveClient = client.newWatcherRemoveCuratorFramework();
+ this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
}
@Override
@@ -66,6 +69,7 @@ public class InterProcessSemaphoreMutex implements InterProcessLock
try
{
lease.close();
+ watcherRemoveClient.removeWatchers();
}
finally
{
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 2e14ee1..2a55107 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.RetryLoop;
import org.apache.curator.framework.CuratorFramework;
@@ -75,7 +76,7 @@ public class InterProcessSemaphoreV2
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final InterProcessMutex lock;
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String leasesPath;
private final Watcher watcher = new Watcher()
{
@@ -115,7 +116,7 @@ public class InterProcessSemaphoreV2
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count)
{
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
path = PathUtils.validatePath(path);
lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
@@ -345,36 +346,43 @@ public class InterProcessSemaphoreV2
String nodeName = ZKPaths.getNodeFromPath(path);
builder.add(makeLease(path));
- synchronized(this)
+ try
{
- for(;;)
+ synchronized(this)
{
- List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
- if ( !children.contains(nodeName) )
+ for(;;)
{
- log.error("Sequential path not found: " + path);
- return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
- }
+ List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+ if ( !children.contains(nodeName) )
+ {
+ log.error("Sequential path not found: " + path);
+ return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
+ }
- if ( children.size() <= maxLeases )
- {
- break;
- }
- if ( hasWait )
- {
- long thisWaitMs = getThisWaitMs(startMs, waitMs);
- if ( thisWaitMs <= 0 )
+ if ( children.size() <= maxLeases )
{
- return InternalAcquireResult.RETURN_NULL;
+ break;
+ }
+ if ( hasWait )
+ {
+ long thisWaitMs = getThisWaitMs(startMs, waitMs);
+ if ( thisWaitMs <= 0 )
+ {
+ return InternalAcquireResult.RETURN_NULL;
+ }
+ wait(thisWaitMs);
+ }
+ else
+ {
+ wait();
}
- wait(thisWaitMs);
- }
- else
- {
- wait();
}
}
}
+ finally
+ {
+ client.removeWatchers();
+ }
}
finally
{
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
index 2b4d3d9..4b0da11 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
@@ -24,11 +24,11 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.curator.RetryLoop;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class LockInternals
{
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String path;
private final String basePath;
private final LockInternalsDriver driver;
@@ -100,7 +100,7 @@ public class LockInternals
this.lockName = lockName;
this.maxLeases = maxLeases;
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
this.basePath = PathUtils.validatePath(path);
this.path = ZKPaths.makePath(path, lockName);
}
@@ -116,8 +116,9 @@ public class LockInternals
revocable.set(entry);
}
- void releaseLock(String lockPath) throws Exception
+ final void releaseLock(String lockPath) throws Exception
{
+ client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
index b1631a0..df6a2f5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -84,13 +85,14 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
}
catch ( Exception e )
{
+ // ignore
}
Assert.assertFalse(goodLock.isAcquiredInThisProcess());
Assert.assertTrue(otherGoodLock.isAcquiredInThisProcess());
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -142,13 +144,14 @@ public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
}
catch ( Exception e )
{
+ // ignore
}
Assert.assertFalse(goodLock.isAcquiredInThisProcess());
Assert.assertTrue(goodLockWasLocked.get());
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index 453de33..d6f8a1d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.KillSession;
import org.apache.zookeeper.CreateMode;
@@ -106,7 +107,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -151,7 +152,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 3fe8110..49e5d19 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -20,16 +20,14 @@
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -123,7 +121,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -199,7 +197,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -265,7 +263,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -311,7 +309,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -328,7 +326,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -460,11 +458,14 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
Assert.assertTrue(acquiredLatchForClient1.await(10, TimeUnit.SECONDS));
Assert.assertTrue(mutexForClient1.isAcquiredInThisProcess());
}
+
+ future1.get();
+ future2.get();
}
finally
{
- CloseableUtils.closeQuietly(client1);
- CloseableUtils.closeQuietly(client2);
+ TestCleanState.closeAndTestClean(client1);
+ TestCleanState.closeAndTestClean(client2);
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index f7636ed..48e4805 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -40,21 +42,22 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TestInterProcessReadWriteLock extends BaseClassForTests
{
@Test
- public void testGetParticipantNodes() throws Exception
+ public void testGetParticipantNodes() throws Exception
{
- final int READERS = 20;
- final int WRITERS = 8;
+ final int READERS = 20;
+ final int WRITERS = 8;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
- final CountDownLatch readLatch = new CountDownLatch(READERS);
- final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
+ final CountDownLatch readLatch = new CountDownLatch(READERS);
+ final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- ExecutorService service = Executors.newCachedThreadPool();
+ final CountDownLatch exitLatch = new CountDownLatch(1);
+ ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
for ( int i = 0; i < READERS; ++i )
{
service.submit
@@ -65,8 +68,16 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
public Void call() throws Exception
{
lock.readLock().acquire();
- latch.countDown();
- readLatch.countDown();
+ try
+ {
+ latch.countDown();
+ readLatch.countDown();
+ exitLatch.await();
+ }
+ finally
+ {
+ lock.readLock().release();
+ }
return null;
}
}
@@ -84,6 +95,14 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
latch.countDown(); // must be before as there can only be one writer
lock.writeLock().acquire();
+ try
+ {
+ exitLatch.await();
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
return null;
}
}
@@ -97,22 +116,28 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
Assert.assertEquals(readers.size(), READERS);
Assert.assertEquals(writers.size(), WRITERS);
+
+ exitLatch.countDown();
+ for ( int i = 0; i < (READERS + WRITERS); ++i )
+ {
+ service.take().get();
+ }
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
- public void testThatUpgradingIsDisallowed() throws Exception
+ public void testThatUpgradingIsDisallowed() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
lock.readLock().acquire();
Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
@@ -120,70 +145,80 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
- public void testThatDowngradingRespectsThreads() throws Exception
+ public void testThatDowngradingRespectsThreads() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- ExecutorService t1 = Executors.newSingleThreadExecutor();
- ExecutorService t2 = Executors.newSingleThreadExecutor();
+ final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ ExecutorService t1 = Executors.newSingleThreadExecutor();
+ ExecutorService t2 = Executors.newSingleThreadExecutor();
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
- Future<Object> f1 = t1.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ final CountDownLatch releaseLatch = new CountDownLatch(1);
+ Future<Object> f1 = t1.submit
+ (
+ new Callable<Object>()
{
- lock.writeLock().acquire();
- latch.countDown();
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ lock.writeLock().acquire();
+ latch.countDown();
+ try
+ {
+ releaseLatch.await();
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ return null;
+ }
}
- }
- );
+ );
- Future<Object> f2 = t2.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ Future<Object> f2 = t2.submit
+ (
+ new Callable<Object>()
{
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
- Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
+ return null;
+ }
}
- }
- );
+ );
- f1.get();
f2.get();
+ releaseLatch.countDown();
+ f1.get();
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
- public void testDowngrading() throws Exception
+ public void testDowngrading() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
lock.writeLock().acquire();
Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
lock.writeLock().release();
@@ -192,60 +227,60 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
- public void testBasic() throws Exception
+ public void testBasic() throws Exception
{
- final int CONCURRENCY = 8;
- final int ITERATIONS = 100;
+ final int CONCURRENCY = 8;
+ final int ITERATIONS = 100;
- final Random random = new Random();
- final AtomicInteger concurrentCount = new AtomicInteger(0);
- final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
- final AtomicInteger writeCount = new AtomicInteger(0);
- final AtomicInteger readCount = new AtomicInteger(0);
+ final Random random = new Random();
+ final AtomicInteger concurrentCount = new AtomicInteger(0);
+ final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+ final AtomicInteger writeCount = new AtomicInteger(0);
+ final AtomicInteger readCount = new AtomicInteger(0);
- List<Future<Void>> futures = Lists.newArrayList();
- ExecutorService service = Executors.newCachedThreadPool();
+ List<Future<Void>> futures = Lists.newArrayList();
+ ExecutorService service = Executors.newCachedThreadPool();
for ( int i = 0; i < CONCURRENCY; ++i )
{
- Future<Void> future = service.submit
- (
- new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
+ Future<Void> future = service.submit
+ (
+ new Callable<Void>()
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- client.start();
- try
+ @Override
+ public Void call() throws Exception
{
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- for ( int i = 0; i < ITERATIONS; ++i )
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ client.start();
+ try
{
- if ( random.nextInt(100) < 10 )
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ for ( int i = 0; i < ITERATIONS; ++i )
{
- doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
- writeCount.incrementAndGet();
- }
- else
- {
- doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
- readCount.incrementAndGet();
+ if ( random.nextInt(100) < 10 )
+ {
+ doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+ writeCount.incrementAndGet();
+ }
+ else
+ {
+ doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+ readCount.incrementAndGet();
+ }
}
}
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ return null;
}
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- return null;
}
- }
- );
+ );
futures.add(future);
}
@@ -262,17 +297,17 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
}
@Test
- public void testSetNodeData() throws Exception
+ public void testSetNodeData() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- final byte[] nodeData = new byte[] { 1, 2, 3, 4 };
+ final byte[] nodeData = new byte[]{1, 2, 3, 4};
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock", nodeData);
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock", nodeData);
// mutate passed-in node data, lock has made copy
nodeData[0] = 5;
@@ -284,13 +319,13 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
byte dataInZk[] = client.getData().forPath("/lock/" + children.get(0));
Assert.assertNotNull(dataInZk);
- Assert.assertEquals(new byte[] { 1, 2, 3, 4 }, dataInZk);
+ Assert.assertEquals(new byte[]{1, 2, 3, 4}, dataInZk);
lock.writeLock().release();
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -299,7 +334,7 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
try
{
Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
- int localConcurrentCount;
+ int localConcurrentCount;
synchronized(this)
{
localConcurrentCount = concurrentCount.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index dd3f98f..2797b5f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,13 +20,14 @@
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
@@ -100,10 +101,12 @@ public class TestInterProcessSemaphore extends BaseClassForTests
future1.get();
future2.get();
+
+ count.close();
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -140,8 +143,8 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client1);
- CloseableUtils.closeQuietly(client2);
+ TestCleanState.closeAndTestClean(client1);
+ TestCleanState.closeAndTestClean(client2);
}
}
@@ -226,7 +229,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
return null;
}
@@ -299,7 +302,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
return null;
}
@@ -401,7 +404,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -445,7 +448,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -463,7 +466,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -499,7 +502,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
{
CloseableUtils.closeQuietly(l);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -528,7 +531,7 @@ public class TestInterProcessSemaphore extends BaseClassForTests
{
CloseableUtils.closeQuietly(l);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index 2aa8a72..f4cb7bb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -147,7 +148,7 @@ public class TestInterProcessSemaphoreCluster
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
return null;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
index 2d9a9aa..d1e6db5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.locks;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.RetryPolicy;
@@ -74,7 +75,7 @@ public class TestLockACLs extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f0a09db4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
index 457be75..dc14c11 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.locks;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
@@ -67,7 +68,7 @@ public class TestLockCleanlinessWithFaults extends BaseClassForTests
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
}