You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/04/29 18:12:45 UTC

svn commit: r1097865 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: fpj
Date: Fri Apr 29 16:12:44 2011
New Revision: 1097865

URL: http://svn.apache.org/viewvc?rev=1097865&view=rev
Log:
ZOOKEEPER-975. new peer goes in LEADING state even if ensemble is online. (vishal via fpj)


Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Apr 29 16:12:44 2011
@@ -201,6 +201,8 @@ BUGFIXES: 
   ZOOKEEPER-1028. In python bindings, zookeeper.set2() should return a stat dict but 
   instead returns None. (Chris Medaglia and Ivan Kelly via mahadev)
 
+  ZOOKEEPER-975. new peer goes in LEADING state even if ensemble is online. (vishal via fpj)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Apr 29 16:12:44 2011
@@ -188,7 +188,7 @@ public class FastLeaderElection implemen
                 while (!stop) {
                     // Sleeps on receive
                     try{
-                        response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                         if(response == null) continue;
 
                         /*
@@ -288,7 +288,9 @@ public class FastLeaderElection implemen
                                     if(LOG.isDebugEnabled()){
                                         LOG.debug("Sending new notification. My id =  " +
                                                 self.getId() + ", Recipient = " +
-                                                response.sid);
+                                                response.sid + " zxid =" +
+                                                current.zxid + " leader=" +
+                                                current.id);
                                     }
                                     ToSend notmsg = new ToSend(
                                             ToSend.mType.notification,
@@ -384,14 +386,14 @@ public class FastLeaderElection implemen
             this.ws = new WorkerSender(manager);
 
             Thread t = new Thread(this.ws,
-                    "WorkerSender(" + Thread.currentThread().getName() + ")");
+                    "WorkerSender[myid=" + self.getId() + "]");
             t.setDaemon(true);
             t.start();
 
             this.wr = new WorkerReceiver(manager);
 
             t = new Thread(this.wr,
-                    "WorkerReceiver(" + Thread.currentThread().getName() + ")");
+                    "WorkerReceiver[myid=" + self.getId() + "]");
             t.setDaemon(true);
             t.start();
         }
@@ -455,7 +457,13 @@ public class FastLeaderElection implemen
         this.messenger = new Messenger(manager);
     }
 
-    private void leaveInstance() {
+    private void leaveInstance(Vote v) {
+        if(LOG.isDebugEnabled()){
+            LOG.debug("About to leave FLE instance: Leader= "
+                + v.id + ", Zxid = " +
+                v.zxid + ", My id = " + self.getId()
+                + ", My state = " + self.getPeerState());
+        }
         recvqueue.clear();
     }
 
@@ -487,7 +495,12 @@ public class FastLeaderElection implemen
                     logicalclock,
                     QuorumPeer.ServerState.LOOKING,
                     sid);
-
+            if(LOG.isDebugEnabled()){
+                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), "  +
+                      proposedZxid + " (n.zxid), " + logicalclock  +
+                      " (n.round), " + sid + " (recipient), " + self.getId() +
+                      " (myid)");
+            }
             sendqueue.offer(notmsg);
         }
     }
@@ -578,6 +591,11 @@ public class FastLeaderElection implemen
     }
 
     synchronized void updateProposal(long leader, long zxid){
+        if(LOG.isDebugEnabled()){
+            LOG.debug("Updating proposal: " + leader + " (newleader), " + zxid +
+                  " (newzxid), " + proposedLeader + " (oldleader), " +
+                  proposedZxid + " (oldzxid)");
+        }
         proposedLeader = leader;
         proposedZxid = zxid;
     }
@@ -640,7 +658,9 @@ public class FastLeaderElection implemen
             LOG.warn("Failed to register with JMX", e);
             self.jmxLeaderElectionBean = null;
         }
-
+        if (self.start_fle == 0) {
+           self.start_fle = System.currentTimeMillis();
+        }
         try {
             HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
 
@@ -689,7 +709,11 @@ public class FastLeaderElection implemen
                             tmpTimeOut : maxNotificationInterval);
                     LOG.info("Notification time out: " + notTimeout);
                 }
-                else{
+                else if(self.getVotingView().containsKey(n.sid)) {
+                    /*
+                     * Only proceed if the vote comes from a replica in the
+                     * voting view.
+                     */
                     switch (n.state) {
                     case LOOKING:
                         // If notification > current, replace and send messages out
@@ -697,11 +721,12 @@ public class FastLeaderElection implemen
                             logicalclock = n.epoch;
                             recvset.clear();
                             if(totalOrderPredicate(n.leader, n.zxid,
-                                    getInitId(), getInitLastLoggedZxid()))
+                                    getInitId(), getInitLastLoggedZxid())) {
                                 updateProposal(n.leader, n.zxid);
-                            else
+                            } else {
                                 updateProposal(getInitId(),
                                         getInitLastLoggedZxid());
+                            }
                             sendNotifications();
                         } else if (n.epoch < logicalclock) {
                             if(LOG.isDebugEnabled()){
@@ -711,7 +736,6 @@ public class FastLeaderElection implemen
                             break;
                         } else if (totalOrderPredicate(n.leader, n.zxid,
                                 proposedLeader, proposedZxid)) {
-                            LOG.info("Updating proposal");
                             updateProposal(n.leader, n.zxid);
                             sendNotifications();
                         }
@@ -723,85 +747,66 @@ public class FastLeaderElection implemen
                                     ", Proposed epoch = " + n.epoch);
                         }
 
