You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original web archive and is not preserved in this plain-text copy.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2008/12/12 21:21:30 UTC

svn commit: r726110 - in /hadoop/zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Author: mahadev
Date: Fri Dec 12 12:21:30 2008
New Revision: 726110

URL: http://svn.apache.org/viewvc?rev=726110&view=rev
Log:
ZOOKEEPER-230. Improvements to FLE. (Flavio via mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Dec 12 12:21:30 2008
@@ -73,6 +73,7 @@
    ZOOKEEPER-247. fix formatting of C API in ACL section of programmer guide.
    (patrick hunt via mahadev)
 
+   ZOOKEEPER-230. Improvements to FLE. (Flavio via mahadev)
 
 Release 3.0.0 - 2008-10-21
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Dec 12 12:21:30 2008
@@ -46,22 +46,21 @@
 public class FastLeaderElection implements Election {
     private static final Logger LOG = Logger.getLogger(FastLeaderElection.class);
 
-	/* Sequence numbers for messages */
-    static int sequencer = 0;
-
     /**
      * Determine how much time a process has to wait
      * once it believes that it has reached the end of
      * leader election.
      */
-    static int finalizeWait = 200;
+    final static int finalizeWait = 200;
 
-    /**
-	 * Challenge counter to avoid replay attacks
+
+	/**
+	 * Upper bound on the amount of time between two consecutive
+	 * notification checks. This impacts the amount of time to get
+	 * the system up again after long partitions. Currently 60 seconds. 
 	 */
 	
-	static int challengeCounter = 0;
-	
+    final static int maxNotificationInterval = 60000;
     
 	/**
 	 * Connection manager. Fast leader election uses TCP for 
@@ -509,6 +508,8 @@
 
         HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
 
+        int notTimeout = finalizeWait;
+        
         synchronized(this){
             logicalclock++;
             updateProposal(self.getId(), self.getLastLoggedZxid());
@@ -526,7 +527,7 @@
              * Remove next notification from queue, times out after 2 times
              * the termination time
              */
-            Notification n = recvqueue.poll(2*finalizeWait, TimeUnit.MILLISECONDS);
+            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
             
             /*
              * Sends more notifications if haven't received enough.
@@ -535,89 +536,104 @@
             if(n == null){
             	if(manager.haveDelivered()){
             		sendNotifications();
+            	} else {
+            	    manager.connectAll();
             	}
+            	
+            	/*
+            	 * Exponential backoff
+            	 */
+            	int tmpTimeOut = notTimeout*2;
+            	notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
+            	LOG.info("Notification time out: " + notTimeout);
             }
-            else switch (n.state) {
-            case LOOKING:
-                // If notification > current, replace and send messages out
-                LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + 
-                        n.epoch + ", " + self.getId() + ", " + self.getPeerState() + 
-                        ", " + n.state + ", " + n.sid);
-                if (n.epoch > logicalclock) {
-                    logicalclock = n.epoch;
-                    recvset.clear();
-                    updateProposal(self.getId(), self.getLastLoggedZxid());
-                    sendNotifications();
-                } else if (n.epoch < logicalclock) {
-                    break;
-                } else if (totalOrderPredicate(n.leader, n.zxid)) {
-                    updateProposal(n.leader, n.zxid);
-                    sendNotifications();
-                }
+            else { 
+                //notTimeout = finalizeWait;
+                switch (n.state) { 
+                case LOOKING:
+                    // If notification > current, replace and send messages out
+                    LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + 
+                            n.epoch + ", " + self.getId() + ", " + self.getPeerState() + 
+                            ", " + n.state + ", " + n.sid);
+                    if (n.epoch > logicalclock) {
+                        logicalclock = n.epoch;
+                        recvset.clear();
+                        updateProposal(self.getId(), self.getLastLoggedZxid());
+                        sendNotifications();
+                    } else if (n.epoch < logicalclock) {
+                        break;
+                    } else if (totalOrderPredicate(n.leader, n.zxid)) {
+                        updateProposal(n.leader, n.zxid);
+                        sendNotifications();
+                    }
                 
-                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
 
-                //If have received from all nodes, then terminate
-                if (self.quorumPeers.size() == recvset.size()) {
-                    self.setPeerState((proposedLeader == self.getId()) ? 
-                            ServerState.LEADING: ServerState.FOLLOWING);
-                    leaveInstance();
-                    return new Vote(proposedLeader, proposedZxid);
-
-                } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
-                    //Otherwise, wait for a fixed amount of time
-                    LOG.debug("Passed predicate");
-
-                    // Verify if there is any change in the proposed leader
-                    while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
-                        if(totalOrderPredicate(n.leader, n.zxid)){
-                            recvqueue.put(n);
-                            break;
-                        }
-                    }
-                    
-                    if (n == null) {
+                    //If have received from all nodes, then terminate
+                    if (self.quorumPeers.size() == recvset.size()) {
                         self.setPeerState((proposedLeader == self.getId()) ? 
                                 ServerState.LEADING: ServerState.FOLLOWING);
-                        LOG.info("About to leave instance:" + proposedLeader + ", " + proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
                         leaveInstance();
-                        return new Vote(proposedLeader,
+                        return new Vote(proposedLeader, proposedZxid);
+                        
+                    } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
+                        //Otherwise, wait for a fixed amount of time
+                        LOG.debug("Passed predicate");
+
+                        // Verify if there is any change in the proposed leader
+                        while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
+                            if(totalOrderPredicate(n.leader, n.zxid)){
+                                recvqueue.put(n);
+                                break;
+                            }
+                        }
+                    
+                        if (n == null) {
+                            self.setPeerState((proposedLeader == self.getId()) ? 
+                                ServerState.LEADING: ServerState.FOLLOWING);
+                            LOG.info("About to leave instance:" + proposedLeader + ", " + 
+                                    proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
+                            leaveInstance();
+                            return new Vote(proposedLeader,
                                 proposedZxid);
+                        }
                     }
-                }
-                break;
-            case LEADING:
-                /*
-                 * There is at most one leader for each epoch, so if a peer claims to
-                 * be the leader for an epoch, then that peer must be the leader (no
-                 * arbitrary failures assumed). Now, if there is no quorum supporting 
-                 * this leader, then processes will naturally move to a new epoch.
-                 */
-                if(n.epoch == logicalclock){
-                    self.setPeerState((n.leader == self.getId()) ? 
-                            ServerState.LEADING: ServerState.FOLLOWING);
+                    break;
+                case LEADING:
+                    /*
+                     * There is at most one leader for each epoch, so if a peer claims to
+                     * be the leader for an epoch, then that peer must be the leader (no
+                     * arbitrary failures assumed). Now, if there is no quorum supporting 
+                     * this leader, then processes will naturally move to a new epoch.
+                     */
+                    if(n.epoch == logicalclock){
+                        self.setPeerState((n.leader == self.getId()) ? 
+                                ServerState.LEADING: ServerState.FOLLOWING);
                    
-                    leaveInstance();
-                    return new Vote(n.leader, n.zxid);
-                }
-            case FOLLOWING:
-                LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + ", " + n.sid);
+                        leaveInstance();
+                        return new Vote(n.leader, n.zxid);
+                    }
+                case FOLLOWING:
+                    LOG.info("Notification: " + n.leader + ", " + n.zxid + 
+                            ", " + n.epoch + ", " + self.getId() + ", " + 
+                            self.getPeerState() + ", " + n.state + ", " + n.sid);
        
-                outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
+                    outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
 
-                if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
-                        && checkLeader(outofelection, n.leader, n.epoch)) {
-                    synchronized(this){
-                        logicalclock = n.epoch;
-                        self.setPeerState((n.leader == self.getId()) ? 
-                            ServerState.LEADING: ServerState.FOLLOWING);
+                    if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
+                            && checkLeader(outofelection, n.leader, n.epoch)) {
+                        synchronized(this){
+                            logicalclock = n.epoch;
+                            self.setPeerState((n.leader == self.getId()) ? 
+                                    ServerState.LEADING: ServerState.FOLLOWING);
+                        }
+                        leaveInstance();
+                        return new Vote(n.leader, n.zxid);
                     }
-                    leaveInstance();
-                    return new Vote(n.leader, n.zxid);
+                    break;
+                default:
+                    break;
                 }
