You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/05 13:08:45 UTC

[2/3] git commit: more testing for rest proxy

more testing for rest proxy


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0dcba0f4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0dcba0f4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0dcba0f4

Branch: refs/heads/CURATOR-88
Commit: 0dcba0f47bc439ce506037512ab835238c2f320d
Parents: a798ff8
Author: randgalt <ra...@apache.org>
Authored: Tue Mar 4 21:06:08 2014 +0530
Committer: randgalt <ra...@apache.org>
Committed: Tue Mar 4 21:06:08 2014 +0530

----------------------------------------------------------------------
 .../x/rest/dropwizard/CuratorRestBundle.java    |  14 ++-
 .../apache/curator/x/rest/api/TestLocks.java    | 126 +++++++++++++++++++
 .../x/rest/support/BaseClassForTests.java       |  16 ++-
 .../curator/x/rest/support/SessionManager.java  |  30 ++++-
 4 files changed, 173 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0dcba0f4/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
index 6ad93b3..80f3e23 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.x.rest.dropwizard;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.spi.inject.SingletonTypeInjectableProvider;
 import io.dropwizard.ConfiguredBundle;
 import io.dropwizard.setup.Bootstrap;
@@ -34,6 +35,13 @@ import javax.ws.rs.core.Context;
 
 public class CuratorRestBundle implements ConfiguredBundle<CuratorConfiguration>
 {
+    private volatile CuratorRestContext curatorRestContext;
+
+    public CuratorRestContext getCuratorRestContext()
+    {
+        return curatorRestContext;
+    }
+
     @Override
     public void initialize(Bootstrap<?> bootstrap)
     {
@@ -43,15 +51,15 @@ public class CuratorRestBundle implements ConfiguredBundle<CuratorConfiguration>
     @Override
     public void run(CuratorConfiguration configuration, Environment environment) throws Exception
     {
-        final CuratorRestContext context = newCuratorRestContext(configuration);
-        runFromContext(environment, context);
+        curatorRestContext = newCuratorRestContext(configuration);
+        runFromContext(environment, curatorRestContext);
 
         LifeCycle.Listener listener = new AbstractLifeCycle.AbstractLifeCycleListener()
         {
             @Override
             public void lifeCycleStopping(LifeCycle event)
             {
-                closeCuratorClient(context.getClient());
+                closeCuratorClient(curatorRestContext.getClient());
             }
         };
         environment.lifecycle().addLifeCycleListener(listener);

http://git-wip-us.apache.org/repos/asf/curator/blob/0dcba0f4/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 4a7eaf0..6625388 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -20,16 +20,26 @@
 package org.apache.curator.x.rest.api;
 
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.x.rest.entities.Status;
+import org.apache.curator.x.rest.entities.StatusMessage;
 import org.apache.curator.x.rest.support.BaseClassForTests;
 import org.apache.curator.x.rest.support.InterProcessLockBridge;
+import org.apache.curator.x.rest.support.StatusListener;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class TestLocks extends BaseClassForTests
@@ -140,4 +150,120 @@ public class TestLocks extends BaseClassForTests
             Assert.assertTrue(mutexForClient1.isAcquiredInThisProcess());
         }
     }
+
+    @Test
+    public void testWaitingProcessKilledServer() throws Exception
+    {
+        final Timing timing = new Timing();
+        final CountDownLatch latch = new CountDownLatch(1);
+        StatusListener statusListener = new StatusListener()
+        {
+            @Override
+            public void statusUpdate(List<StatusMessage> messages)
+            {
+                // NOP
+            }
+
+            @Override
+            public void errorState(Status status)
+            {
+                if ( status.getState().equals("lost") )
+                {
+                    latch.countDown();
+                }
+            }
+        };
+        sessionManager.setStatusListener(statusListener);
+
+        final AtomicBoolean isFirst = new AtomicBoolean(true);
+        ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
+        for ( int i = 0; i < 2; ++i )
+        {
+            service.submit
+                (
+                    new Callable<Object>()
+                    {
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            InterProcessLock lock = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+                            lock.acquire();
+                            try
+                            {
+                                if ( isFirst.compareAndSet(true, false) )
+                                {
+                                    timing.sleepABit();
+
+                                    server.stop();
+                                    Assert.assertTrue(timing.awaitLatch(latch));
+                                    server = new TestingServer(server.getPort(), server.getTempDirectory());
+                                }
+                            }
+                            finally
+                            {
+                                try
+                                {
+                                    lock.release();
+                                }
+                                catch ( Exception e )
+                                {
+                                    // ignore
+                                }
+                            }
+                            return null;
+                        }
+                    }
+                );
+        }
+
+        for ( int i = 0; i < 2; ++i )
+        {
+            service.take().get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        final Timing timing = new Timing();
+
+        final InterProcessLock mutex1 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+        final InterProcessLock mutex2 = new InterProcessLockBridge(restClient, sessionManager, uriMaker);
+
+        final Semaphore semaphore = new Semaphore(0);
+        ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
+        service.submit
+            (
+                new Callable<Object>()
+                {
+                    @Override
+                    public Object call() throws Exception
+                    {
+                        mutex1.acquire();
+                        semaphore.release();
+                        Thread.sleep(1000000);
+                        return null;
+                    }
+                }
+            );
+
+        service.submit
+            (
+                new Callable<Object>()
+                {
+                    @Override
+                    public Object call() throws Exception
+                    {
+                        mutex2.acquire();
+                        semaphore.release();
+                        Thread.sleep(1000000);
+                        return null;
+                    }
+                }
+            );
+
+        Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+        KillSession.kill(getCuratorRestContext().getClient().getZookeeperClient().getZooKeeper(), server.getConnectString());
+        Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/0dcba0f4/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
index 2690559..53b8b36 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
@@ -30,6 +30,7 @@ import io.dropwizard.setup.Environment;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.utils.DebugUtils;
+import org.apache.curator.x.rest.CuratorRestContext;
 import org.apache.curator.x.rest.dropwizard.CuratorApplication;
 import org.apache.curator.x.rest.dropwizard.CuratorConfiguration;
 import org.apache.curator.x.rest.dropwizard.CuratorRestBundle;
@@ -58,22 +59,20 @@ public class BaseClassForTests
     protected UriMaker uriMaker;
 
     private File configFile;
+    private CuratorRestBundle bundle;
 
     @BeforeMethod
     public void     setup() throws Exception
     {
-        setup(5000);
-    }
+        bundle = new CuratorRestBundle();
 
-    protected void setup(int sessionLengthMs) throws Exception
-    {
         int port = InstanceSpec.getRandomPort();
         restClient = Client.create();
 
         System.setProperty(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true");
         server = new TestingServer();
 
-        configFile = makeConfigFile(server.getConnectString(), sessionLengthMs, port);
+        configFile = makeConfigFile(server.getConnectString(), 5000, port);
 
         final AtomicReference<CuratorConfiguration> curatorConfigurationAtomicReference = new AtomicReference<CuratorConfiguration>();
         makeAndStartApplication(curatorConfigurationAtomicReference, configFile);
@@ -104,6 +103,11 @@ public class BaseClassForTests
         }
     }
 
+    protected CuratorRestContext getCuratorRestContext()
+    {
+        return bundle.getCuratorRestContext();
+    }
+
     private Application<CuratorConfiguration> makeAndStartApplication(final AtomicReference<CuratorConfiguration> curatorConfigurationRef, final File configFile) throws InterruptedException
     {
         final CountDownLatch startedLatch = new CountDownLatch(1);
@@ -112,7 +116,7 @@ public class BaseClassForTests
             @Override
             public void initialize(Bootstrap<CuratorConfiguration> bootstrap)
             {
-                bootstrap.addBundle(new CuratorRestBundle());
+                bootstrap.addBundle(bundle);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/0dcba0f4/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
index 69cfc76..140569e 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
@@ -49,6 +49,8 @@ public class SessionManager implements Closeable
     private final Future<?> task;
     private final ConcurrentMap<InetSocketAddress, Entry> entries = Maps.newConcurrentMap();
 
+    private volatile StatusListener statusListener;
+
     private static class Entry
     {
         private final StatusListener listener;
@@ -104,6 +106,12 @@ public class SessionManager implements Closeable
         task.cancel(true);
     }
 
+    public void setStatusListener(StatusListener statusListener)
+    {
+        Preconditions.checkState(this.statusListener == null, "statusListener already set");
+        this.statusListener = statusListener;
+    }
+
     private void processEntries()
     {
         for ( Map.Entry<InetSocketAddress, Entry> mapEntry : entries.entrySet() )
@@ -122,14 +130,28 @@ public class SessionManager implements Closeable
             if ( status.getState().equals("connected") )
             {
                 List<StatusMessage> messages = status.getMessages();
-                if ( (messages.size() > 0) && (entry.listener != null) )
+                if ( messages.size() > 0 )
                 {
-                    entry.listener.statusUpdate(status.getMessages());
+                    if ( statusListener != null )
+                    {
+                        statusListener.statusUpdate(status.getMessages());
+                    }
+                    if ( entry.listener != null )
+                    {
+                        entry.listener.statusUpdate(status.getMessages());
+                    }
                 }
             }
-            else if ( entry.listener != null )
+            else
             {
-                entry.listener.errorState(status);
+                if ( statusListener != null )
+                {
+                    statusListener.errorState(status);
+                }
+                if ( entry.listener != null )
+                {
+                    entry.listener.errorState(status);
+                }
             }
         }
     }