-                        /*
-                         * Only proceed if the vote comes from a replica in the
-                         * voting view.
-                         */
-                        if(self.getVotingView().containsKey(n.sid)){
-                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
-
-                            //If have received from all nodes, then terminate
-                            if ((self.getVotingView().size() == recvset.size()) &&
-                                    (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
-                                self.setPeerState((proposedLeader == self.getId()) ?
-                                        ServerState.LEADING: learningState());
-                                leaveInstance();
-                                return new Vote(proposedLeader, proposedZxid);
+                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
 
-                            } else if (termPredicate(recvset,
-                                    new Vote(proposedLeader, proposedZxid,
-                                            logicalclock))) {
-
-                                // Verify if there is any change in the proposed leader
-                                while((n = recvqueue.poll(finalizeWait,
-                                        TimeUnit.MILLISECONDS)) != null){
-                                    if(totalOrderPredicate(n.leader, n.zxid,
-                                            proposedLeader, proposedZxid)){
-                                        recvqueue.put(n);
-                                        break;
-                                    }
+                        if (termPredicate(recvset,
+                                new Vote(proposedLeader, proposedZxid,
+                                        logicalclock))) {
+
+                            // Verify if there is any change in the proposed leader
+                            while((n = recvqueue.poll(finalizeWait,
+                                    TimeUnit.MILLISECONDS)) != null){
+                                if(totalOrderPredicate(n.leader, n.zxid,
+                                        proposedLeader, proposedZxid)){
+                                    recvqueue.put(n);
+                                    break;
                                 }
+                            }
 
-                                /*
-                                 * This predicate is true once we don't read any new
-                                 * relevant message from the reception queue
-                                 */
-                                if (n == null) {
-                                    self.setPeerState((proposedLeader == self.getId()) ?
-                                            ServerState.LEADING: learningState());
-                                    if(LOG.isDebugEnabled()){
-                                        LOG.debug("About to leave FLE instance: Leader= "
-                                            + proposedLeader + ", Zxid = " +
-                                            proposedZxid + ", My id = " + self.getId()
-                                            + ", My state = " + self.getPeerState());
-                                    }
+                            /*
+                             * This predicate is true once we don't read any new
+                             * relevant message from the reception queue
+                             */
+                            if (n == null) {
+                                self.setPeerState((proposedLeader == self.getId()) ?
+                                        ServerState.LEADING: learningState());
 
-                                    leaveInstance();
-                                    return new Vote(proposedLeader,
-                                            proposedZxid);
-                                }
+                                Vote endVote = new Vote(proposedLeader,
+                                        proposedZxid);
+                                leaveInstance(endVote);
+                                return endVote;
                             }
                         }
                         break;
                     case OBSERVING:
                         LOG.debug("Notification from observer: " + n.sid);
                         break;
-                    default:
+                    case FOLLOWING:
+                    case LEADING:
                         /*
-                         * There is at most one leader for each epoch, so if a
-                         * peer claims to be the leader for an epoch, then that
-                         * peer must be the leader (no* arbitrary failures
-                         * assumed). Now, if there is no quorum supporting
-                         * this leader, then processes will naturally move
-                         * to a new epoch.
+                         * Consider all notifications from the same epoch
+                         * together.
                          */
                         if(n.epoch == logicalclock){
                             recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
-                            if((n.state == ServerState.LEADING) ||
-                                    (termPredicate(recvset, new Vote(n.leader,
+                            if(termPredicate(recvset, new Vote(n.leader,
                                             n.zxid, n.epoch, n.state))
-                                            && checkLeader(outofelection, n.leader, n.epoch)) ){
+                                            && checkLeader(outofelection, n.leader, n.epoch)) {
                                 self.setPeerState((n.leader == self.getId()) ?
                                         ServerState.LEADING: learningState());
 
-                                leaveInstance();
-                                return new Vote(n.leader, n.zxid);
+                                Vote endVote = new Vote(n.leader, n.zxid);
+                                leaveInstance(endVote);
+                                return endVote;
                             }
                         }
 
+                        /**
+                         * Before joining an established ensemble, verify that
+                         * a majority are following the same leader.
+                         */
                         outofelection.put(n.sid, new Vote(n.leader, n.zxid,
                                 n.epoch, n.state));
-
                         if (termPredicate(outofelection, new Vote(n.leader,
                                 n.zxid, n.epoch, n.state))
                                 && checkLeader(outofelection, n.leader, n.epoch)) {
@@ -810,15 +815,20 @@ public class FastLeaderElection implemen
                                 self.setPeerState((n.leader == self.getId()) ?
                                         ServerState.LEADING: learningState());
                             }
-                            leaveInstance();
-                            return new Vote(n.leader, n.zxid);
+                            Vote endVote = new Vote(n.leader, n.zxid);
+                            leaveInstance(endVote);
+                            return endVote;
                         }
-
+                        break;
+                    default:
+                        LOG.warn("Notification state unrecoginized: " + n.state
+                              + " (n.state), " + n.sid + " (n.sid)");
                         break;
                     }
+                } else {
+                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                 }
             }
-
             return null;
         } finally {
             try {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Fri Apr 29 16:12:44 2011
@@ -58,9 +58,14 @@ public class Follower extends Learner{
      * @throws InterruptedException
      */
     void followLeader() throws InterruptedException {
+        self.end_fle = System.currentTimeMillis();
+        LOG.info("FOLLOWING - LEADER ELECTION TOOK - " +
+              (self.end_fle - self.start_fle));
+        self.start_fle = 0;
+        self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
-        try {
-            InetSocketAddress addr = findLeader();
+        try {            
+            InetSocketAddress addr = findLeader();            
             try {
                 connectToLeader(addr);
                 long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO);

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Apr 29 16:12:44 2011
@@ -270,6 +270,12 @@ public class Leader {
      * @throws InterruptedException
      */
     void lead() throws IOException, InterruptedException {
+        self.end_fle = System.currentTimeMillis();
+        LOG.info("LEADING - LEADER ELECTION TOOK - " +
+              (self.end_fle - self.start_fle));
+        self.start_fle = 0;
+        self.end_fle = 0;
+
         zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
 
         try {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Fri Apr 29 16:12:44 2011
@@ -32,6 +32,7 @@ import java.util.Enumeration;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.Date;
 
@@ -64,8 +65,11 @@ public class QuorumCnxManager {
     /*
      * Maximum capacity of thread queues
      */
+    static final int RECV_CAPACITY = 100;
+    // Initialized to 1 to prevent sending
+    // stale notifications to peers
+    static final int SEND_CAPACITY = 1;
 
-    static final int CAPACITY = 100;
     static final int PACKETMAXSIZE = 1024 * 1024; 
     /*
      * Maximum number of attempts to connect to a peer
@@ -101,6 +105,10 @@ public class QuorumCnxManager {
      * Reception queue
      */
     public final ArrayBlockingQueue<Message> recvQueue;
+    /*
+     * Object to synchronize access to recvQueue
+     */
+    private final Object recvQLock = new Object();
 
     /*
      * Shutdown flag
@@ -129,7 +137,7 @@ public class QuorumCnxManager {
     }
 
     public QuorumCnxManager(QuorumPeer self) {
-        this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
+        this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
         this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
         this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
@@ -196,7 +204,7 @@ public class QuorumCnxManager {
             senderWorkerMap.put(sid, sw);
             if (!queueSendMap.containsKey(sid)) {
                 queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
-                        CAPACITY));
+                        SEND_CAPACITY));
             }
             
             sw.start();
@@ -273,7 +281,7 @@ public class QuorumCnxManager {
             
             if (!queueSendMap.containsKey(sid)) {
                 queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
-                        CAPACITY));
+                        SEND_CAPACITY));
             }
             
             sw.start();
@@ -293,44 +301,31 @@ public class QuorumCnxManager {
          * If sending message to myself, then simply enqueue it (loopback).
          */
         if (self.getId() == sid) {
-            try {
-                b.position(0);
-                recvQueue.put(new Message(b.duplicate(), sid));
-            } catch (InterruptedException e) {
-                LOG.warn("Exception when loopbacking", e);
-            }
+             b.position(0);
+             addToRecvQueue(new Message(b.duplicate(), sid));
             /*
              * Otherwise send to the corresponding thread to send.
              */
         } else {
-            try {
-                /*
-                 * Start a new connection if doesn't have one already.
-                 */
-                if (!queueSendMap.containsKey(sid)) {
-                    ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
-                            CAPACITY);
-                    queueSendMap.put(sid, bq);
-                    bq.put(b);
-
-                } else {
-                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
-                    if(bq != null){
-                        if (bq.remainingCapacity() == 0) {
-                            bq.take();
-                        }
-                        bq.put(b);
-                    } else {
-                        LOG.error("No queue for server " + sid);
-                    }
-                }
-
-                connectOne(sid);
+             /*
+              * Start a new connection if doesn't have one already.
+              */
+             if (!queueSendMap.containsKey(sid)) {
+                 ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
+                         SEND_CAPACITY);
+                 queueSendMap.put(sid, bq);
+                 addToSendQueue(bq, b);
+
+             } else {
+                 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+                 if(bq != null){
+                     addToSendQueue(bq, b);
+                 } else {
+                     LOG.error("No queue for server " + sid);
+                 }
+             }
+             connectOne(sid);
                 
-            } catch (InterruptedException e) {
-                LOG.warn("Interrupted while waiting to put message in queue.",
-                        e);
-            }
         }
     }
     
@@ -634,9 +629,26 @@ public class QuorumCnxManager {
         public void run() {
             threadCnt.incrementAndGet();
             try {
-                ByteBuffer b = lastMessageSent.get(sid);
-                if (b != null) {
-                    send(b);
+                /**
+                 * If there is nothing in the queue to send, then we
+                 * send the lastMessage to ensure that the last message
+                 * was received by the peer. The message could be dropped
+                 * in case self or the peer shut down their connection
+                 * (and exit the thread) prior to reading/processing
+                 * the last message. Duplicate messages are handled correctly
+                 * by the peer.
+                 *
+                 * If the send queue is non-empty, then we have a more recent
+                 * message than that stored in lastMessage. To avoid sending
+                 * a stale message, we should send the message in the send queue.
+                 */
+                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+                if (bq == null || isSendQueueEmpty(bq)) {
+                   ByteBuffer b = lastMessageSent.get(sid);
+                   if (b != null) {
+                       LOG.debug("Attempting to send lastMessage to sid=" + sid);
+                       send(b);
+                   }
                 }
             } catch (IOException e) {
                 LOG.error("Failed to send last message. Shutting down thread.", e);
@@ -651,7 +663,7 @@ public class QuorumCnxManager {
                         ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                                 .get(sid);
                         if (bq != null) {
-                            b = bq.poll(1000, TimeUnit.MILLISECONDS);
+                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                         } else {
                             LOG.error("No queue of incoming messages for " +
                                       "server " + sid);
@@ -742,7 +754,7 @@ public class QuorumCnxManager {
                     byte[] msgArray = new byte[length];
                     din.readFully(msgArray, 0, length);
                     ByteBuffer message = ByteBuffer.wrap(msgArray);
-                    recvQueue.put(new Message(message.duplicate(), sid));
+                    addToRecvQueue(new Message(message.duplicate(), sid));
                 }
             } catch (Exception e) {
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 
@@ -756,4 +768,116 @@ public class QuorumCnxManager {
             }
         }
     }
+
+    /**
+     * Inserts an element in the specified queue. If the Queue is full, this
+     * method removes an element from the head of the Queue and then inserts
+     * the element at the tail. It can happen that an element is removed
+     * by another thread in {@link SendWorker#processMessage() processMessage}
+     * method before this method attempts to remove an element from the queue.
+     * This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
+     * exception, which is safe to ignore.
+     *
+     * Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
+     * not need to be synchronized since there is only one thread that inserts
+     * an element in the queue and another thread that reads from the queue.
+     *
+     * @param queue
+     *          Reference to the Queue
+     * @param buffer
+     *          Reference to the buffer to be inserted in the queue
+     */
+    private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
+          ByteBuffer buffer) {
+        if (queue.remainingCapacity() == 0) {
+            try {
+                queue.remove();
+            } catch (NoSuchElementException ne) {
+                // element could be removed by poll()
+                LOG.debug("Trying to remove from an empty " +
+                        "Queue. Ignoring exception " + ne);
+            }
+        }
+        try {
+            queue.add(buffer);
+        } catch (IllegalStateException ie) {
+            // This should never happen
+            LOG.error("Unable to insert an element in the queue " + ie);
+        }
+    }
+
+    /**
+     * Returns true if queue is empty.
+     * @param queue
+     *          Reference to the queue
+     * @return
+     *      true if the specified queue is empty
+     */
+    private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
+        return queue.isEmpty();
+    }
+
+    /**
+     * Retrieves and removes buffer at the head of this queue,
+     * waiting up to the specified wait time if necessary for an element to
+     * become available.
+     *
+     * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
+     */
+    private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
+          long timeout, TimeUnit unit) throws InterruptedException {
+       return queue.poll(timeout, unit);
+    }
+
+    /**
+     * Inserts an element in the {@link #recvQueue}. If the Queue is full, this
+     * method removes an element from the head of the Queue and then inserts
+     * the element at the tail of the queue.
+     *
+     * This method is synchronized to achieve fairness between two threads that
+     * are trying to insert an element in the queue. Each thread checks if the
+     * queue is full, then removes the element at the head of the queue, and
+     * then inserts an element at the tail. This three-step process is done to
+     * prevent a thread from blocking while inserting an element in the queue.
+     * If we do not synchronize the call to this method, then a thread can grab
+     * a slot in the queue created by the second thread. This can cause the call
+     * to insert by the second thread to fail.
+     * Note that synchronizing this method does not block another thread
+     * from polling the queue since that synchronization is provided by the
+     * queue itself.
+     *
+     * @param msg
+     *          Reference to the message to be inserted in the queue
+     */
+    public void addToRecvQueue(Message msg) {
+        synchronized(recvQLock) {
+            if (recvQueue.remainingCapacity() == 0) {
+                try {
+                    recvQueue.remove();
+                } catch (NoSuchElementException ne) {
+                    // element could be removed by poll()
+                     LOG.debug("Trying to remove from an empty " +
+                         "recvQueue. Ignoring exception " + ne);
+                }
+            }
+            try {
+                recvQueue.add(msg);
+            } catch (IllegalStateException ie) {
+                // This should never happen
+                LOG.error("Unable to insert element in the recvQueue " + ie);
+            }
+        }
+    }
+
+    /**
+     * Retrieves and removes a message at the head of this queue,
+     * waiting up to the specified wait time if necessary for an element to
+     * become available.
+     *
+     * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
+     */
+    public Message pollRecvQueue(long timeout, TimeUnit unit)
+       throws InterruptedException {
+       return recvQueue.poll(timeout, unit);
+    }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Apr 29 16:12:44 2011
@@ -139,6 +139,11 @@ public class QuorumPeer extends Thread i
      */
     
     static final long OBSERVER_ID = Long.MAX_VALUE;
+
+    /*
+     * Record leader election time
+     */
+    public long start_fle, end_fle;
     
     /*
      * Default value of peer is participant
@@ -573,7 +578,8 @@ public class QuorumPeer extends Thread i
 
     @Override
     public void run() {
-        setName("QuorumPeer:" + cnxnFactory.getLocalAddress());
+        setName("QuorumPeer" + "[myid=" + getId() + "]" +
+                cnxnFactory.getLocalAddress());
 
         LOG.debug("Starting quorum peer");
         try {

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Fri Apr 29 16:12:44 2011
@@ -112,7 +112,7 @@ public class CnxManagerTest extends ZKTe
                 Message m = null;
                 int numRetries = 1;
                 while((m == null) && (numRetries++ <= THRESHOLD)){
-                    m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+                    m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                     if(m == null) cnxManager.connectAll();
                 }
 
@@ -123,7 +123,7 @@ public class CnxManagerTest extends ZKTe
 
                 cnxManager.testInitiateConnection(sid);
 
-                m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+                m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                 if(m == null){
                     failed = true;
                     return;
@@ -155,7 +155,7 @@ public class CnxManagerTest extends ZKTe
         Message m = null;
         int numRetries = 1;
         while((m == null) && (numRetries++ <= THRESHOLD)){
-            m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+            m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
             if(m == null) cnxManager.connectAll();
         }
         

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Fri Apr 29 16:12:44 2011
@@ -313,4 +313,94 @@ public class FLETest extends ZKTestCase 
            Assert.fail("Leader hasn't joined: " + leader);
        }
     }
+
+    /*
+     * Polls a peer until it is FOLLOWING (success) or LEADING (failure);
+     * interrupting the thread makes it stop polling and exit.
+     */
+    class VerifyState extends Thread {
+        private volatile boolean success = false;
+        private final QuorumPeer peer;
+        public VerifyState(QuorumPeer peer) {
+            this.peer = peer;
+        }
+        @Override
+        public void run() {
+            setName("VerifyState-" + peer.getId());
+            while (true) {
+                ServerState state = peer.getPeerState(); // one read, so both checks agree
+                if (state == ServerState.FOLLOWING || state == ServerState.LEADING) {
+                    success = (state == ServerState.FOLLOWING);
+                    LOG.info(success ? "I am following" : "I am leading");
+                    return;
+                }
+                try {
+                    Thread.sleep(250);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // keep the flag and stop polling
+                    return;
+                }
+            }
+        }
+        public boolean isSuccess() {
+            return success;
+        }
+    }
+
+    /*
+     * For ZOOKEEPER-975 verify that a peer joining an established cluster
+     * does not go in LEADING state. Peers 0 and 1 form the cluster, peer 2
+     * joins later; started peers are always shut down, even on failure.
+     */
+    @Test
+    public void testJoin() throws Exception {
+        int sid;
+        int waitTime = 10 * 1000;
+        ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+        for (sid = 0; sid < 3; sid++) {
+            peers.put(Long.valueOf(sid),
+                    new QuorumServer(sid,
+                            new InetSocketAddress(PortAssignment.unique()),
+                            new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[sid] = ClientBase.createTmpDir();
+            port[sid] = PortAssignment.unique();
+        }
+        try {
+            // start 2 peers and verify if they form the cluster
+            for (sid = 0; sid < 2; sid++) {
+                QuorumPeer peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+                        port[sid], 3, sid, 2000, 2, 2);
+                LOG.info("Starting peer " + peer.getId());
+                peer.start();
+                peerList.add(peer);
+            }
+            VerifyState v1 = new VerifyState(peerList.get(0));
+            v1.start();
+            v1.join(waitTime);
+            v1.interrupt(); // ask the poller to stop; no effect if it already finished
+            Assert.assertTrue("Unable to form cluster in " + waitTime + " ms",
+                    v1.isSuccess());
+            // Start 3rd peer (sid is 2 here) and check if it goes in LEADING state
+            QuorumPeer joiner = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+                    port[sid], 3, sid, 2000, 2, 2);
+            LOG.info("Starting peer " + joiner.getId());
+            joiner.start();
+            peerList.add(joiner);
+            VerifyState v2 = new VerifyState(joiner);
+            v2.start();
+            v2.join(waitTime);
+            if (v2.isAlive()) {
+                v2.interrupt(); // timed out: ask the poller to stop before failing
+                Assert.fail("Peer " + joiner.getId() + " failed to join the cluster " +
+                        "within " + waitTime + " ms");
+            } else if (!v2.isSuccess()) {
+                Assert.fail("Incorrect LEADING state for peer " + joiner.getId());
+            }
+        } finally {
+            // cleanup: runs on failure too and only touches peers that were started
+            for (QuorumPeer started : peerList) {
+                started.shutdown();
+            }
+        }
+    }
 }