-                break;
-            default:
-                break;
             }
         }
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Fri Dec 12 12:21:30 2008
@@ -25,10 +25,10 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.HashMap;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.Enumeration;
 
 import org.apache.log4j.Logger;
 
@@ -120,7 +120,6 @@
     }
 
     public QuorumCnxManager(QuorumPeer self) {
-        this.port = port;
         this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
         this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
@@ -153,11 +152,7 @@
      */
 
     boolean initiateConnection(SocketChannel s, Long sid) {
-        boolean challenged = true;
-        boolean wins = false;
-        long newChallenge;
-        
-        try {
+         try {
             // Sending id and challenge
             byte[] msgBytes = new byte[8];
             ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
@@ -173,11 +168,11 @@
         // If lost the challenge, then drop the new connection
         if (sid > self.getId()) {
             try {
-                LOG.warn("Have smaller server identifier, so dropping the connection: (" + 
-                        sid + ", " + self.getId());
+                LOG.info("Have smaller server identifier, so dropping the connection: (" + 
+                        sid + ", " + self.getId() + ")");
                 s.socket().close();
             } catch (IOException e) {
-                LOG.warn("Error when closing socket or trying to reopen connection: "
+                LOG.error("Error when closing socket or trying to reopen connection: "
                                 + e.toString());
 
             }
@@ -220,9 +215,6 @@
      * 
      */
     boolean receiveConnection(SocketChannel s) {
-        boolean challenged = true;
-        boolean wins = false;
-        long newChallenge;
         Long sid = null;
         
         try {
@@ -236,7 +228,7 @@
             // Read server id
             sid = Long.valueOf(msgBuffer.getLong());
         } catch (IOException e) {
-            LOG.warn("Exception reading or writing challenge: "
+            LOG.info("Exception reading or writing challenge: "
                     + e.toString());
             return false;
         }
@@ -246,7 +238,7 @@
             try {
                 SendWorker sw = senderWorkerMap.get(sid);
 
-                LOG.warn("Create new connection");
+                LOG.info("Create new connection to server: " + sid);
                 //sw.connect();
                 s.socket().close();
                 if(sw != null) sw.finish();
@@ -256,7 +248,7 @@
                 }
                 
             } catch (IOException e) {
-                LOG.warn("Error when closing socket or trying to reopen connection: "
+                LOG.info("Error when closing socket or trying to reopen connection: "
                                 + e.toString());
             }
         //Otherwise start worker threads to receive data.
@@ -290,8 +282,8 @@
     }
 
     /**
-     * Processes invoke this message to send a message. Currently, only leader
-     * election uses it.
+     * Processes invoke this message to queue a message to send. Currently, 
+     * only leader election uses it.
      */
     void toSend(Long sid, ByteBuffer b) {
         /*
@@ -322,39 +314,64 @@
                         queueSendMap.get(sid).take();
                     }
                     queueSendMap.get(sid).put(b);
-                }
+                } 
+                
+                connectOne(sid);
                 
-                //synchronized (senderWorkerMap) {
-                    if ((senderWorkerMap.get(sid) == null)) {
-                        SocketChannel channel;
-                        try {
-                            channel = SocketChannel
-                                    .open(self.quorumPeers.get(sid).electionAddr);
-                            channel.socket().setTcpNoDelay(true);
-                            initiateConnection(channel, sid);
-                        } catch (IOException e) {
-                            LOG.warn("Cannot open channel to "
-                                    + sid + "( " + e.toString()
-                                    + ")");
-                        }
-                    }
-                //}     
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while waiting to put message in queue."
                                 + e.toString());
             }
     }
+    
+    /**
+     * Try to establish a connection to server with id sid.
+     * 
+     *  @param sid  server id
+     */
+    
+    void connectOne(long sid){
+        if ((senderWorkerMap.get(sid) == null)) {
+            SocketChannel channel;
+            try {
+                channel = SocketChannel
+                        .open(self.quorumPeers.get(sid).electionAddr);
+                channel.socket().setTcpNoDelay(true);
+                initiateConnection(channel, sid);
+            } catch (IOException e) {
+                LOG.warn("Cannot open channel to "
+                        + sid + "( " + e.toString()
+                        + ")");
+            }
+        }
+    }
+    
+    
+    /**
+     * Try to establish a connection with each server if one
+     * doesn't exist.
+     */
+    
+    void connectAll(){
+        long sid;
+        for(Enumeration<Long> en = queueSendMap.keys();
+            en.hasMoreElements();){
+            sid = en.nextElement();
+            connectOne(sid);
+        }      
+    }
+    
 
     /**
      * Check if all queues are empty, indicating that all messages have been delivered.
      */
     boolean haveDelivered() {
         for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
-            if (queue.size() == 0)
-                return true;
+            if (queue.size() != 0)
+                return false;
         }
 
-        return false;
+        return true;
     }
 
     /**