You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/10 01:09:41 UTC

git commit: leader tests

Repository: curator
Updated Branches:
  refs/heads/CURATOR-88 fbd9c39ca -> 7e768f0bd


leader tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7e768f0b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7e768f0b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7e768f0b

Branch: refs/heads/CURATOR-88
Commit: 7e768f0bd5d2c022924a7e858d8af9c69ba769e3
Parents: fbd9c39
Author: randgalt <ra...@apache.org>
Authored: Sun Mar 9 19:09:27 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Mar 9 19:09:27 2014 -0500

----------------------------------------------------------------------
 .../recipes/leader/TestLeaderLatch.java         |  2 +-
 .../apache/curator/x/rest/api/TestLeader.java   | 99 ++++++++++++++++++++
 .../apache/curator/x/rest/api/TestLocks.java    |  2 +-
 .../x/rest/support/LeaderLatchBridge.java       | 95 +++++++++++++++++++
 .../curator/x/rest/support/SessionManager.java  | 23 +++--
 5 files changed, 210 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index f4b5590..79ef7ba 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -373,7 +373,7 @@ public class TestLeaderLatch extends BaseClassForTests
 
     private void basic(Mode mode) throws Exception
     {
-        final int PARTICIPANT_QTY = 1;//0;
+        final int PARTICIPANT_QTY = 10;
 
         List<LeaderLatch> latches = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLeader.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLeader.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLeader.java
new file mode 100644
index 0000000..b179b8b
--- /dev/null
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLeader.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.api;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.rest.support.BaseClassForTests;
+import org.apache.curator.x.rest.support.LeaderLatchBridge;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.List;
+
+public class TestLeader extends BaseClassForTests
+{
+    private static final int MAX_LOOPS = 5;
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        final String PATH_NAME = "/leader";
+        final int PARTICIPANT_QTY = 10;
+
+        List<LeaderLatchBridge> latches = Lists.newArrayList();
+
+        Timing timing = new Timing();
+        try
+        {
+            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+            {
+                LeaderLatchBridge latch = new LeaderLatchBridge(restClient, sessionManager, uriMaker, PATH_NAME);
+                latch.start();
+                latches.add(latch);
+            }
+
+            while ( latches.size() > 0 )
+            {
+                List<LeaderLatchBridge> leaders = waitForALeader(latches, timing);
+                Assert.assertEquals(leaders.size(), 1); // there can only be one leader
+                LeaderLatchBridge theLeader = leaders.get(0);
+                Assert.assertEquals(latches.indexOf(theLeader), 0); // assert ordering - leadership should advance in start order
+                theLeader.close();
+                latches.remove(theLeader);
+            }
+        }
+        finally
+        {
+            for ( LeaderLatchBridge latch : latches )
+            {
+                CloseableUtils.closeQuietly(latch);
+            }
+        }
+    }
+
+    private List<LeaderLatchBridge> waitForALeader(List<LeaderLatchBridge> latches, Timing timing) throws InterruptedException
+    {
+        for ( int i = 0; i < MAX_LOOPS; ++i )
+        {
+            List<LeaderLatchBridge> leaders = getLeaders(latches);
+            if ( leaders.size() != 0 )
+            {
+                return leaders;
+            }
+            timing.sleepABit();
+        }
+        return Lists.newArrayList();
+    }
+
+    private List<LeaderLatchBridge> getLeaders(Collection<LeaderLatchBridge> latches)
+    {
+        List<LeaderLatchBridge> leaders = Lists.newArrayList();
+        for ( LeaderLatchBridge latch : latches )
+        {
+            if ( latch.hasLeadership() )
+            {
+                leaders.add(latch);
+            }
+        }
+        return leaders;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 7ea6245..308e0c4 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -171,7 +171,7 @@ public class TestLocks extends BaseClassForTests
                 }
             }
         };
-        sessionManager.setStatusListener(statusListener);
+        sessionManager.addStatusListener(statusListener);
 
         final AtomicBoolean isFirst = new AtomicBoolean(true);
         ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));

