You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2008/10/01 10:40:03 UTC

svn commit: r700714 - in /hadoop/zookeeper/trunk: ./ src/java/jmx/org/apache/zookeeper/server/quorum/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: breed
Date: Wed Oct  1 01:40:02 2008
New Revision: 700714

URL: http://svn.apache.org/viewvc?rev=700714&view=rev
Log:
ZOOKEEPER-127.  Use of non-standard election ports in config breaks services

Added:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
    hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Oct  1 01:40:02 2008
@@ -99,3 +99,6 @@
 
  ZOOKEEPER-38. headers (version+) in log/snap files (Andrew Kornev and Mahadev
  Konar via breed)
+
+ ZOOKEEPER-127.  Use of non-standard election ports in config breaks services (Mark
+ Harwood and Flavio Junqueira via breed)

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java Wed Oct  1 01:40:02 2008
@@ -20,7 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
@@ -139,7 +139,7 @@
                     "Starting quorum peer");
             quorumBean=new QuorumBean(qp);
             MBeanRegistry.getInstance().register(quorumBean, null);
-            for(QuorumServer s: qp.quorumPeers){
+            for(QuorumServer s: qp.quorumPeers.values()){
                 ZKMBeanInfo p;
                 if(qp.getId()==s.id)
                     p=localPeerBean=new LocalPeerBean(qp);
@@ -209,21 +209,16 @@
         setupObservers();
     }
 
-    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, 
-            File dataDir, File dataLogDir, int clientPort, 
-            int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
-            int syncLimit) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, clientPort, 
-                electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+    public ManagedQuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit,
+                                int syncLimit) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, myid, tickTime, initLimit, 
+syncLimit);
         setupObservers();
     }
 
-    public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers, 
-            File dataDir, File dataLogDir, int electionType, int electionPort,
-            long myid, int tickTime, int initLimit, int syncLimit,
-            NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
-                myid, tickTime, initLimit, syncLimit, cnxnFactory);
+    public ManagedQuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit,
+                                NIOServerCnxn.Factory cnxnFactory) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, cnxnFactory);
         setupObservers();
     }
 

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java Wed Oct  1 01:40:02 2008
@@ -98,7 +98,6 @@
                                     File(ServerConfig.getDataDir()));
                     peer.setTxnFactory(factory);
                     peer.setQuorumPeers(QuorumPeerConfig.getServers());
-                    peer.setElectionPort(QuorumPeerConfig.getElectionPort());
                     peer.setElectionType(QuorumPeerConfig.getElectionAlg());
                     peer.setMyid(QuorumPeerConfig.getServerId());
                     peer.setTickTime(QuorumPeerConfig.getTickTime());

Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java Wed Oct  1 01:40:02 2008
@@ -20,7 +20,8 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.net.InetSocketAddress;
 
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -63,21 +64,14 @@
         super();
     }
 
-    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
-            File dataLogDir, int clientPort, int electionAlg,
-            int electionPort, long myid, int tickTime, int initLimit,
-            int syncLimit) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, clientPort, 
-                electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+    public ObservableQuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit,
+                                int syncLimit) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, myid, tickTime, initLimit, syncLimit);
     }
 
-    public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers,
-            File dataDir, File dataLogDir, int electionType, 
-            int electionPort, long myid, int tickTime, 
-            int initLimit, int syncLimit,
-            NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
-                myid, tickTime, initLimit, syncLimit, cnxnFactory);
+    public ObservableQuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit,
+                                NIOServerCnxn.Factory cnxnFactory) throws IOException {
+        super(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, cnxnFactory);
     }
 
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Wed Oct  1 01:40:02 2008
@@ -670,7 +670,7 @@
                 t.start();
             }
 
