You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/05 13:08:45 UTC
[2/3] git commit: more testing for rest proxy
more testing for rest proxy
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0dcba0f4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0dcba0f4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0dcba0f4
Branch: refs/heads/CURATOR-88
Commit: 0dcba0f47bc439ce506037512ab835238c2f320d
Parents: a798ff8
Author: randgalt <ra...@apache.org>
Authored: Tue Mar 4 21:06:08 2014 +0530
Committer: randgalt <ra...@apache.org>
Committed: Tue Mar 4 21:06:08 2014 +0530
----------------------------------------------------------------------
.../x/rest/dropwizard/CuratorRestBundle.java | 14 ++-
.../apache/curator/x/rest/api/TestLocks.java | 126 +++++++++++++++++++
.../x/rest/support/BaseClassForTests.java | 16 ++-
.../curator/x/rest/support/SessionManager.java | 30 ++++-
4 files changed, 173 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/0dcba0f4/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
index 6ad93b3..80f3e23 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
@@ -19,6 +19,7 @@
package org.apache.curator.x.rest.dropwizard;
+import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.spi.inject.SingletonTypeInjectableProvider;
import io.dropwizard.ConfiguredBundle;
import io.dropwizard.setup.Bootstrap;
@@ -34,6 +35,13 @@ import javax.ws.rs.core.Context;
public class CuratorRestBundle implements ConfiguredBundle<CuratorConfiguration>
{
+ private volatile CuratorRestContext curatorRestContext;
+
+ public CuratorRestContext getCuratorRestContext()
+ {
+ return curatorRestContext;
+ }
+
@Override
public void initialize(Bootstrap<?> bootstrap)
{
@@ -43,15 +51,15 @@ public class CuratorRestBundle implements ConfiguredBundle<CuratorConfiguration>
@Override
public void run(CuratorConfiguration configuration, Environment environment) throws Exception
{
- final CuratorRestContext context = newCuratorRestContext(configuration);
- runFromContext(environment, context);
+ curatorRestContext = newCuratorRestContext(configuration);
+ runFromContext(environment, curatorRestContext);
LifeCycle.Listener listener = new AbstractLifeCycle.AbstractLifeCycleListener()
{
@Override
public void lifeCycleStopping(LifeCycle event)
{
- closeCuratorClient(context.getClient());
+ closeCuratorClient(curatorRestContext.getClient());
}
};
environment.lifecycle().addLifeCycleListener(listener);
http://git-wip-us.apache.org/repos/asf/curator/blob/0dcba0f4/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 4a7eaf0..6625388 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -20,16 +20,26 @@
package org.apache.curator.x.rest.api;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.x.rest.entities.Status;
+import org.apache.curator.x.rest.entities.StatusMessage;
import org.apache.curator.x.rest.support.BaseClassForTests;
import org.apache.curator.x.rest.support.InterProcessLockBridge;
+import org.apache.curator.x.rest.support.StatusListener;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class TestLocks extends BaseClassForTests
@@ -140,4 +150,120 @@ public class TestLocks extends BaseClassForTests
Assert.assertTrue(mutexForClient1.isAcquiredInThisProcess());
}
}
+
+ @Test
+ public void testWaitingProcessKilledServer() throws Exception
+ {
+ final Timing timing = new Timing();
+ final CountDownLatch latch = new CountDownLatch(1);
+ StatusListener statusListener = new StatusListener()
+ {
+ @Override
+ public void statusUpdate(List<StatusMessage> messages)
+ {
+ // NOP
+ }
+
+ @Override
+ public void errorState(Status status)
+ {
+ if ( status.getState().equals("lost") )
+ {
+ latch.countDown();
+ }
+ }
+ };
+ sessionManager.setStatusListener(statusListener);
+
+ final AtomicBoolean isFirst = new AtomicBoolean(true);
+ ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
+ for ( int i = 0; i < 2; ++i )
+ {
+ service.submit
+ (
+ new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+ lock.acquire();
+ try
+ {
+ if ( isFirst.compareAndSet(true, false) )
+ {
+ timing.sleepABit();
+
+ server.stop();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+ }
+ }
+ finally
+ {
+ try
+ {
+ lock.release();
+ }
+ catch ( Exception e )
+ {
+ // ignore
+ }
+ }
+ return null;
+ }
+ }
+ );
+ }
+
+ for ( int i = 0; i < 2; ++i )
+ {
+ service.take().get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Test
+ public void testKilledSession() throws Exception
+ {
+ final Timing timing = new Timing();
+
+ final InterProcessLock mutex1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+ final InterProcessLock mutex2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+
+ final Semaphore semaphore = new Semaphore(0);
+ ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
+ service.submit
+ (
+ new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ mutex1.acquire();
+ semaphore.release();
+ Thread.sleep(1000000);
+ return null;
+ }
+ }
+ );
+
+ service.submit
+ (
+ new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ mutex2.acquire();
+ semaphore.release();
+ Thread.sleep(1000000);
+ return null;
+ }
+ }
+ );
+
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+ KillSession.kill(getCuratorRestContext().getClient().getZookeeperClient().getZooKeeper(), server.getConnectString());
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/0dcba0f4/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
index 2690559..53b8b36 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
@@ -30,6 +30,7 @@ import io.dropwizard.setup.Environment;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.DebugUtils;
+import org.apache.curator.x.rest.CuratorRestContext;
import org.apache.curator.x.rest.dropwizard.CuratorApplication;
import org.apache.curator.x.rest.dropwizard.CuratorConfiguration;
import org.apache.curator.x.rest.dropwizard.CuratorRestBundle;
@@ -58,22 +59,20 @@ public class BaseClassForTests
protected UriMaker uriMaker;
private File configFile;
+ private CuratorRestBundle bundle;
@BeforeMethod
public void setup() throws Exception
{
- setup(5000);
- }
+ bundle = new CuratorRestBundle();
- protected void setup(int sessionLengthMs) throws Exception
- {
int port = InstanceSpec.getRandomPort();
restClient = Client.create();
System.setProperty(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true");
server = new TestingServer();
- configFile = makeConfigFile(server.getConnectString(), sessionLengthMs, port);
+ configFile = makeConfigFile(server.getConnectString(), 5000, port);
final AtomicReference<CuratorConfiguration> curatorConfigurationAtomicReference = new AtomicReference<CuratorConfiguration>();
makeAndStartApplication(curatorConfigurationAtomicReference, configFile);
@@ -104,6 +103,11 @@ public class BaseClassForTests
}
}
+ protected CuratorRestContext getCuratorRestContext()
+ {
+ return bundle.getCuratorRestContext();
+ }
+
private Application<CuratorConfiguration> makeAndStartApplication(final AtomicReference<CuratorConfiguration> curatorConfigurationRef, final File configFile) throws InterruptedException
{
final CountDownLatch startedLatch = new CountDownLatch(1);
@@ -112,7 +116,7 @@ public class BaseClassForTests
@Override
public void initialize(Bootstrap<CuratorConfiguration> bootstrap)
{
- bootstrap.addBundle(new CuratorRestBundle());
+ bootstrap.addBundle(bundle);
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/0dcba0f4/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
index 69cfc76..140569e 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
@@ -49,6 +49,8 @@ public class SessionManager implements Closeable
private final Future<?> task;
private final ConcurrentMap<InetSocketAddress, Entry> entries = Maps.newConcurrentMap();
+ private volatile StatusListener statusListener;
+
private static class Entry
{
private final StatusListener listener;
@@ -104,6 +106,12 @@ public class SessionManager implements Closeable
task.cancel(true);
}
+ public void setStatusListener(StatusListener statusListener)
+ {
+ Preconditions.checkState(this.statusListener == null, "statusListener already set");
+ this.statusListener = statusListener;
+ }
+
private void processEntries()
{
for ( Map.Entry<InetSocketAddress, Entry> mapEntry : entries.entrySet() )
@@ -122,14 +130,28 @@ public class SessionManager implements Closeable
if ( status.getState().equals("connected") )
{
List<StatusMessage> messages = status.getMessages();
- if ( (messages.size() > 0) && (entry.listener != null) )
+ if ( messages.size() > 0 )
{
- entry.listener.statusUpdate(status.getMessages());
+ if ( statusListener != null )
+ {
+ statusListener.statusUpdate(status.getMessages());
+ }
+ if ( entry.listener != null )
+ {
+ entry.listener.statusUpdate(status.getMessages());
+ }
}
}
- else if ( entry.listener != null )
+ else
{
- entry.listener.errorState(status);
+ if ( statusListener != null )
+ {
+ statusListener.errorState(status);
+ }
+ if ( entry.listener != null )
+ {
+ entry.listener.errorState(status);
+ }
}
}
}