http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/LeaderLatchBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/LeaderLatchBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/LeaderLatchBridge.java
new file mode 100644
index 0000000..ea03803
--- /dev/null
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/LeaderLatchBridge.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.support;
+
+import com.sun.jersey.api.client.Client;
+import org.apache.curator.x.rest.api.LeaderResource;
+import org.apache.curator.x.rest.entities.Id;
+import org.apache.curator.x.rest.entities.LeaderSpec;
+import org.apache.curator.x.rest.entities.Status;
+import org.apache.curator.x.rest.entities.StatusMessage;
+import javax.ws.rs.core.MediaType;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LeaderLatchBridge implements Closeable, StatusListener
+{
+    private final Client restClient;
+    private final SessionManager sessionManager;
+    private final UriMaker uriMaker;
+    private final String path;
+    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
+
+    private volatile String id = null;
+
+    public LeaderLatchBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker, String path)
+    {
+        this.restClient = restClient;
+        this.sessionManager = sessionManager;
+        this.uriMaker = uriMaker;
+        this.path = path;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        hasLeadership.set(false);
+        sessionManager.removeStatusListener(this);
+        sessionManager.removeEntry(uriMaker.getLocalhost(), id);
+        URI uri = uriMaker.getMethodUri(LeaderResource.class, null);
+        restClient.resource(uri).path(id).delete();
+    }
+
+    public boolean hasLeadership()
+    {
+        return hasLeadership.get();
+    }
+
+    public void start()
+    {
+        LeaderSpec leaderSpec = new LeaderSpec();
+        leaderSpec.setPath(path);
+        URI uri = uriMaker.getMethodUri(LeaderResource.class, null);
+        id = restClient.resource(uri).type(MediaType.APPLICATION_JSON_TYPE).post(Id.class, leaderSpec).getId();
+        sessionManager.addEntry(uriMaker.getLocalhost(), id, null);
+        sessionManager.addStatusListener(this);
+    }
+
+    @Override
+    public void statusUpdate(List<StatusMessage> messages)
+    {
+        for ( StatusMessage statusMessage : messages )
+        {
+            if ( statusMessage.getType().equals("leader") && statusMessage.getSourceId().equals(id) )
+            {
+                hasLeadership.set(Boolean.valueOf(statusMessage.getMessage()));
+            }
+        }
+    }
+
+    @Override
+    public void errorState(Status status)
+    {
+        hasLeadership.set(false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
index 140569e..10fc780 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
@@ -23,6 +23,7 @@ import com.google.common.base.Equivalence;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.sun.jersey.api.client.Client;
 import org.apache.curator.x.rest.api.ClientResource;
@@ -36,6 +37,7 @@ import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
@@ -48,8 +50,7 @@ public class SessionManager implements Closeable
     private final Client restClient;
     private final Future<?> task;
     private final ConcurrentMap<InetSocketAddress, Entry> entries = Maps.newConcurrentMap();
-
-    private volatile StatusListener statusListener;
+    private final Set<StatusListener> statusListeners = Sets.newSetFromMap(Maps.<StatusListener, Boolean>newConcurrentMap());
 
     private static class Entry
     {
@@ -106,10 +107,14 @@ public class SessionManager implements Closeable
         task.cancel(true);
     }
 
-    public void setStatusListener(StatusListener statusListener)
+    public void addStatusListener(StatusListener statusListener)
+    {
+        statusListeners.add(statusListener);
+    }
+
+    public void removeStatusListener(StatusListener statusListener)
     {
-        Preconditions.checkState(this.statusListener == null, "statusListener already set");
-        this.statusListener = statusListener;
+        statusListeners.remove(statusListener);
     }
 
     private void processEntries()
@@ -132,9 +137,9 @@ public class SessionManager implements Closeable
                 List<StatusMessage> messages = status.getMessages();
                 if ( messages.size() > 0 )
                 {
-                    if ( statusListener != null )
+                    for ( StatusListener listener : statusListeners )
                     {
-                        statusListener.statusUpdate(status.getMessages());
+                        listener.statusUpdate(status.getMessages());
                     }
                     if ( entry.listener != null )
                     {
@@ -144,9 +149,9 @@ public class SessionManager implements Closeable
             }
             else
             {
-                if ( statusListener != null )
+                for ( StatusListener listener : statusListeners )
                 {
-                    statusListener.errorState(status);
+                    listener.errorState(status);
                 }
                 if ( entry.listener != null )
                 {