-            for (QuorumServer server : self.quorumPeers) {
+            for (QuorumServer server : self.quorumPeers.values()) {
                 InetSocketAddress saddr = new InetSocketAddress(server.addr
                         .getAddress(), port);
                 addrChallengeMap.put(saddr, new HashMap<Long, Long>());
@@ -690,19 +690,19 @@
     long proposedLeader;
     long proposedZxid;
 
-    public AuthFastLeaderElection(QuorumPeer self, int electionPort,
+    public AuthFastLeaderElection(QuorumPeer self,
             boolean auth) {
         this.authEnabled = auth;
-        starter(self, electionPort);
+        starter(self);
     }
 
-    public AuthFastLeaderElection(QuorumPeer self, int electionPort) {
-        starter(self, electionPort);
+    public AuthFastLeaderElection(QuorumPeer self) {
+        starter(self);
     }
 
-    private void starter(QuorumPeer self, int electionPort) {
+    private void starter(QuorumPeer self) {
         this.self = self;
-        port = electionPort;
+        port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
         proposedLeader = -1;
         proposedZxid = -1;
 
@@ -726,14 +726,14 @@
     }
 
     private void sendNotifications() {
-        for (QuorumServer server : self.quorumPeers) {
-            InetSocketAddress saddr = new InetSocketAddress(server.addr
-                    .getAddress(), port);
+        for (QuorumServer server : self.quorumPeers.values()) {
+            //InetSocketAddress saddr = new InetSocketAddress(server.addr
+            //        .getAddress(), port);
 
             ToSend notmsg = new ToSend(ToSend.mType.notification,
                     AuthFastLeaderElection.sequencer++, proposedLeader,
                     proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING,
-                    saddr);
+                    self.quorumPeers.get(server.id).electionAddr);
 
             sendqueue.offer(notmsg);
         }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java Wed Oct  1 01:40:02 2008
@@ -21,6 +21,6 @@
 
 import org.apache.zookeeper.server.quorum.Vote;
 
-interface Election {
+public interface Election {
     public Vote lookForLeader() throws InterruptedException;
 }
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Wed Oct  1 01:40:02 2008
@@ -54,7 +54,7 @@
      * once it believes that it has reached the end of
      * leader election.
      */
-    static int finalizeWait = 100;
+    static int finalizeWait = 200;
 
     /**
 	 * Challenge counter to avoid replay attacks
@@ -103,7 +103,7 @@
         /*
          * Address of sender
          */
-        InetAddress addr;
+        long sid;
     }
 
     /**
@@ -119,13 +119,13 @@
         		long zxid, 
         		long epoch, 
         		ServerState state,
-        		InetAddress addr) {
+        		long sid) {
         
         	this.leader = leader;
         	this.zxid = zxid;
         	this.epoch = epoch;
         	this.state = state;
-        	this.addr = addr;
+        	this.sid = sid;
         }
         
         /*
@@ -151,7 +151,7 @@
         /*
          * Address of recipient
          */
-        InetAddress addr;
+        long sid;
     }
 
     LinkedBlockingQueue<ToSend> sendqueue;
@@ -165,10 +165,6 @@
      */
     
     private class Messenger {
-    	
-        long lastProposedLeader;
-        long lastProposedZxid;
-        long lastEpoch;
         
         /**
          * Receives messages from instance of QuorumCnxManager on
@@ -206,10 +202,10 @@
             				ackstate = QuorumPeer.ServerState.LOOKING;
             				break;
             			case 1:
-            				ackstate = QuorumPeer.ServerState.LEADING;
+            				ackstate = QuorumPeer.ServerState.FOLLOWING;
             				break;
             			case 2:
-            				ackstate = QuorumPeer.ServerState.FOLLOWING;
+            				ackstate = QuorumPeer.ServerState.LEADING;
             				break;
             			}
                     	
@@ -219,22 +215,7 @@
             			n.zxid = response.buffer.getLong();
             			n.epoch = response.buffer.getLong();
             			n.state = ackstate;
-            			n.addr = response.addr;
-
-            			/*
-            			 * Accept the values of this notification
-            			 * if we are at right epoch and the new notification
-            			 * contains a vote that succeeds our current vote
-            			 * in our order of votes.
-            			 */
-            			if ((messenger.lastEpoch <= n.epoch)
-            					&& ((n.zxid > messenger.lastProposedZxid) 
-            					|| ((n.zxid == messenger.lastProposedZxid) 
-            					&& (n.leader > messenger.lastProposedLeader)))) {
-            				messenger.lastProposedZxid = n.zxid;
-            				messenger.lastProposedLeader = n.leader;
-            				messenger.lastEpoch = n.epoch;
-            			}
+            			n.sid = response.sid;
 
             			/*
             			 * If this server is looking, then send proposed leader
@@ -242,7 +223,7 @@
 
             			if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
             				recvqueue.offer(n);
-            				if(recvqueue.size() == 0) LOG.debug("Message: " + n.addr);
+            				if(recvqueue.size() == 0) LOG.debug("Message: " + n.sid);
             				/*
             				 * Send a notification back if the peer that sent this
             				 * message is also looking and its logical clock is 
@@ -250,12 +231,13 @@
             				 */
             				if((ackstate == QuorumPeer.ServerState.LOOKING)
             						&& (n.epoch < logicalclock)){
+            				    Vote v = getVote();
             					ToSend notmsg = new ToSend(ToSend.mType.notification, 
-                						proposedLeader, 
-                						proposedZxid,
+                						v.id, 
+                						v.zxid,
                 						logicalclock,
                 						self.getPeerState(),
-                						response.addr);
+                						response.sid);
                 				sendqueue.offer(notmsg);
             				}
             			} else {
@@ -266,12 +248,14 @@
             			    Vote current = self.getCurrentVote();
             			    if(ackstate == QuorumPeer.ServerState.LOOKING){
 
-            			        ToSend notmsg = new ToSend(ToSend.mType.notification, 
+            			        
+            			        ToSend notmsg = new ToSend(
+            			                ToSend.mType.notification, 
             			                current.id, 
             			                current.zxid,
             			                logicalclock,
             			                self.getPeerState(),
-            			                response.addr);
+            			                response.sid);
             			        sendqueue.offer(notmsg);
             				}
             			}
@@ -329,7 +313,7 @@
                 requestBuffer.putLong(m.zxid);
                 requestBuffer.putLong(m.epoch);
                 
-                manager.toSend(m.addr, requestBuffer);
+                manager.toSend(m.sid, requestBuffer);
                   
             }
         }
@@ -347,9 +331,6 @@
          * @param manager   Connection manager
          */
         Messenger(QuorumCnxManager manager) {
-            lastProposedLeader = 0;
-            lastProposedZxid = 0;
-            lastEpoch = 0;
 
             Thread t = new Thread(new WorkerSender(manager),
             		"WorkerSender Thread");
@@ -371,6 +352,13 @@
     long proposedLeader;
     long proposedZxid;
 
+
+    /**
+     * Returns the current value of the logical clock counter.
+     */
+    public long getLogicalClock(){
+	return logicalclock;
+    }
     
     /**
      * Constructor of FastLeaderElection. It takes two parameters, one
@@ -410,20 +398,23 @@
         recvqueue.clear();
     }
 
+    public void shutdown(){
+        manager.halt();
+    }
 
     /**
      * Send notifications to all peers upon a change in our vote
      */
     private void sendNotifications() {
-        for (QuorumServer server : self.quorumPeers) {
-            InetAddress saddr = server.addr.getAddress();
+        for (QuorumServer server : self.quorumPeers.values()) {
+            long sid = server.id;
 
             ToSend notmsg = new ToSend(ToSend.mType.notification, 
             		proposedLeader, 
             		proposedZxid,
                     logicalclock,
                     QuorumPeer.ServerState.LOOKING,
-                    saddr);
+                    sid);
 
             sendqueue.offer(notmsg);
         }
@@ -454,7 +445,7 @@
      *  @param zxid     zxid of the the vote received last
      */
     private boolean termPredicate(
-            HashMap<InetAddress, Vote> votes, long l,
+            HashMap<Long, Vote> votes, long l,
             long zxid) {
 
         int count = 0;
@@ -475,21 +466,30 @@
 
     }
 
+    synchronized void updateProposal(long leader, long zxid){
+        proposedLeader = leader;
+        proposedZxid = zxid;
+    }
+    
+    synchronized Vote getVote(){
+        return new Vote(proposedLeader, proposedZxid);
+    }
+    
     /**
      * Starts a new round of leader election. Whenever our QuorumPeer 
      * changes its state to LOOKING, this method is invoked, and it 
      * sends notifications to al other peers.
      */
     public Vote lookForLeader() throws InterruptedException {
-        HashMap<InetAddress, Vote> recvset = new HashMap<InetAddress, Vote>();
-
-        HashMap<InetAddress, Vote> outofelection = new HashMap<InetAddress, Vote>();
+        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
 
-        logicalclock++;
-
-        proposedLeader = self.getId();
-        proposedZxid = self.getLastLoggedZxid();
+        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
 
+        synchronized(this){
+            logicalclock++;
+            updateProposal(self.getId(), self.getLastLoggedZxid());
+        }
+        
         LOG.warn("New election: " + proposedZxid);
         sendNotifications();
 
@@ -515,48 +515,47 @@
             }
             else switch (n.state) {
             case LOOKING:
-            	// If notification > current, replace and send messages out
-            	if (n.epoch > logicalclock) {
+                // If notification > current, replace and send messages out
+                LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + 
+                        n.epoch + ", " + self.getId() + ", " + self.getPeerState() + 
+                        ", " + n.state + ", " + n.sid);
+                if (n.epoch > logicalclock) {
                     logicalclock = n.epoch;
                     recvset.clear();
-                    if(totalOrderPredicate(n.leader, n.zxid)){
-                    	proposedLeader = n.leader;
-                        proposedZxid = n.zxid;
-                    }
+                    updateProposal(n.leader, n.zxid);
                     sendNotifications();
                 } else if (n.epoch < logicalclock) {
-                	break;
+                    break;
                 } else if (totalOrderPredicate(n.leader, n.zxid)) {
-                	proposedLeader = n.leader;
-                    proposedZxid = n.zxid;
-
+                    updateProposal(n.leader, n.zxid);
                     sendNotifications();
                 }
-
-                recvset.put(n.addr, new Vote(n.leader,
-                        n.zxid));
+                
+                recvset.put(n.sid, new Vote(n.leader, n.zxid));
 
                 //If have received from all nodes, then terminate
                 if (self.quorumPeers.size() == recvset.size()) {
                     self.setPeerState((proposedLeader == self.getId()) ? 
-                    		ServerState.LEADING: ServerState.FOLLOWING);
+                            ServerState.LEADING: ServerState.FOLLOWING);
                     leaveInstance();
                     return new Vote(proposedLeader, proposedZxid);
 
                 } else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
                     //Otherwise, wait for a fixed amount of time
                     LOG.debug("Passed predicate");
-                    Thread.sleep(finalizeWait);
 
                     // Verify if there is any change in the proposed leader
-                    while ((!recvqueue.isEmpty())
-                            && !totalOrderPredicate(recvqueue.peek().leader,
-                                    recvqueue.peek().zxid)) {
-                        recvqueue.poll();
+                    while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
+                        if(totalOrderPredicate(n.leader, n.zxid)){
+                            recvqueue.put(n);
+                            break;
+                        }
                     }
-                    if (recvqueue.isEmpty()) {
+                    
+                    if (n == null) {
                         self.setPeerState((proposedLeader == self.getId()) ? 
-                        		ServerState.LEADING: ServerState.FOLLOWING);
+                                ServerState.LEADING: ServerState.FOLLOWING);
+                        LOG.info("About to leave instance:" + proposedLeader + ", " + proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
                         leaveInstance();
                         return new Vote(proposedLeader,
                                 proposedZxid);
@@ -564,24 +563,18 @@
                 }
                 break;
             case LEADING:
-                outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
-                if (termPredicate(outofelection, n.leader, n.zxid)) {
-                    
-                    self.setPeerState((n.leader == self.getId()) ? 
-                    		ServerState.LEADING: ServerState.FOLLOWING);
-
-                    leaveInstance();
-                    return new Vote(n.leader, n.zxid);
-                }
-                break;
+                leaveInstance();
+                return new Vote(n.leader, n.zxid);
             case FOLLOWING:
-                outofelection.put(n.addr, new Vote(n.leader, n.zxid));
+                LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + ", " + n.sid);
+              
+                if(n.epoch >= logicalclock) 
+                    outofelection.put(n.sid, new Vote(n.leader, n.zxid));
 
                 if (termPredicate(outofelection, n.leader, n.zxid)) {
                     
                     self.setPeerState((n.leader == self.getId()) ? 
-                    		ServerState.LEADING: ServerState.FOLLOWING);
+                            ServerState.LEADING: ServerState.FOLLOWING);
 
                     leaveInstance();
                     return new Vote(n.leader, n.zxid);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Wed Oct  1 01:40:02 2008
@@ -114,7 +114,7 @@
         InetSocketAddress addr = null;
         // Find the leader by id
         Vote current = self.getCurrentVote();
-        for (QuorumServer s : self.quorumPeers) {
+        for (QuorumServer s : self.quorumPeers.values()) {
             if (s.id == current.id) {
                 addr = s.addr;
                 break;
@@ -269,7 +269,7 @@
                 }
             }
         } catch (IOException e) {
-            e.printStackTrace();
+            LOG.warn("Exception when following the leader", e);
             try {
                 sock.close();
             } catch (IOException e1) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Wed Oct  1 01:40:02 2008
@@ -137,8 +137,9 @@
             requestBuffer.putInt(xid);
             requestPacket.setLength(4);
             HashSet<Long> heardFrom = new HashSet<Long>();
-            for (QuorumServer server : self.quorumPeers) {
+            for (QuorumServer server : self.quorumPeers.values()) {
                 requestPacket.setSocketAddress(server.addr);
+                LOG.warn("Server address: " + server.addr);
                 try {
                     s.send(requestPacket);
                     responsePacket.setLength(responseBytes.length);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Oct  1 01:40:02 2008
@@ -28,6 +28,7 @@
 import java.util.HashMap;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
 
@@ -51,7 +52,7 @@
  * 
  */
 
-class QuorumCnxManager {
+public class QuorumCnxManager {
     private static final Logger LOG = Logger.getLogger(QuorumCnxManager.class);
 
     /*
@@ -84,13 +85,13 @@
     /*
      * Local IP address
      */
-    InetAddress localIP;
+    QuorumPeer self;
 
     /*
      * Mapping from Peer to Thread number
      */
-    HashMap<InetAddress, SendWorker> senderWorkerMap;
-    HashMap<InetAddress, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
+    ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
+    ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
 
     /*
      * Reception queue
@@ -109,26 +110,21 @@
     Listener listener;
 
     static class Message {
-        Message(ByteBuffer buffer, InetAddress addr) {
+        Message(ByteBuffer buffer, long sid) {
             this.buffer = buffer;
-            this.addr = addr;
+            this.sid = sid;
         }
 
         ByteBuffer buffer;
-        InetAddress addr;
+        long sid;
     }
 
-    QuorumCnxManager(int port) {
+    public QuorumCnxManager(QuorumPeer self) {
         this.port = port;
         this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
-        this.queueSendMap = new HashMap<InetAddress, ArrayBlockingQueue<ByteBuffer>>();
-        this.senderWorkerMap = new HashMap<InetAddress, SendWorker>();
-
-        try {
-            localIP = InetAddress.getLocalHost();
-        } catch (UnknownHostException e) {
-            LOG.warn("Couldn't get local address");
-        }
+        this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
+        this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
+        this.self = self;
 
         // Generates a challenge to guarantee one connection between pairs of
         // servers
@@ -140,10 +136,15 @@
     }
 
     void genChallenge() {
-        Random rand = new Random(System.currentTimeMillis()
-                + localIP.hashCode());
-        long newValue = rand.nextLong();
-        challenge = newValue;
+        try{
+            Random rand = new Random(System.currentTimeMillis()
+                + InetAddress.getLocalHost().hashCode());
+            long newValue = rand.nextLong();
+            challenge = newValue;
+        } catch(UnknownHostException e){
+            LOG.error("Cannot resolve local address");
+            challenge = 0;
+        }
     }
 
     /**
@@ -151,54 +152,29 @@
      * connection if it loses challenge. Otherwise, it keeps the connection.
      */
 
-    boolean initiateConnection(SocketChannel s) {
+    boolean initiateConnection(SocketChannel s, Long sid) {
         boolean challenged = true;
         boolean wins = false;
         long newChallenge;
-
-        // Compare IP addresses based on their hash codes 
-        //int hashCodeRemote = s.socket().getInetAddress().hashCode();
-        //if(hashCodeRemote >= localIP.hashCode()){
-        //    wins = false;
-        //} else {
-        //    wins = true;
-        //} 
-        //LOG.warn("Hash codes: " + hashCodeRemote + ", " + localIP.hashCode());
         
         try {
-            while (challenged && s.isConnected()) {
-                // Sending challenge
-                byte[] msgBytes = new byte[8];
-                ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-                msgBuffer.putLong(challenge);
-                msgBuffer.position(0);
-                s.write(msgBuffer);
-        
-                // Reading challenge
-                msgBuffer.position(0);
-                s.read(msgBuffer);
-        
-                msgBuffer.position(0);
-                newChallenge = msgBuffer.getLong();
-                if (challenge > newChallenge) {
-                   wins = true;
-                    challenged = false;
-                } else if (challenge == newChallenge) {
-                    genChallenge();
-                } else {
-                    challenged = false;
-                }
-            }
+            // Sending id and challenge
+            byte[] msgBytes = new byte[8];
+            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+            msgBuffer.putLong(self.getId());
+            msgBuffer.position(0);
+            s.write(msgBuffer);
         } catch (IOException e) {
             LOG.warn("Exception reading or writing challenge: "
                     + e.toString());
             return false;
         }
-
+        
         // If lost the challenge, then drop the new connection
-        if (!wins) {
+        if (sid > self.getId()) {
             try {
-                //LOG.warn("lost cause (initiate");
+                LOG.warn("Have smaller server identifier, so dropping the connection: (" + 
+                        sid + ", " + self.getId());
                 s.socket().close();
             } catch (IOException e) {
                 LOG.warn("Error when closing socket or trying to reopen connection: "
@@ -206,33 +182,23 @@
 
             }
         // Otherwise proceed with the connection
-        } else
-            synchronized (senderWorkerMap) {
-                /*
-                 * It may happen that a thread from a previous connection to the same
-                 * server is still active. In this case, we terminate the thread by
-                 * calling finish(). Note that senderWorkerMap is a map from IP 
-                 * addresses to worker thread.
-                 */
-                if (senderWorkerMap.get(s.socket().getInetAddress()) != null) {
-                    senderWorkerMap.get(s.socket().getInetAddress()).finish();
-                }
-
-                /*
-                 * Start new worker thread with a clean state.
-                 */
+        } else {
                 if (s != null) {
-                    SendWorker sw = new SendWorker(s);
-                    RecvWorker rw = new RecvWorker(s);
+                    SendWorker sw = new SendWorker(s, sid);
+                    RecvWorker rw = new RecvWorker(s, sid);
                     sw.setRecv(rw);
 
                     if (senderWorkerMap
-                            .containsKey(s.socket().getInetAddress())) {
-                        InetAddress addr = s.socket().getInetAddress();
-                        senderWorkerMap.get(addr).finish();
+                            .containsKey(sid)) {
+                        senderWorkerMap.get(sid).finish();
                     }
 
-                    senderWorkerMap.put(s.socket().getInetAddress(), sw);
+                    if (!queueSendMap.containsKey(sid)) {
+                        queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
+                                CAPACITY));
+                    }
+                    
+                    senderWorkerMap.put(sid, sw);
                     sw.start();
                     rw.start();
 
@@ -241,8 +207,8 @@
                     LOG.warn("Channel null");
                     return false;
                 }
-            }
-
+            
+        }
         return false;
     }
 
@@ -257,93 +223,60 @@
         boolean challenged = true;
         boolean wins = false;
         long newChallenge;
-       
-        
-        //Compare IP addresses based on their hash codes.
-        //int hashCodeRemote = s.socket().getInetAddress().hashCode();
-        //if(hashCodeRemote >= localIP.hashCode()){
-        //    wins = false;
-        //} else {
-        //    wins = true;
-        //} 
-        
-        //LOG.warn("Hash codes: " + hashCodeRemote + ", " + localIP.hashCode());
-        
+        Long sid = null;
         
         try {
-            while (challenged && s.isConnected()) {
-               // Sending challenge
-                byte[] msgBytes = new byte[8];
-                ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-                long vsent;
-                if (senderWorkerMap.get(s.socket().getInetAddress()) == null)
-                    vsent = Long.MIN_VALUE;
-                else
-                    vsent = challenge;
-                msgBuffer.putLong(vsent);
-                msgBuffer.position(0);
-                s.write(msgBuffer);
-        
-                // Reading challenge
-                msgBuffer.position(0);
-                s.read(msgBuffer);
-        
-                msgBuffer.position(0);
-                newChallenge = msgBuffer.getLong();
-                if (vsent > newChallenge) {
-                    wins = true;
-                    challenged = false;
-                } else if (challenge == newChallenge) {
-                    genChallenge();
-                } else {
-                    challenged = false;
-                }
-            }
+            // Sending challenge and sid
+            byte[] msgBytes = new byte[8];
+            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+                
+            s.read(msgBuffer);
+            msgBuffer.position(0);
+                
+            // Read server id
+            sid = Long.valueOf(msgBuffer.getLong());
         } catch (IOException e) {
             LOG.warn("Exception reading or writing challenge: "
                     + e.toString());
             return false;
         }
-
+        
         //If wins the challenge, then close the new connection.
-        if (wins) {
+        if (sid < self.getId()) {
             try {
-                InetAddress addr = s.socket().getInetAddress();
-                SendWorker sw = senderWorkerMap.get(addr);
+                SendWorker sw = senderWorkerMap.get(sid);
 
-                //LOG.warn("Keep connection (received)");
+                LOG.warn("Create new connection");
                 //sw.connect();
                 s.socket().close();
-                sw.finish();
-                SocketChannel channel = SocketChannel.open(new InetSocketAddress(addr, port));
+                if(sw != null) sw.finish();
+                SocketChannel channel = SocketChannel.open(self.quorumPeers.get(sid).electionAddr);
                 if (channel.isConnected()) {
-                    initiateConnection(channel);
+                    initiateConnection(channel, sid);
                 }
                 
-                
             } catch (IOException e) {
                 LOG.warn("Error when closing socket or trying to reopen connection: "
                                 + e.toString());
             }
         //Otherwise start worker threads to receive data.
-        } else
-            synchronized (senderWorkerMap) {
-                if (senderWorkerMap.get(s.socket().getInetAddress()) != null) {
-                    senderWorkerMap.get(s.socket().getInetAddress()).finish();       
-                }
-                
+        } else {
+            
                 if (s != null) {
-                    SendWorker sw = new SendWorker(s);
-                    RecvWorker rw = new RecvWorker(s);
+                    SendWorker sw = new SendWorker(s, sid);
+                    RecvWorker rw = new RecvWorker(s, sid);
                     sw.setRecv(rw);
 
-                    if (senderWorkerMap
-                            .containsKey(s.socket().getInetAddress())) {
-                        InetAddress addr = s.socket().getInetAddress();
-                        senderWorkerMap.get(addr).finish();
+                    if (senderWorkerMap.containsKey(sid)) {
+                        senderWorkerMap.get(sid).finish();
                     }
-
-                    senderWorkerMap.put(s.socket().getInetAddress(), sw);
+                    
+                    senderWorkerMap.put(sid, sw);
+                    
+                    if (!queueSendMap.containsKey(sid)) {
+                        queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
+                                CAPACITY));
+                    }      
                     sw.start();
                     rw.start();
 
@@ -352,8 +285,7 @@
                     LOG.warn("Channel null");
                     return false;
                 }
-            }
-
+        }
         return false;
     }
 
@@ -361,14 +293,14 @@
      * Processes invoke this message to send a message. Currently, only leader
      * election uses it.
      */
-    void toSend(InetAddress addr, ByteBuffer b) {
+    void toSend(Long sid, ByteBuffer b) {
         /*
          * If sending message to myself, then simply enqueue it (loopback).
          */
-        if (addr.equals(localIP)) {
+        if (self.getId() == sid) {
             try {
                 b.position(0);
-                recvQueue.put(new Message(b.duplicate(), addr));
+                recvQueue.put(new Message(b.duplicate(), sid));
             } catch (InterruptedException e) {
                 LOG.warn("Exception when loopbacking");
             }
@@ -380,33 +312,33 @@
                 /*
                  * Start a new connection if doesn't have one already.
                  */
-                if (!queueSendMap.containsKey(addr)) {
-                    queueSendMap.put(addr, new ArrayBlockingQueue<ByteBuffer>(
+                if (!queueSendMap.containsKey(sid)) {
+                    queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                             CAPACITY));
-                    queueSendMap.get(addr).put(b);
+                    queueSendMap.get(sid).put(b);
 
                 } else {
-                    if (queueSendMap.get(addr).remainingCapacity() == 0) {
-                        queueSendMap.get(addr).take();
+                    if (queueSendMap.get(sid).remainingCapacity() == 0) {
+                        queueSendMap.get(sid).take();
                     }
-                    queueSendMap.get(addr).put(b);
+                    queueSendMap.get(sid).put(b);
                 }
                 
-                synchronized (senderWorkerMap) {
-                    if (senderWorkerMap.get(addr) == null) {
+                //synchronized (senderWorkerMap) {
+                    if ((senderWorkerMap.get(sid) == null)) {
                         SocketChannel channel;
                         try {
                             channel = SocketChannel
-                                    .open(new InetSocketAddress(addr, port));
+                                    .open(self.quorumPeers.get(sid).electionAddr);
                             channel.socket().setTcpNoDelay(true);
-                            initiateConnection(channel);
+                            initiateConnection(channel, sid);
                         } catch (IOException e) {
                             LOG.warn("Cannot open channel to "
-                                    + addr.toString() + "( " + e.toString()
+                                    + sid + "( " + e.toString()
                                     + ")");
                         }
                     }
-                }     
+                //}     
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while waiting to put message in queue."
                                 + e.toString());
@@ -428,9 +360,15 @@
     /**
      * Flag that it is time to wrap up all activities and interrupt the listener.
      */
-    public void shutdown() {
+    public void halt() {
         shutdown = true;
-        listener.interrupt();
+        LOG.warn("Halting listener");
+        listener.halt();
+        
+        for(SendWorker sw: senderWorkerMap.values()){
+            LOG.warn("Halting sender: " + sw);
+            sw.finish();
+        }
     }
 
     /**
@@ -438,6 +376,7 @@
      */
     class Listener extends Thread {
 
+        ServerSocketChannel ss = null;
         /**
          * Sleeps on accept().
          */
@@ -446,35 +385,31 @@
             ServerSocketChannel ss = null;
             try {
                 ss = ServerSocketChannel.open();
+                int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
+                LOG.warn("My election bind port: " + port);
                 ss.socket().bind(new InetSocketAddress(port));
 
                 while (!shutdown) {
                     SocketChannel client = ss.accept();
                     client.socket().setTcpNoDelay(true);
-                    /*
-                     * This synchronized block guarantees that if
-                     * both parties try to connect to each other
-                     * simultaneously, then only one will succeed.
-                     * If we don't have this block, then there 
-                     * are runs in which both parties act as if they
-                     * don't have any connection starting or started.
-                     * In receiveConnection(), a server sends the minimum
-                     * value for a challenge, if they believe they must
-                     * accept the connection because they don't have one.
-                     * 
-                     * This synchronized block prevents that the same server
-                     * invokes receiveConnection() and initiateConnection() 
-                     * simultaneously.
-                     */
-                    synchronized(senderWorkerMap){
-                        LOG.warn("Connection request");
-                        receiveConnection(client);
-                    }
+                    
+                    //synchronized(senderWorkerMap){
+                    LOG.warn("Connection request");
+                    receiveConnection(client);
+                    //}
                 }
             } catch (IOException e) {
                 System.err.println("Listener.run: " + e.getMessage());
             }
         }
+        
+        void halt(){
+            try{
+                if(ss != null) ss.close();
+            } catch (IOException e){
+                LOG.warn("Exception when shutting down listener: " + e);
+            }
+        }
     }
 
     /**
@@ -485,17 +420,17 @@
 
     class SendWorker extends Thread {
         // Send msgs to peer
-        InetAddress addr;
+        Long sid;
         SocketChannel channel;
         RecvWorker recvWorker;
         boolean running = true;
 
-        SendWorker(SocketChannel channel) {
-            this.addr = channel.socket().getInetAddress();
+        SendWorker(SocketChannel channel, Long sid) {
+            this.sid = sid;
             this.channel = channel;
             recvWorker = null;
             
-            LOG.debug("Address of remote peer: " + this.addr);
+            LOG.debug("Address of remote peer: " + this.sid);
         }
 
         void setRecv(RecvWorker recvWorker) {
@@ -508,7 +443,7 @@
             this.interrupt();
             if (recvWorker != null)
                 recvWorker.finish();
-            senderWorkerMap.remove(channel.socket().getInetAddress());
+            senderWorkerMap.remove(sid);
             return running;
         }
 
@@ -519,7 +454,7 @@
 
                 ByteBuffer b = null;
                 try {
-                    b = queueSendMap.get(addr).take();
+                    b = queueSendMap.get(sid).take();
                 } catch (InterruptedException e) {
                     LOG.warn("Interrupted while waiting for message on queue ("
                                     + e.toString() + ")");
@@ -541,18 +476,18 @@
                      * If reconnection doesn't work, then put the
                      * message back to the beginning of the queue and leave.
                      */
-                    LOG.warn("Exception when using channel: " + addr
+                    LOG.warn("Exception when using channel: " + sid
                             + ")" + e.toString());
                     running = false;
                     synchronized (senderWorkerMap) {
                         recvWorker.finish();
                         recvWorker = null;
                     
-                        senderWorkerMap.remove(channel.socket().getInetAddress());
+                        senderWorkerMap.remove(sid);
                     
-                        if (queueSendMap.get(channel.socket().getInetAddress())
+                        if (queueSendMap.get(sid)
                                     .size() == 0)
-                            queueSendMap.get(channel.socket().getInetAddress())
+                            queueSendMap.get(sid)
                                     .offer(b);
                     }
                 }
@@ -566,12 +501,12 @@
      * channel breaks, then removes itself from the pool of receivers.
      */
     class RecvWorker extends Thread {
-        InetAddress addr;
+        Long sid;
         SocketChannel channel;
         boolean running = true;
 
-        RecvWorker(SocketChannel channel) {
-            this.addr = channel.socket().getInetAddress();
+        RecvWorker(SocketChannel channel, Long sid) {
+            this.sid = sid;
             this.channel = channel;
         }
 
@@ -610,7 +545,7 @@
                         message.position(0);
                         synchronized (recvQueue) {
                             recvQueue
-                                    .put(new Message(message.duplicate(), addr));
+                                    .put(new Message(message.duplicate(), sid));
                         }
                         msgLength.position(0);
                     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Oct  1 01:40:02 2008
@@ -26,13 +26,16 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.HashMap;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.InputArchive;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
 import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionAlg;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionPort;
 import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getInitLimit;
 import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServerId;
 import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServers;
@@ -78,13 +81,22 @@
     }
 
     public static class QuorumServer {
-        public QuorumServer(long id, InetSocketAddress addr) {
+        public QuorumServer(long id, InetSocketAddress addr,
+                InetSocketAddress electionAddr) {
             this.id = id;
             this.addr = addr;
+            this.electionAddr = electionAddr;
         }
 
+        public QuorumServer(long id, InetSocketAddress addr) {
+            this.id = id;
+            this.addr = addr;
+        }
+        
         public InetSocketAddress addr;
 
+        public InetSocketAddress electionAddr;
+        
         public long id;
     }
 
@@ -94,7 +106,7 @@
     /**
      * The servers that make up the cluster
      */
-    ArrayList<QuorumServer> quorumPeers;
+    HashMap<Long, QuorumServer> quorumPeers;
     public int getQuorumSize(){
         return quorumPeers.size();
     }
@@ -239,23 +251,23 @@
 
     Election electionAlg;
 
-    int electionPort;
-
     NIOServerCnxn.Factory cnxnFactory;
     private FileTxnSnapLog logFactory = null;
 
     
     public QuorumPeer() {
         super("QuorumPeer");
+        QuorumStats.getInstance().setStatsProvider(this);
     }
     
-    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir,
-            int electionAlg, int electionPort,long myid, int tickTime,
-            int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
+    public QuorumPeer(HashMap<Long, QuorumServer> quorumPeers, File dataDir,
+            File dataLogDir, int electionType,
+            long myid, int tickTime, int initLimit, int syncLimit,
+            NIOServerCnxn.Factory cnxnFactory) throws IOException {
         super("QuorumPeer");
         this.cnxnFactory = cnxnFactory;
         this.quorumPeers = quorumPeers;
-        this.electionPort = electionPort;
+        this.electionType = electionType;
         this.myid = myid;
         this.tickTime = tickTime;
         this.initLimit = initLimit;
@@ -264,22 +276,6 @@
         QuorumStats.getInstance().setStatsProvider(this);
     }
     
-    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, FileTxnSnapLog logFactory,
-            int electionAlg, int electionPort,long myid, int tickTime,
-            int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
-        
-        super("QuorumPeer");
-        this.cnxnFactory = cnxnFactory;
-        this.quorumPeers = quorumPeers;
-        this.electionPort = electionPort;
-        this.myid = myid;
-        this.tickTime = tickTime;
-        this.initLimit = initLimit;
-        this.syncLimit = syncLimit;        
-        this.logFactory=logFactory;
-        QuorumStats.getInstance().setStatsProvider(this);
-    }
-
     @Override
     public synchronized void start() {
         startLeaderElection();
@@ -292,9 +288,9 @@
         responder.running = false;
         responder.interrupt();
     }
-    public void startLeaderElection() {
+    synchronized public void startLeaderElection() {
         currentVote = new Vote(myid, getLastLoggedZxid());
-        for (QuorumServer p : quorumPeers) {
+        for (QuorumServer p : quorumPeers.values()) {
             if (p.id == myid) {
                 myQuorumAddr = p.addr;
                 break;
@@ -319,13 +315,11 @@
      * This constructor is only used by the existing unit test code.
      * It defaults to FileLogProvider persistence provider.
      */
-    public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File snapDir,
-            File logDir, int clientPort, int electionAlg, int electionPort,
+    public QuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File snapDir,
+            File logDir, int clientPort, int electionAlg,
             long myid, int tickTime, int initLimit, int syncLimit) throws IOException {
-        this(quorumPeers,
-                new FileTxnSnapLog(snapDir,logDir),
-                electionAlg,electionPort,myid,tickTime,initLimit,syncLimit,
-                new NIOServerCnxn.Factory(clientPort));
+        this(quorumPeers, snapDir, logDir, electionAlg,
+                myid,tickTime, initLimit,syncLimit,new NIOServerCnxn.Factory(clientPort));
     }
     
     public long getLastLoggedZxid(){
@@ -352,14 +346,14 @@
             // will create a new instance for each run of the protocol
             break;
         case 1:
-            le = new AuthFastLeaderElection(this, this.electionPort);
+            le = new AuthFastLeaderElection(this);
             break;
         case 2:
-            le = new AuthFastLeaderElection(this, this.electionPort, true);
+            le = new AuthFastLeaderElection(this, true);
             break;
         case 3:
             le = new FastLeaderElection(this,
-                        new QuorumCnxManager(this.electionPort));
+                        new QuorumCnxManager(this));
         default:
             assert false;
         }
@@ -484,19 +478,6 @@
     }
 
 
-    public NIOServerCnxn.Factory getCnxnFactory() {
-        return cnxnFactory;
-    }
-
-    public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
-        this.cnxnFactory = cnxnFactory;
-    }
-
-    public void setQuorumPeers(ArrayList<QuorumServer> quorumPeers) {
-        this.quorumPeers = quorumPeers;
-    }
-
-
     /**
      * get the id of this quorum peer.
      */
@@ -540,6 +521,14 @@
     }
 
     /**
+     * Get an instance of LeaderElection
+     */
+        
+    public Election getElectionAlg(){
+        return electionAlg;
+    }
+        
+    /**
      * Get the number of ticks that can pass between sending a request and getting
      * an acknowledgement
      */
@@ -556,13 +545,6 @@
     }
 
     /**
-     * Gets the election port
-     */
-    public int getElectionPort() {
-        return electionPort;
-    }
-
-    /**
      * Gets the election type
      */
     public int getElectionType() {
@@ -576,13 +558,18 @@
         this.electionType = electionType;
     }
 
-    /**
-     * Sets the election port
-     */
-    public void setElectionPort(int electionPort) {
-        this.electionPort = electionPort;
+    public NIOServerCnxn.Factory getCnxnFactory() {
+        return cnxnFactory;
     }
-    
+
+    public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
+        this.cnxnFactory = cnxnFactory;
+    }
+
+    public void setQuorumPeers(HashMap<Long,QuorumServer> quorumPeers) {
+        this.quorumPeers = quorumPeers;
+    }
+
     public int getClientPort() {
         return -1;
     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Wed Oct  1 01:40:02 2008
@@ -23,9 +23,9 @@
 import java.io.FileInputStream;
 import java.io.FileReader;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Properties;
 import java.util.Map.Entry;
+import java.util.HashMap;
 
 import org.apache.log4j.Logger;
 
@@ -40,7 +40,7 @@
     private int syncLimit;
     private int electionAlg;
     private int electionPort;
-    private ArrayList<QuorumServer> servers = null;
+    private HashMap<Long,QuorumServer> servers = null;
     private long serverId;
 
     private QuorumPeerConfig(int port, String dataDir, String dataLogDir) {
@@ -68,7 +68,7 @@
             } finally {
                 zooCfgStream.close();
             }
-            ArrayList<QuorumServer> servers = new ArrayList<QuorumServer>();
+            HashMap<Long,QuorumServer> servers = new HashMap<Long,QuorumServer>();
             String dataDir = null;
             String dataLogDir = null;
             int clientPort = 0;
@@ -94,19 +94,24 @@
                     syncLimit = Integer.parseInt(value);
                 } else if (key.equals("electionAlg")) {
                     electionAlg = Integer.parseInt(value);
-                } else if (key.equals("electionPort")) {
-                    electionPort = Integer.parseInt(value);
                 } else if (key.startsWith("server.")) {
                     int dot = key.indexOf('.');
                     long sid = Long.parseLong(key.substring(dot + 1));
                     String parts[] = value.split(":");
-                    if (parts.length != 2) {
+                    if ((parts.length != 2) && 
+                            (parts.length != 3)){
                         LOG.error(value
-                                + " does not have the form host:port");
+                                + " does not have the form host:port or host:port:port");
                     }
                     InetSocketAddress addr = new InetSocketAddress(parts[0],
-                            Integer.parseInt(parts[1]));
-                    servers.add(new QuorumServer(sid, addr));
+                            Integer.parseInt(parts[1])); 
+                    if(parts.length == 2)
+                        servers.put(Long.valueOf(sid), new QuorumServer(sid, addr));
+                    else if(parts.length == 3){
+                        InetSocketAddress electionAddr = new InetSocketAddress(parts[0],
+                                Integer.parseInt(parts[2]));
+                        servers.put(Long.valueOf(sid), new QuorumServer(sid, addr, electionAddr));
+                    }
                 } else {
                     System.setProperty("zookeeper." + key, value);
                 }
@@ -145,7 +150,6 @@
             conf.initLimit = initLimit;
             conf.syncLimit = syncLimit;
             conf.electionAlg = electionAlg;
-            conf.electionPort = electionPort;
             conf.servers = servers;
             if (servers.size() > 1) {
                 File myIdFile = new File(dataDir, "myid");
@@ -198,13 +202,8 @@
         assert instance instanceof QuorumPeerConfig;
         return ((QuorumPeerConfig)instance).electionAlg;
     }
-
-    public static int getElectionPort() {
-        assert instance instanceof QuorumPeerConfig;
-        return ((QuorumPeerConfig)instance).electionPort;
-    }
-
-    public static ArrayList<QuorumServer> getServers() {
+    
+    public static HashMap<Long,QuorumServer> getServers() {
         assert instance instanceof QuorumPeerConfig;
         return ((QuorumPeerConfig)instance).servers;
     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Wed Oct  1 01:40:02 2008
@@ -77,7 +77,6 @@
                                 new File(QuorumPeerConfig.getDataLogDir()), 
                                 new File(QuorumPeerConfig.getDataDir())));
                     peer.setQuorumPeers(QuorumPeerConfig.getServers());
-                    peer.setElectionPort(QuorumPeerConfig.getElectionPort());
                     peer.setElectionType(QuorumPeerConfig.getElectionAlg());
                     peer.setMyid(QuorumPeerConfig.getServerId());
                     peer.setTickTime(QuorumPeerConfig.getTickTime());

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=700714&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Wed Oct  1 01:40:02 2008
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumStats;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+
+import junit.framework.TestCase;
+import org.junit.Test;
+
+/**
+ * Exercises {@link FastLeaderElection} with {@code count} in-process peers.
+ * The first peer to elect itself while {@code leaderDies} is set shuts its
+ * election down; the remaining peers must then agree on a new leader.
+ */
+public class FLETest extends TestCase {
+    protected static final Logger LOG = Logger.getLogger(FLETest.class);
+
+    int count;
+    int baseport;
+    int baseLEport;
+    HashMap<Long,QuorumServer> peers;
+    ArrayList<LEThread> threads;
+    File tmpdir[];
+    int port[];
+
+    volatile Vote votes[];
+    volatile boolean leaderDies;
+    volatile long leader = -1;
+    /** Id of the leader whose election was shut down, or -1 if none yet. */
+    volatile long deadLeader = -1;
+    volatile int round = 1;
+    Random rand = new Random();
+
+    @Override
+    public void setUp() throws Exception {
+        count = 7;
+        baseport= 33003;
+        baseLEport = 43003;
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        threads = new ArrayList<LEThread>(count);
+        votes = new Vote[count];
+        tmpdir = new File[count];
+        port = new int[count];
+
+        QuorumStats.registerAsConcrete();
+        LOG.info("SetUp " + getName());
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        // Stop the election algorithm of every peer started by testLE.
+        for(int i = 0; i < threads.size(); i++) {
+            ((FastLeaderElection) threads.get(i).peer.getElectionAlg()).shutdown();
+        }
+        LOG.info("FINISHED " + getName());
+    }
+
+    /**
+     * Runs leader election for one peer until it becomes leader, agrees
+     * with the leader recorded by another thread, or gets a null vote.
+     */
+    class LEThread extends Thread {
+        int i;
+        QuorumPeer peer;
+
+        LEThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: " + getName());
+        }
+
+        @Override
+        public void run() {
+            try {
+                Vote v = null;
+                while(true) {
+                    peer.setPeerState(ServerState.LOOKING);
+                    LOG.info("Going to call leader election again.");
+                    v = peer.getElectionAlg().lookForLeader();
+                    if(v == null){
+                        LOG.info("Thread " + i + " got a null vote");
+                        break;
+                    }
+                    peer.setCurrentVote(v);
+
+                    LOG.info("Finished election: " + i + ", " + v.id);
+                    votes[i] = v;
+                    if (v.id == ((long) i)) {
+                        LOG.debug("I'm the leader");
+                        synchronized(FLETest.this) {
+                            if (leaderDies) {
+                                // Simulate a failed leader: stop this peer's
+                                // election so the others must choose again.
+                                LOG.debug("Leader " + i + " dying");
+                                leaderDies = false;
+                                ((FastLeaderElection) peer.getElectionAlg()).shutdown();
+                                leader = -1;
+                                deadLeader = i;
+                                LOG.debug("Leader " + i + " dead");
+                            } else {
+                                leader = i;
+                            }
+                            round++;
+                            FLETest.this.notifyAll();
+                        }
+                        break;
+                    }
+                    synchronized(FLETest.this) {
+                        // If the shared round matches this peer's logical clock,
+                        // wait up to 1s for a notification; advance the round
+                        // here if nobody else did in the meantime.
+                        if (round == ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock()) {
+                            int tmp_round = round;
+                            FLETest.this.wait(1000);
+                            if (tmp_round == round) {
+                                round++;
+                            }
+                        }
+                        LOG.info("The leader: " + leader + " and my vote " + votes[i].id);
+                        if (leader == votes[i].id) {
+                            break;
+                        }
+                    }
+                    Thread.sleep(rand.nextInt(1000));
+                    peer.setCurrentVote(new Vote(peer.getId(), 0));
+                }
+                LOG.debug("Thread " + i + " votes " + v);
+            } catch (InterruptedException e) {
+                // Log and restore the interrupt flag instead of swallowing it.
+                LOG.warn("Thread " + i + " interrupted", e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Test
+    public void testLE() throws Exception {
+        leaderDies = true;
+        boolean allowOneBadLeader = leaderDies;
+
+        LOG.info("TestLE: " + getName()+ ", " + count);
+        for(int i = 0; i < count; i++) {
+            peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress(baseport+100+i),
+                    new InetSocketAddress(baseLEport+100+i)));
+            tmpdir[i] = File.createTempFile("letest", "test");
+            port[i] = baseport+i;
+        }
+
+        for(int i = 0; i < count; i++) {
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
+            peer.startLeaderElection();
+            LEThread thread = new LEThread(peer, i);
+            thread.start();
+            threads.add(thread);
+        }
+        LOG.info("Started threads " + getName());
+
+        for(int i = 0; i < threads.size(); i++) {
+            threads.get(i).join(20000);
+            if (threads.get(i).isAlive()) {
+                fail("Threads didn't join: " + i);
+            }
+        }
+
+        // Check every vote, including votes[0], before dereferencing any.
+        for(int i = 0; i < votes.length; i++) {
+            if (votes[i] == null) {
+                fail("Thread " + i + " had a null vote");
+            }
+        }
+
+        // Take the expected leader from a thread other than the one whose
+        // leader was shut down, since that thread's last vote is for itself.
+        int ref = (deadLeader == 0) ? 1 : 0;
+        long id = votes[ref].id;
+        for(int i = 0; i < votes.length; i++) {
+            if (i == ref) {
+                continue;
+            }
+            LOG.info("Final leader info: " + i + ", " + votes[i].id + ", " + id);
+            if (votes[i].id != id) {
+                if (allowOneBadLeader && votes[i].id == i) {
+                    allowOneBadLeader = false;
+                } else {
+                    fail("Thread " + i + " got " + votes[i].id + " expected " + id);
+                }
+            }
+        }
+    }
+}

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java Wed Oct  1 01:40:02 2008
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Random;
+import java.util.HashMap;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.quorum.LeaderElection;
@@ -86,14 +87,14 @@
     public void testLE() throws Exception {
         int count = 30;
         int baseport= 33003;
-        ArrayList<QuorumServer> peers = new ArrayList<QuorumServer>(count);
+        HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>(count);
         ArrayList<LEThread> threads = new ArrayList<LEThread>(count);
         File tmpdir[] = new File[count];
         int port[] = new int[count];
         votes = new Vote[count];
         QuorumStats.registerAsConcrete();
         for(int i = 0; i < count; i++) {
-            peers.add(new QuorumServer(i, new InetSocketAddress(baseport+100+i)));
+            peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress("127.0.0.1", baseport+100+i)));
             tmpdir[i] = File.createTempFile("letest", "test");
             port[i] = baseport+i;    
         }
@@ -101,7 +102,7 @@
         leaderDies = true;
         boolean allowOneBadLeader = leaderDies;
         for(int i = 0; i < le.length; i++) {
-            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 0, 0, i, 2, 2, 2);
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 0, i, 2, 2, 2);
             peer.startLeaderElection();
             le[i] = new LeaderElection(peer);
             LEThread thread = new LEThread(le[i], peer, i);

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Oct  1 01:40:02 2008
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
@@ -65,22 +66,23 @@
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
-        ArrayList<QuorumServer> peers = new ArrayList<QuorumServer>();
-        peers.add(new QuorumServer(1, new InetSocketAddress("127.0.0.1", 3181)));
-        peers.add(new QuorumServer(2, new InetSocketAddress("127.0.0.1", 3182)));
-        peers.add(new QuorumServer(3, new InetSocketAddress("127.0.0.1", 3183)));
-        peers.add(new QuorumServer(4, new InetSocketAddress("127.0.0.1", 3184)));
-        peers.add(new QuorumServer(5, new InetSocketAddress("127.0.0.1", 3185)));
+        HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+        peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", 3181)));
+        peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", 3182)));
+        peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", 3183)));
+        peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", 3184)));
+        peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", 3185)));
+        
         LOG.info("creating QuorumPeer 1");
-        s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0,  1181, 1, tickTime, initLimit, syncLimit);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0, 1, tickTime, initLimit, syncLimit);
         LOG.info("creating QuorumPeer 2");
-        s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 1182, 2, tickTime, initLimit, syncLimit);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 2, tickTime, initLimit, syncLimit);
         LOG.info("creating QuorumPeer 3");
-        s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 1183, 3, tickTime, initLimit, syncLimit);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 3, tickTime, initLimit, syncLimit);
         LOG.info("creating QuorumPeer 4");
-        s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 1184, 4, tickTime, initLimit, syncLimit);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 4, tickTime, initLimit, syncLimit);
         LOG.info("creating QuorumPeer 5");
-        s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 1185, 5, tickTime, initLimit, syncLimit);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 5, tickTime, initLimit, syncLimit);
         LOG.info("start QuorumPeer 1");
         s1.start();
         LOG.info("start QuorumPeer 2");