You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2008/10/14 18:44:59 UTC
svn commit: r704578 -
/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
Author: fpj
Date: Tue Oct 14 09:44:58 2008
New Revision: 704578
URL: http://svn.apache.org/viewvc?rev=704578&view=rev
Log:
ZOOKEEPER-185
Modified:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=704578&r1=704577&r2=704578&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Tue Oct 14 09:44:58 2008
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Random;
import org.apache.log4j.Logger;
@@ -41,18 +42,40 @@
public class FLETest extends TestCase {
protected static final Logger LOG = Logger.getLogger(FLETest.class);
+ class TestVote{ // One recorded vote: the index of the peer that cast it and the leader id it chose.
+ TestVote(int id, long leader){ // id: index of the voting peer; leader: id of the leader voted for
+ this.leader = leader;
+ this.id = id;
+ }
+
+ long leader; // id of the leader this vote supports; countVotes compares against this field
+ int id; // index of the peer (thread) that cast this vote
+ }
+
+ int countVotes(HashSet<TestVote> hs, long id){ // Returns how many votes in hs name id as their leader.
+ int counter = 0;
+ for(TestVote v : hs){
+ if(v.leader == id) counter++; // a vote counts only when its chosen leader equals id
+ }
+
+ return counter;
+ }
+
int count;
int baseport;
int baseLEport;
HashMap<Long,QuorumServer> peers;
ArrayList<LEThread> threads;
+ HashMap<Integer, HashSet<TestVote> > voteMap;
File tmpdir[];
int port[];
+ int successCount;
+ Object finalObj;
volatile Vote votes[];
volatile boolean leaderDies;
volatile long leader = -1;
- volatile int round = 1;
+ //volatile int round = 1;
Random rand = new Random();
@Override
@@ -63,9 +86,12 @@
peers = new HashMap<Long,QuorumServer>(count);
threads = new ArrayList<LEThread>(count);
+ voteMap = new HashMap<Integer, HashSet<TestVote> >();
votes = new Vote[count];
tmpdir = new File[count];
port = new int[count];
+ successCount = 0;
+ finalObj = new Object();
QuorumStats.registerAsConcrete();
LOG.info("SetUp " + getName());
@@ -83,7 +109,7 @@
FastLeaderElection le;
int i;
QuorumPeer peer;
- int peerRound = 1;
+ //int peerRound = 1;
LEThread(QuorumPeer peer, int i) {
this.i = i;
@@ -94,47 +120,140 @@
try {
Vote v = null;
while(true) {
- peer.setPeerState(ServerState.LOOKING);
- LOG.info("Going to call leader election again.");
+ peer.setPeerState(ServerState.LOOKING);
+ LOG.info("Going to call leader election again.");
v = peer.getElectionAlg().lookForLeader();
if(v == null){
LOG.info("Thread " + i + " got a null vote");
break;
}
- peer.setCurrentVote(v);
+
+ /*
+ * A real zookeeper would take care of setting the current vote. Here
+ * we do it manually.
+ */
+ peer.setCurrentVote(v);
LOG.info("Finished election: " + i + ", " + v.id);
votes[i] = v;
+
+ /*
+ * Get the current value of the logical clock for this peer.
+ */
+ int lc = (int) ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock();
+
if (v.id == ((long) i)) {
- LOG.debug("I'm the leader");
+ /*
+ * A leader executes this part of the code. If it is the first leader to be
+ * elected, then it fails right after. Otherwise, it waits until it has enough
+ * followers supporting it.
+ */
+ LOG.info("I'm the leader: " + i);
synchronized(FLETest.this) {
if (leaderDies) {
- LOG.debug("Leader " + i + " dying");
+ LOG.info("Leader " + i + " dying");
leaderDies = false;
((FastLeaderElection) peer.getElectionAlg()).shutdown();
leader = -1;
- LOG.debug("Leader " + i + " dead");
+ LOG.info("Leader " + i + " dead");
+
+ //round++;
+ FLETest.this.notifyAll();
+
+ break;
+
} else {
- leader = i;
+ synchronized(voteMap){
+ if(voteMap.get(lc) == null)
+ voteMap.put(lc, new HashSet<TestVote>());
+ HashSet<TestVote> hs = voteMap.get(lc);
+ hs.add(new TestVote(i, v.id));
+
+ if(countVotes(hs, v.id) > (count/2)){
+ leader = i;
+ LOG.info("Got majority: " + i);
+ } else {
+ voteMap.wait(3000);
+ LOG.info("Notified or expired: " + i);
+ hs = voteMap.get(lc);
+ if(countVotes(hs, v.id) > (count/2)){
+ leader = i;
+ LOG.info("Got majority: " + i);
+ } else {
+ //round++;
+ }
+ }
+ }
+ FLETest.this.notifyAll();
+
+ if(leader == i){
+ synchronized(finalObj){
+ successCount++;
+ if(successCount > (count/2)) finalObj.notify();
+ }
+
+ break;
+ }
}
- round++;
- FLETest.this.notifyAll();
}
- break;
- }
- synchronized(FLETest.this) {
- if (round == ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock()) {
- int tmp_round = round;
- FLETest.this.wait(1000);
- if(tmp_round == round) round++;
+ } else {
+ /*
+ * Followers execute this part. They first add their vote to voteMap, and then
+ * they wait for bounded amount of time. A leader notifies followers through the
+ * FLETest.this object.
+ *
+ * Note that a follower cannot acquire FLETest.this and then voteMap before adding
+ * its vote; otherwise the follower would be blocked out until the leader notifies
+ * or leaves the synchronized block on FLETest.this.
+ */
+
+
+ LOG.info("Logical clock " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
+ synchronized(voteMap){
+ LOG.info("Voting on " + votes[i].id + ", round " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
+ if(voteMap.get(lc) == null)
+ voteMap.put(lc, new HashSet<TestVote>());
+ HashSet<TestVote> hs = voteMap.get(lc);
+ hs.add(new TestVote(i, votes[i].id));
+ if(countVotes(hs, votes[i].id) > (count/2)){
+ LOG.info("Logical clock: " + lc + ", " + votes[i].id);
+ voteMap.notify();
+ }
}
- LOG.info("The leader: " + leader + " and my vote " + votes[i].id);
- if (leader == votes[i].id) {
- break;
+
+ /*
+ * In this part a follower waits until the leader notifies it, and removes its
+ * vote if the leader takes too long to respond.
+ */
+ synchronized(FLETest.this){
+ if (leader != votes[i].id) FLETest.this.wait(3000);
+
+ LOG.info("The leader: " + leader + " and my vote " + votes[i].id);
+ synchronized(voteMap){
+ if (leader == votes[i].id) {
+ synchronized(finalObj){
+ successCount++;
+ if(successCount > (count/2)) finalObj.notify();
+ }
+ break;
+ } else {
+ HashSet<TestVote> hs = voteMap.get(lc);
+ TestVote toRemove = null;
+ for(TestVote tv : hs){
+ if(v.id == i){
+ toRemove = tv;
+ break;
+ }
+ }
+ hs.remove(toRemove);
+ }
+ }
}
- peerRound++;
}
- Thread.sleep(rand.nextInt(1000));
+ /*
+ * Add some randomness to the execution.
+ */
+ Thread.sleep(rand.nextInt(500));
peer.setCurrentVote(new Vote(peer.getId(), 0));
}
LOG.debug("Thread " + i + " votes " + v);
@@ -162,32 +281,41 @@
for(int i = 0; i < le.length; i++) {
QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
peer.startLeaderElection();
- //le[i] = new FastLeaderElection(peer, new QuorumCnxManager(peer));
LEThread thread = new LEThread(peer, i);
thread.start();
threads.add(thread);
}
LOG.info("Started threads " + getName());
- for(int i = 0; i < threads.size(); i++) {
- threads.get(i).join(20000);
- if (threads.get(i).isAlive()) {
- fail("Threads didn't join: " + i);
+
+ int waitCounter = 0;
+ synchronized(finalObj){
+ while((successCount <= count/2) && (waitCounter < 50)){
+ finalObj.wait(2000);
+ waitCounter++;
}
}
- long id = votes[0].id;
- for(int i = 1; i < votes.length; i++) {
- if (votes[i] == null) {
- fail("Thread " + i + " had a null vote");
- }
- LOG.info("Final leader info: " + i + ", " + votes[i].id + ", " + id);
- if (votes[i].id != id) {
- if (allowOneBadLeader && votes[i].id == i) {
- allowOneBadLeader = false;
- } else {
- fail("Thread " + i + " got " + votes[i].id + " expected " + id);
- }
+
+ /*
+ * Lists which threads haven't joined. A thread doesn't join if it hasn't decided
+ * upon a leader yet. It can happen that a peer is slow or disconnected, and it can
+ * take longer to nominate and connect to the current leader.
+ */
+ for(int i = 0; i < threads.size(); i++) {
+ if (threads.get(i).isAlive()) {
+ LOG.info("Threads didn't join: " + i);
}
}
+
+ /*
+ * If we have a majority, then we are good to go.
+ */
+ if(successCount <= count/2){
+ fail("Fewer than a a majority has joined");
+ }
+
+ if(threads.get((int) leader).isAlive()){
+ fail("Leader hasn't joined: " + leader);
+ }
}
}