You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/02 13:13:20 UTC

git commit: more tests

Repository: curator
Updated Branches:
  refs/heads/CURATOR-88 0313b6fce -> cf30ec2f6


more tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/cf30ec2f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/cf30ec2f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/cf30ec2f

Branch: refs/heads/CURATOR-88
Commit: cf30ec2f6c158f36e16a9489d3450426aa9991f4
Parents: 0313b6f
Author: randgalt <ra...@apache.org>
Authored: Sun Mar 2 17:43:10 2014 +0530
Committer: randgalt <ra...@apache.org>
Committed: Sun Mar 2 17:43:10 2014 +0530

----------------------------------------------------------------------
 .../curator/x/rest/CuratorRestContext.java      |   1 -
 .../apache/curator/x/rest/api/TestClient.java   |  80 ++++++++++-
 .../x/rest/support/BaseClassForTests.java       |  15 +-
 .../curator/x/rest/support/SessionManager.java  | 136 +++++++++++++++++++
 .../curator/x/rest/support/StatusListener.java  |  31 +++++
 5 files changed, 256 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/cf30ec2f/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
index 68cb939..50943b0 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java
@@ -69,7 +69,6 @@ public class CuratorRestContext implements Closeable
 
     public CuratorFramework getClient()
     {
-        Preconditions.checkState(state.get() == State.STARTED, "Not started");
         return client;
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/cf30ec2f/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestClient.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestClient.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestClient.java
index b839603..e6bb391 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestClient.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestClient.java
@@ -24,15 +24,23 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
+import org.apache.curator.x.rest.entities.CreateSpec;
 import org.apache.curator.x.rest.entities.ExistsSpec;
+import org.apache.curator.x.rest.entities.PathAndId;
 import org.apache.curator.x.rest.entities.Status;
 import org.apache.curator.x.rest.entities.StatusMessage;
 import org.apache.curator.x.rest.support.BaseClassForTests;
+import org.apache.curator.x.rest.support.SessionManager;
+import org.apache.curator.x.rest.support.StatusListener;
+import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.UriBuilder;
+import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TestClient extends BaseClassForTests
 {
@@ -54,6 +62,68 @@ public class TestClient extends BaseClassForTests
     }
 
     @Test
+    public void testSession() throws Exception
+    {
+        final String path = "/my/path";
+
+        CuratorFramework client = null;
+        SessionManager sessionManager = new SessionManager(restClient, curatorConfiguration.getSessionLengthMs());
+        try
+        {
+            CreateSpec createSpec = new CreateSpec();
+            createSpec.setPath(path);
+            createSpec.setCreatingParentsIfNeeded(true);
+            createSpec.setMode(CreateMode.EPHEMERAL);
+            PathAndId pathAndId = restClient.resource(getMethodUri("create")).type(MediaType.APPLICATION_JSON).post(PathAndId.class, createSpec);
+
+            final AtomicReference<String> expiredId = new AtomicReference<String>();
+            StatusListener listener = new StatusListener()
+            {
+                @Override
+                public void statusUpdate(List<StatusMessage> messages)
+                {
+                    for ( StatusMessage statusMessage : messages )
+                    {
+                        if ( statusMessage.getType().equals("expired") )
+                        {
+                            if ( expiredId.get() != null )
+                            {
+                                expiredId.set("-1");
+                            }
+                            else
+                            {
+                                expiredId.set(statusMessage.getSourceId());
+                            }
+                        }
+                    }
+                }
+
+                @Override
+                public void errorState(Status status)
+                {
+                }
+            };
+            sessionManager.addEntry(new InetSocketAddress("localhost", PORT), pathAndId.getId(), listener);
+
+            Thread.sleep(2 * curatorConfiguration.getSessionLengthMs());
+
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            client.start();
+            Assert.assertNotNull(client.checkExists().forPath(path));
+
+            sessionManager.removeEntry(new InetSocketAddress("localhost", PORT), pathAndId.getId());
+            Thread.sleep(2 * curatorConfiguration.getSessionLengthMs());
+            Assert.assertNull(client.checkExists().forPath(path));
+            Assert.assertEquals(pathAndId.getId(), expiredId.get());
+        }
+        finally
+        {
+            CloseUtil.closeQuietly(sessionManager);
+            CloseUtil.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testWatcher() throws Exception
     {
         final String path = "/a/path/to/a/node";
@@ -63,8 +133,7 @@ public class TestClient extends BaseClassForTests
         existsSpec.setPath(path);
         existsSpec.setWatched(true);
         existsSpec.setWatchId(watchId);
-        URI uri = UriBuilder.fromUri("http://localhost:" + PORT).path(ClientResource.class).path(ClientResource.class, "exists").build();
-        restClient.resource(uri).type(MediaType.APPLICATION_JSON).post(existsSpec);
+        restClient.resource(getMethodUri("exists")).type(MediaType.APPLICATION_JSON).post(existsSpec);
 
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
@@ -105,8 +174,13 @@ public class TestClient extends BaseClassForTests
         Assert.assertTrue(foundMessage);
     }
 
+    private URI getMethodUri(String method)
+    {
+        return UriBuilder.fromUri("http://localhost:" + PORT).path(ClientResource.class).path(ClientResource.class, method).build();
+    }
+
     private URI getStatusUri()
     {
-        return UriBuilder.fromUri("http://localhost:" + PORT).path(ClientResource.class).path(ClientResource.class, "getStatus").build();
+        return getMethodUri("getStatus");
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cf30ec2f/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
index c9d4cb7..ea0a1a5 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/BaseClassForTests.java
@@ -33,6 +33,7 @@ import org.apache.curator.x.rest.dropwizard.CuratorRestBundle;
 import org.eclipse.jetty.util.component.AbstractLifeCycle;
 import org.eclipse.jetty.util.component.LifeCycle;
 import org.eclipse.jetty.util.thread.ShutdownThread;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import java.io.File;
@@ -40,12 +41,15 @@ import java.nio.charset.Charset;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class BaseClassForTests
 {
     protected TestingServer server;
     protected Application<CuratorConfiguration> application;
     protected Client restClient;
+    protected CuratorConfiguration curatorConfiguration;
 
     protected static final int PORT = 8080;
 
@@ -60,9 +64,10 @@ public class BaseClassForTests
         server = new TestingServer();
 
         configFile = File.createTempFile("temp", ".tmp");
-        CharStreams.write("{\"connection-string\": \"" + server.getConnectString() + "\"}", Files.newWriterSupplier(configFile, Charset.defaultCharset()));
+        CharStreams.write("{\"connection-string\": \"" + server.getConnectString() + "\", \"session-length-ms\":5000}", Files.newWriterSupplier(configFile, Charset.defaultCharset()));
 
         final CountDownLatch startedLatch = new CountDownLatch(1);
+        final AtomicReference<CuratorConfiguration> curatorConfigurationAtomicReference = new AtomicReference<CuratorConfiguration>();
         application = new Application<CuratorConfiguration>()
         {
             @Override
@@ -74,6 +79,7 @@ public class BaseClassForTests
             @Override
             public void run(CuratorConfiguration configuration, Environment environment) throws Exception
             {
+                curatorConfigurationAtomicReference.set(configuration);
                 LifeCycle.Listener listener = new AbstractLifeCycle.AbstractLifeCycleListener()
                 {
                     @Override
@@ -100,7 +106,9 @@ public class BaseClassForTests
             }
         );
 
-        startedLatch.await();
+        Assert.assertTrue(startedLatch.await(5, TimeUnit.SECONDS));
+
+        curatorConfiguration = curatorConfigurationAtomicReference.get();
     }
 
     @AfterMethod
@@ -117,7 +125,8 @@ public class BaseClassForTests
             restClient.destroy();
         }
 
-        server.close();
         ShutdownThread.getInstance().run();
+
+        server.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cf30ec2f/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
new file mode 100644
index 0000000..69cfc76
--- /dev/null
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.support;
+
+import com.google.common.base.Equivalence;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.sun.jersey.api.client.Client;
+import org.apache.curator.x.rest.api.ClientResource;
+import org.apache.curator.x.rest.entities.Status;
+import org.apache.curator.x.rest.entities.StatusMessage;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriBuilder;
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class SessionManager implements Closeable
+{
+    private final Client restClient;
+    private final Future<?> task;
+    private final ConcurrentMap<InetSocketAddress, Entry> entries = Maps.newConcurrentMap();
+
+    private static class Entry
+    {
+        private final StatusListener listener;
+        private final Queue<String> ids;
+
+        private Entry(StatusListener listener, Queue<String> ids)
+        {
+            this.listener = listener;
+            this.ids = ids;
+        }
+    }
+
+    public SessionManager(Client restClient, int sessionMs)
+    {
+        this(restClient, Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SessionManager-%d").build()), sessionMs);
+    }
+
+    public SessionManager(Client restClient, ScheduledExecutorService service, int sessionMs)
+    {
+        this.restClient = restClient;
+        Runnable command = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                processEntries();
+            }
+        };
+        task = service.scheduleAtFixedRate(command, sessionMs / 3, sessionMs / 3, TimeUnit.MILLISECONDS);
+    }
+
+    public void addEntry(InetSocketAddress address, String id, StatusListener listener)
+    {
+        Entry newEntry = new Entry(listener, new ConcurrentLinkedQueue<String>());
+        Entry oldEntry = entries.putIfAbsent(address, newEntry);
+        Entry useEntry = (oldEntry != null) ? oldEntry : newEntry;
+        Preconditions.checkArgument(Equivalence.identity().equivalent(useEntry.listener, listener), "Different listener passed in for address: " + address);
+        useEntry.ids.add(id);
+    }
+
+    public void removeEntry(InetSocketAddress address, String id)
+    {
+        Entry entry = entries.get(address);
+        if ( entry != null )
+        {
+            entry.ids.remove(id);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        task.cancel(true);
+    }
+
+    private void processEntries()
+    {
+        for ( Map.Entry<InetSocketAddress, Entry> mapEntry : entries.entrySet() )
+        {
+            InetSocketAddress address = mapEntry.getKey();
+            Entry entry = mapEntry.getValue();
+
+            URI uri = UriBuilder
+                .fromUri("http://" + address.getHostName() + ":" + address.getPort())
+                .path(ClientResource.class)
+                .path(ClientResource.class, "getStatus")
+                .build();
+
+            List<String> idsList = Lists.newArrayList(entry.ids);
+            Status status = restClient.resource(uri).type(MediaType.APPLICATION_JSON).post(Status.class, idsList);
+            if ( status.getState().equals("connected") )
+            {
+                List<StatusMessage> messages = status.getMessages();
+                if ( (messages.size() > 0) && (entry.listener != null) )
+                {
+                    entry.listener.statusUpdate(status.getMessages());
+                }
+            }
+            else if ( entry.listener != null )
+            {
+                entry.listener.errorState(status);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/cf30ec2f/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/StatusListener.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/StatusListener.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/StatusListener.java
new file mode 100644
index 0000000..b4b0cc4
--- /dev/null
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/StatusListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.support;
+
+import org.apache.curator.x.rest.entities.Status;
+import org.apache.curator.x.rest.entities.StatusMessage;
+import java.util.List;
+
+public interface StatusListener
+{
+    public void statusUpdate(List<StatusMessage> messages);
+
+    public void errorState(Status status);
+}