You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2008/08/25 23:13:01 UTC

svn commit: r688885 - in /hadoop/zookeeper/trunk/src/java: ./ main/org/apache/zookeeper/server/quorum/

Author: mahadev
Date: Mon Aug 25 14:13:01 2008
New Revision: 688885

URL: http://svn.apache.org/viewvc?rev=688885&view=rev
Log:
ZOOKEEPER-2. Fix synchronization issues in QuorumPeer and FastLeader election. (Flavio Paiva Junqueira via mahadev)

Modified:
    hadoop/zookeeper/trunk/src/java/Changes.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

Modified: hadoop/zookeeper/trunk/src/java/Changes.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/Changes.txt?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/Changes.txt (original)
+++ hadoop/zookeeper/trunk/src/java/Changes.txt Mon Aug 25 14:13:01 2008
@@ -18,3 +18,6 @@
   
  ZOOKEEPER-123. Fix  the wrong class is specified for the logger. (Jakob Homan
  via mahadev)
+
+ ZOOKEEPER-2. Fix synchronization issues in QuorumPeer and FastLeader
+ election. (Flavio Paiva Junqueira via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Mon Aug 25 14:13:01 2008
@@ -264,11 +264,13 @@
                         break;
                     }
 
+                    Vote current = self.getCurrentVote();
+
                     switch (type) {
                     case 0:
                         // Receive challenge request
                         ToSend c = new ToSend(ToSend.mType.challenge, tag,
-                                self.currentVote.id, self.currentVote.zxid,
+                                current.id, current.zxid,
                                 logicalclock, self.getPeerState(),
                                 (InetSocketAddress) responsePacket
                                         .getSocketAddress());
@@ -309,8 +311,8 @@
                                     recvqueue.offer(n);
 
                                     ToSend a = new ToSend(ToSend.mType.ack,
-                                            tag, self.currentVote.id,
-                                            self.currentVote.zxid,
+                                            tag, current.id,
+                                            current.zxid,
                                             logicalclock, self.getPeerState(),
                                             (InetSocketAddress) addr);
 
@@ -328,7 +330,7 @@
                             recvqueue.offer(n);
 
                             ToSend a = new ToSend(ToSend.mType.ack, tag,
-                                    self.currentVote.id, self.currentVote.zxid,
+                                    current.id, current.zxid,
                                     logicalclock, self.getPeerState(),
                                     (InetSocketAddress) responsePacket
                                             .getSocketAddress());
@@ -685,7 +687,7 @@
 
     QuorumPeer self;
     int port;
-    long logicalclock; /* Election instance */
+    volatile long logicalclock; /* Election instance */
     DatagramSocket mySocket;
     long proposedLeader;
     long proposedZxid;

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Mon Aug 25 14:13:01 2008
@@ -49,26 +49,36 @@
 	/* Sequence numbers for messages */
     static int sequencer = 0;
 
-    /*
+    /**
      * Determine how much time a process has to wait
      * once it believes that it has reached the end of
      * leader election.
      */
     static int finalizeWait = 100;
 
-    /*
+    /**
 	 * Challenge counter to avoid replay attacks
 	 */
 	
 	static int challengeCounter = 0;
 	
     
-	/*
-	 * Connection manager
+	/**
+	 * Connection manager. Fast leader election uses TCP for 
+	 * communication between peers, and QuorumCnxManager manages
+	 * such connections. 
 	 */
 	
 	QuorumCnxManager manager;
 
+	
+	/**
+	 * Notifications are messages that let other peers know that
+	 * a given peer has changed its vote, either because it has
+	 * joined leader election or because it learned of another 
+	 * peer with higher zxid or same zxid and higher server id
+	 */
+	
     static public class Notification {
         /*
          * Proposed leader
@@ -96,8 +106,10 @@
         InetAddress addr;
     }
 
-    /*
-     * Messages to send, both Notifications and Acks
+    /**
+     * Messages that a peer wants to send to other peers.
+     * These messages can be either Notifications or Acks
+     * of notification receipt.
      */
     static public class ToSend {
     	static enum mType {crequest, challenge, notification, ack};
@@ -145,12 +157,24 @@
     LinkedBlockingQueue<ToSend> sendqueue;
     LinkedBlockingQueue<Notification> recvqueue;
 
+    /**
+     * Multi-threaded implementation of message handler. Messenger
+     * implements two inner classes: WorkerReceiver and WorkerSender. The
+     * functionality of each is obvious from the name. Each of these
+     * spawns a new thread.
+     */
+    
     private class Messenger {
     	
         long lastProposedLeader;
         long lastProposedZxid;
         long lastEpoch;
         
+        /**
+         * Receives messages from an instance of QuorumCnxManager in
+         * method run(), and processes such messages.
+         */
+        
         class WorkerReceiver implements Runnable {
 
         	QuorumCnxManager manager;
@@ -175,7 +199,7 @@
             			}
             			response.buffer.clear();
                
-
+            			// State of peer that sent this message
             			QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
             			switch (response.buffer.getInt()) {
             			case 0:
@@ -189,6 +213,7 @@
             				break;
             			}
                     	
+            			// Instantiate Notification and set its attributes
             			Notification n = new Notification();
             			n.leader = response.buffer.getLong();
             			n.zxid = response.buffer.getLong();
@@ -196,6 +221,12 @@
             			n.state = ackstate;
             			n.addr = response.addr;
 
+            			/*
+            			 * Accept the values of this notification
+            			 * if we are at the right epoch and the new notification
+            			 * contains a vote that succeeds our current vote
+            			 * in our order of votes.
+            			 */
             			if ((messenger.lastEpoch <= n.epoch)
             					&& ((n.zxid > messenger.lastProposedZxid) 
             					|| ((n.zxid == messenger.lastProposedZxid) 
@@ -205,10 +236,18 @@
             				messenger.lastEpoch = n.epoch;
             			}
 
-            			//InetAddress addr = (InetAddress) responsePacket.getSocketAddress();
+            			/*
+            			 * If this server is looking, then queue the notification
+            			 * on recvqueue so that it can be processed
+            			 */
+
             			if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
             				recvqueue.offer(n);
-            				if(recvqueue.size() == 0) LOG.warn("Message: " + n.addr);
+            				if(recvqueue.size() == 0) LOG.debug("Message: " + n.addr);
+            				/*
+            				 * Send a notification back if the peer that sent this
+            				 * message is also looking and its logical clock is 
+            				 * lagging behind.
+            				 */
             				if((ackstate == QuorumPeer.ServerState.LOOKING)
             						&& (n.epoch < logicalclock)){
             					ToSend notmsg = new ToSend(ToSend.mType.notification, 
@@ -219,16 +258,21 @@
                 						response.addr);
                 				sendqueue.offer(notmsg);
             				}
-            			} else { 	           				
-            				if((ackstate == QuorumPeer.ServerState.LOOKING) &&
-            						(self.getPeerState() != QuorumPeer.ServerState.LOOKING)){
-            					ToSend notmsg = new ToSend(ToSend.mType.notification, 
-            						self.currentVote.id, 
-            						self.currentVote.zxid,
-            						logicalclock,
-            						self.getPeerState(),
-            						response.addr);
-            					sendqueue.offer(notmsg);
+            			} else {
+            			    /*
+            			     * If this server is not looking, but the one that sent the ack
+            			     * is looking, then send back what it believes to be the leader.
+            			     */
+            			    Vote current = self.getCurrentVote();
+            			    if(ackstate == QuorumPeer.ServerState.LOOKING){
+
+            			        ToSend notmsg = new ToSend(ToSend.mType.notification, 
+            			                current.id, 
+            			                current.zxid,
+            			                logicalclock,
+            			                self.getPeerState(),
+            			                response.addr);
+            			        sendqueue.offer(notmsg);
             				}
             			}
             			
@@ -240,6 +284,12 @@
             }
         }
 
+        
+        /**
+         * This worker simply dequeues a message to send and
+         * queues it on the manager's queue.
+         */
+        
         class WorkerSender implements Runnable {
         	
             QuorumCnxManager manager;
@@ -260,6 +310,11 @@
                 }
             }
 
+            /**
+             * Called by run() once there is a new message to send.
+             * 
+             * @param m     message to send
+             */
             private void process(ToSend m) {
                 byte requestBytes[] = new byte[28];
                 ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);  
@@ -279,10 +334,18 @@
             }
         }
 
+        /**
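+         * Test if either the send queue or the receive queue is empty
+         * (returns true as soon as one of them is empty).
+         * NOTE(review): the name suggests both queues must be empty;
+         * confirm whether || rather than && is intended.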
+         */
         public boolean queueEmpty() {
             return (sendqueue.isEmpty() || recvqueue.isEmpty());
         }
 
+        /**
+         * Constructor of class Messenger.
+         * 
+         * @param manager   Connection manager
+         */
         Messenger(QuorumCnxManager manager) {
             lastProposedLeader = 0;
             lastProposedZxid = 0;
@@ -303,17 +366,36 @@
 
     QuorumPeer self;
     int port;
-    long logicalclock; /* Election instance */
+    volatile long logicalclock; /* Election instance */
     Messenger messenger;
     long proposedLeader;
     long proposedZxid;
 
-        
+    
+    /**
+     * Constructor of FastLeaderElection. It takes two parameters, one
+     * is the QuorumPeer object that instantiated this object, and the other
+     * is the connection manager. Such an object should be created only once 
+     * by each peer during an instance of the ZooKeeper service.
+     * 
+     * @param self  QuorumPeer that created this object
+     * @param manager   Connection manager
+     */
     public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
     	this.manager = manager;
     	starter(self, manager);
     }
     
+    /**
+     * This method is invoked by the constructor. Because it is a
+     * part of the starting procedure of the object that must be on
+     * any constructor of this class, it is probably best to keep as
+     * a separate method. As we have a single constructor currently, 
+     * it is not strictly necessary to have it separate.
+     * 
+     * @param self      QuorumPeer that created this object
+     * @param manager   Connection manager   
+     */
     private void starter(QuorumPeer self, QuorumCnxManager manager) {
         this.self = self;
         proposedLeader = -1;
@@ -328,6 +410,7 @@
         recvqueue.clear();
     }
 
+    
     public static class ElectionResult {
         public Vote vote;
 
@@ -338,6 +421,9 @@
         public int winningCount;
     }
 
+    /**
+     * Send notifications to all peers upon a change in our vote
+     */
     private void sendNotifications() {
         for (QuorumServer server : self.quorumPeers) {
             InetAddress saddr = server.addr.getAddress();
@@ -353,6 +439,13 @@
         }
     }
 
+    /**
+     * Check if a pair (server id, zxid) succeeds our
+     * current vote.
+     * 
+     * @param id    Server identifier
+     * @param zxid  Last zxid observed by the issuer of this vote
+     */
     private boolean totalOrderPredicate(long id, long zxid) {
         if ((zxid > proposedZxid)
                 || ((zxid == proposedZxid) && (id > proposedLeader)))
@@ -362,6 +455,14 @@
 
     }
 
+    /**
+     * Termination predicate. Given a set of votes, determines if
+     * we have sufficient votes to declare the end of the election round.
+     * 
+     *  @param votes    Set of votes
+     *  @param l        Identifier of the vote received last
+     *  @param zxid     zxid of the vote received last
+     */
     private boolean termPredicate(
             HashMap<InetAddress, Vote> votes, long l,
             long zxid) {
@@ -384,6 +485,11 @@
 
     }
 
+    /**
+     * Starts a new round of leader election. Whenever our QuorumPeer 
+     * changes its state to LOOKING, this method is invoked, and it 
+     * sends notifications to all other peers.
+     */
     public Vote lookForLeader() throws InterruptedException {
         HashMap<InetAddress, Vote> recvset = new HashMap<InetAddress, Vote>();
 
@@ -394,7 +500,7 @@
         proposedLeader = self.getId();
         proposedZxid = self.getLastLoggedZxid();
 
-        LOG.warn("Election tally: " + proposedZxid);
+        LOG.warn("New election: " + proposedZxid);
         sendNotifications();
 
         /*
@@ -449,7 +555,7 @@
 
                 } else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
                     //Otherwise, wait for a fixed amount of time
-                    LOG.warn("Passed predicate");
+                    LOG.debug("Passed predicate");
                     Thread.sleep(finalizeWait);
 
                     // Verify if there is any change in the proposed leader

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Mon Aug 25 14:13:01 2008
@@ -114,15 +114,16 @@
     void followLeader() throws InterruptedException {
         InetSocketAddress addr = null;
         // Find the leader by id
+        Vote current = self.getCurrentVote();
         for (QuorumServer s : self.quorumPeers) {
-            if (s.id == self.currentVote.id) {
+            if (s.id == current.id) {
                 addr = s.addr;
                 break;
             }
         }
         if (addr == null) {
             LOG.warn("Couldn't find the leader with id = "
-                    + self.currentVote.id);
+                    + current.id);
         }
         LOG.info("Following " + addr);
         sock = new Socket();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Mon Aug 25 14:13:01 2008
@@ -100,7 +100,7 @@
     }
 
     public Vote lookForLeader() throws InterruptedException {
-        self.currentVote = new Vote(self.getId(), self.getLastLoggedZxid());
+        self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid()));
         // We are going to look for a leader by casting a vote for ourself
         byte requestBytes[] = new byte[4];
         ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
@@ -161,16 +161,17 @@
             }
             ElectionResult result = countVotes(votes);
             if (result.winner.id >= 0) {
-                self.currentVote = result.vote;
+                self.setCurrentVote(result.vote);
                 if (result.winningCount > (self.quorumPeers.size() / 2)) {
-                    self.currentVote = result.winner;
+                    self.setCurrentVote(result.winner);
                     s.close();
-                    self.setPeerState((self.currentVote.id == self.getId()) 
+                    Vote current = self.getCurrentVote();
+                    self.setPeerState((current.id == self.getId()) 
                             ? ServerState.LEADING: ServerState.FOLLOWING);
                     if (self.getPeerState() == ServerState.FOLLOWING) {
                         Thread.sleep(100);
                     }
-                    return self.currentVote;
+                    return current;
                 }
             }
             Thread.sleep(1000);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Mon Aug 25 14:13:01 2008
@@ -493,6 +493,8 @@
             this.addr = channel.socket().getInetAddress();
             this.channel = channel;
             recvWorker = null;
+            
+            LOG.debug("Address of remote peer: " + this.addr);
         }
 
         void setRecv(RecvWorker recvWorker) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Mon Aug 25 14:13:01 2008
@@ -111,7 +111,15 @@
     /**
      * This is who I think the leader currently is.
      */
-    volatile Vote currentVote;
+    volatile private Vote currentVote;
+        
+    public synchronized Vote getCurrentVote(){
+        return currentVote;
+    }
+       
+    public synchronized void setCurrentVote(Vote v){
+        currentVote = v;
+    }    
 
     volatile boolean running = true;
 
@@ -167,10 +175,11 @@
                         responseBuffer.clear();
                         responseBuffer.getInt(); // Skip the xid
                         responseBuffer.putLong(myid);
-                        switch (state) {
+                        Vote current = getCurrentVote();
+                        switch (getPeerState()) {
                         case LOOKING:
-                            responseBuffer.putLong(currentVote.id);
-                            responseBuffer.putLong(currentVote.zxid);
+                            responseBuffer.putLong(current.id);
+                            responseBuffer.putLong(current.zxid);
                             break;
                         case LEADING:
                             responseBuffer.putLong(myid);
@@ -182,7 +191,7 @@
                             }
                             break;
                         case FOLLOWING:
-                            responseBuffer.putLong(currentVote.id);
+                            responseBuffer.putLong(current.id);
                             try {
                                 responseBuffer.putLong(follower.getZxid());
                             } catch (NullPointerException npe) {
@@ -205,11 +214,11 @@
 
     private ServerState state = ServerState.LOOKING;
 
-    public void setPeerState(ServerState newState){
+    public synchronized void setPeerState(ServerState newState){
         state=newState;
     }
 
-    public ServerState getPeerState(){
+    public synchronized ServerState getPeerState(){
         return state;
     }
 
@@ -364,14 +373,14 @@
          * Main loop
          */
         while (running) {
-            switch (state) {
+            switch (getPeerState()) {
             case LOOKING:
                 try {
                     LOG.info("LOOKING");
-                    currentVote = makeLEStrategy().lookForLeader();
+                    setCurrentVote(makeLEStrategy().lookForLeader());
                 } catch (Exception e) {
                     LOG.warn("Unexpected exception",e);
-                    state = ServerState.LOOKING;
+                    setPeerState(ServerState.LOOKING);
                 }
                 break;
             case FOLLOWING:
@@ -384,7 +393,7 @@
                 } finally {
                     follower.shutdown();
                     setFollower(null);
-                    state = ServerState.LOOKING;
+                    setPeerState(ServerState.LOOKING);
                 }
                 break;
             case LEADING:
@@ -400,7 +409,7 @@
                         leader.shutdown("Forcing shutdown");
                         setLeader(null);
                     }
-                    state = ServerState.LOOKING;
+                    setPeerState(ServerState.LOOKING);
                 }
                 break;
             }
@@ -504,7 +513,7 @@
     }
 
     public String getServerState() {
-        switch (state) {
+        switch (getPeerState()) {
         case LOOKING:
             return QuorumStats.Provider.LOOKING_STATE;
         case LEADING: