You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/10 01:09:41 UTC
git commit: leader tests
Repository: curator
Updated Branches:
refs/heads/CURATOR-88 fbd9c39ca -> 7e768f0bd
leader tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7e768f0b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7e768f0b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7e768f0b
Branch: refs/heads/CURATOR-88
Commit: 7e768f0bd5d2c022924a7e858d8af9c69ba769e3
Parents: fbd9c39
Author: randgalt <ra...@apache.org>
Authored: Sun Mar 9 19:09:27 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Mar 9 19:09:27 2014 -0500
----------------------------------------------------------------------
.../recipes/leader/TestLeaderLatch.java | 2 +-
.../apache/curator/x/rest/api/TestLeader.java | 99 ++++++++++++++++++++
.../apache/curator/x/rest/api/TestLocks.java | 2 +-
.../x/rest/support/LeaderLatchBridge.java | 95 +++++++++++++++++++
.../curator/x/rest/support/SessionManager.java | 23 +++--
5 files changed, 210 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index f4b5590..79ef7ba 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -373,7 +373,7 @@ public class TestLeaderLatch extends BaseClassForTests
private void basic(Mode mode) throws Exception
{
- final int PARTICIPANT_QTY = 1;//0;
+ final int PARTICIPANT_QTY = 10;
List<LeaderLatch> latches = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLeader.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLeader.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLeader.java
new file mode 100644
index 0000000..b179b8b
--- /dev/null
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLeader.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.api;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.rest.support.BaseClassForTests;
+import org.apache.curator.x.rest.support.LeaderLatchBridge;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Collection;
+import java.util.List;
+
+public class TestLeader extends BaseClassForTests
+{
+ private static final int MAX_LOOPS = 5;
+
+ @Test
+ public void testBasic() throws Exception
+ {
+ final String PATH_NAME = "/leader";
+ final int PARTICIPANT_QTY = 10;
+
+ List<LeaderLatchBridge> latches = Lists.newArrayList();
+
+ Timing timing = new Timing();
+ try
+ {
+ for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+ {
+ LeaderLatchBridge latch = new LeaderLatchBridge(restClient, sessionManager, uriMaker, PATH_NAME);
+ latch.start();
+ latches.add(latch);
+ }
+
+ while ( latches.size() > 0 )
+ {
+ List<LeaderLatchBridge> leaders = waitForALeader(latches, timing);
+ Assert.assertEquals(leaders.size(), 1); // there can only be one leader
+ LeaderLatchBridge theLeader = leaders.get(0);
+ Assert.assertEquals(latches.indexOf(theLeader), 0); // assert ordering - leadership should advance in start order
+ theLeader.close();
+ latches.remove(theLeader);
+ }
+ }
+ finally
+ {
+ for ( LeaderLatchBridge latch : latches )
+ {
+ CloseableUtils.closeQuietly(latch);
+ }
+ }
+ }
+
+ private List<LeaderLatchBridge> waitForALeader(List<LeaderLatchBridge> latches, Timing timing) throws InterruptedException
+ {
+ for ( int i = 0; i < MAX_LOOPS; ++i )
+ {
+ List<LeaderLatchBridge> leaders = getLeaders(latches);
+ if ( leaders.size() != 0 )
+ {
+ return leaders;
+ }
+ timing.sleepABit();
+ }
+ return Lists.newArrayList();
+ }
+
+ private List<LeaderLatchBridge> getLeaders(Collection<LeaderLatchBridge> latches)
+ {
+ List<LeaderLatchBridge> leaders = Lists.newArrayList();
+ for ( LeaderLatchBridge latch : latches )
+ {
+ if ( latch.hasLeadership() )
+ {
+ leaders.add(latch);
+ }
+ }
+ return leaders;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
index 7ea6245..308e0c4 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/api/TestLocks.java
@@ -171,7 +171,7 @@ public class TestLocks extends BaseClassForTests
}
}
};
- sessionManager.setStatusListener(statusListener);
+ sessionManager.addStatusListener(statusListener);
final AtomicBoolean isFirst = new AtomicBoolean(true);
ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/LeaderLatchBridge.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/LeaderLatchBridge.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/LeaderLatchBridge.java
new file mode 100644
index 0000000..ea03803
--- /dev/null
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/LeaderLatchBridge.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.support;
+
+import com.sun.jersey.api.client.Client;
+import org.apache.curator.x.rest.api.LeaderResource;
+import org.apache.curator.x.rest.entities.Id;
+import org.apache.curator.x.rest.entities.LeaderSpec;
+import org.apache.curator.x.rest.entities.Status;
+import org.apache.curator.x.rest.entities.StatusMessage;
+import javax.ws.rs.core.MediaType;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test-support bridge that creates a leader participant through the REST
+ * {@link LeaderResource} and tracks whether it currently holds leadership by
+ * listening to status messages delivered by a {@link SessionManager}.
+ */
+public class LeaderLatchBridge implements Closeable, StatusListener
+{
+    private final Client restClient;
+    private final SessionManager sessionManager;
+    private final UriMaker uriMaker;
+    private final String path;
+    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
+
+    // id returned by the server from start(); null until started and again after close()
+    private volatile String id = null;
+
+    /**
+     * @param restClient     client used to issue the REST calls
+     * @param sessionManager manager this bridge registers its id and listener with
+     * @param uriMaker       builds the resource URIs and supplies the local address
+     * @param path           leader path sent to the server in the {@link LeaderSpec}
+     */
+    public LeaderLatchBridge(Client restClient, SessionManager sessionManager, UriMaker uriMaker, String path)
+    {
+        this.restClient = restClient;
+        this.sessionManager = sessionManager;
+        this.uriMaker = uriMaker;
+        this.path = path;
+    }
+
+    /**
+     * Clears the leadership flag, unregisters this listener and, if the bridge
+     * was started, removes its session entry and issues a DELETE for its id.
+     * Calling this on a bridge that was never started, or calling it more than
+     * once, only performs the local cleanup and sends no DELETE.
+     */
+    @Override
+    public void close() throws IOException
+    {
+        hasLeadership.set(false);
+        sessionManager.removeStatusListener(this);
+
+        String localId = id;
+        if ( localId == null )
+        {
+            // start() never assigned an id (or close() already ran): nothing exists remotely to remove
+            return;
+        }
+        id = null;
+
+        sessionManager.removeEntry(uriMaker.getLocalhost(), localId);
+        URI uri = uriMaker.getMethodUri(LeaderResource.class, null);
+        restClient.resource(uri).path(localId).delete();
+    }
+
+    /**
+     * @return the most recently recorded leadership state; false before any
+     *         matching status message arrives, after an error state, and after close
+     */
+    public boolean hasLeadership()
+    {
+        return hasLeadership.get();
+    }
+
+    /**
+     * Posts a {@link LeaderSpec} for this bridge's path, stores the id returned
+     * by the server, then registers that id and this listener with the session manager.
+     */
+    public void start()
+    {
+        LeaderSpec leaderSpec = new LeaderSpec();
+        leaderSpec.setPath(path);
+        URI uri = uriMaker.getMethodUri(LeaderResource.class, null);
+        id = restClient.resource(uri).type(MediaType.APPLICATION_JSON_TYPE).post(Id.class, leaderSpec).getId();
+        sessionManager.addEntry(uriMaker.getLocalhost(), id, null);
+        // NOTE(review): a "leader" message dispatched before the next line runs would not reach
+        // this bridge - confirm against SessionManager whether that window matters
+        sessionManager.addStatusListener(this);
+    }
+
+    /**
+     * Updates the leadership flag from every message whose type is "leader" and
+     * whose source id equals this bridge's id; all other messages are ignored.
+     * Messages with a null type or source id are ignored rather than failing.
+     */
+    @Override
+    public void statusUpdate(List<StatusMessage> messages)
+    {
+        String localId = id;
+        if ( localId == null )
+        {
+            // not started, or already closed: no message can belong to this bridge
+            return;
+        }
+
+        for ( StatusMessage statusMessage : messages )
+        {
+            if ( "leader".equals(statusMessage.getType()) && localId.equals(statusMessage.getSourceId()) )
+            {
+                hasLeadership.set(Boolean.parseBoolean(statusMessage.getMessage()));
+            }
+        }
+    }
+
+    /**
+     * Treats any reported error state as loss of leadership.
+     */
+    @Override
+    public void errorState(Status status)
+    {
+        hasLeadership.set(false);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/7e768f0b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
index 140569e..10fc780 100644
--- a/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
+++ b/curator-x-rest/src/test/java/org/apache/curator/x/rest/support/SessionManager.java
@@ -23,6 +23,7 @@ import com.google.common.base.Equivalence;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.jersey.api.client.Client;
import org.apache.curator.x.rest.api.ClientResource;
@@ -36,6 +37,7 @@ import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
@@ -48,8 +50,7 @@ public class SessionManager implements Closeable
private final Client restClient;
private final Future<?> task;
private final ConcurrentMap<InetSocketAddress, Entry> entries = Maps.newConcurrentMap();
-
- private volatile StatusListener statusListener;
+ private final Set<StatusListener> statusListeners = Sets.newSetFromMap(Maps.<StatusListener, Boolean>newConcurrentMap());
private static class Entry
{
@@ -106,10 +107,14 @@ public class SessionManager implements Closeable
task.cancel(true);
}
- public void setStatusListener(StatusListener statusListener)
+ public void addStatusListener(StatusListener statusListener)
+ {
+ statusListeners.add(statusListener);
+ }
+
+ public void removeStatusListener(StatusListener statusListener)
{
- Preconditions.checkState(this.statusListener == null, "statusListener already set");
- this.statusListener = statusListener;
+ statusListeners.remove(statusListener);
}
private void processEntries()
@@ -132,9 +137,9 @@ public class SessionManager implements Closeable
List<StatusMessage> messages = status.getMessages();
if ( messages.size() > 0 )
{
- if ( statusListener != null )
+ for ( StatusListener listener : statusListeners )
{
- statusListener.statusUpdate(status.getMessages());
+ listener.statusUpdate(status.getMessages());
}
if ( entry.listener != null )
{
@@ -144,9 +149,9 @@ public class SessionManager implements Closeable
}
else
{
- if ( statusListener != null )
+ for ( StatusListener listener : statusListeners )
{
- statusListener.errorState(status);
+ listener.errorState(status);
}
if ( entry.listener != null )
{