You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2013/03/07 07:01:50 UTC

svn commit: r1453693 [4/5] - in /zookeeper/trunk: ./ src/ src/c/ src/c/include/ src/c/src/ src/c/tests/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/cli/ src/java/main/org/apache/zookeeper/common/ src/java/main/org/apache/zook...

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Thu Mar  7 06:01:49 2013
@@ -176,7 +176,13 @@ public class QuorumCnxManager {
         try {
             // Sending id and challenge
             dout = new DataOutputStream(sock.getOutputStream());
+            // represents protocol version (in other words - message type)
+            dout.writeLong(0xffff0000);              
             dout.writeLong(self.getId());
+            String addr = self.getElectionAddress().getHostName() + ":" + self.getElectionAddress().getPort();
+            byte[] addr_bytes = addr.getBytes();
+            dout.writeInt(addr_bytes.length);
+            dout.write(addr_bytes);
             dout.flush();
         } catch (IOException e) {
             LOG.warn("Ignoring exception reading or writing challenge: ", e);
@@ -214,7 +220,6 @@ public class QuorumCnxManager {
         }
         return false;
     }
-
     
     
     /**
@@ -225,12 +230,33 @@ public class QuorumCnxManager {
      * 
      */
     public boolean receiveConnection(Socket sock) {
-        Long sid = null;
-        
+        Long sid = null, protocolVersion = null;
+        InetSocketAddress electionAddr;
         try {
-            // Read server id
             DataInputStream din = new DataInputStream(sock.getInputStream());
-            sid = din.readLong();
+            protocolVersion = din.readLong();
+            if (protocolVersion >= 0) { // this is a server id and not a protocol version
+               sid = protocolVersion;  
+                electionAddr = self.getVotingView().get(sid).electionAddr;
+            } else {
+                sid = din.readLong();
+                int num_remaining_bytes = din.readInt();
+                byte[] b = new byte[num_remaining_bytes];
+                int num_read = din.read(b);
+                if (num_read == num_remaining_bytes) {
+                    if (protocolVersion == 0xffff0000) {
+                        String addr = new String(b);
+                        String[] host_port = addr.split(":");
+                        electionAddr = new InetSocketAddress(host_port[0], Integer.parseInt(host_port[1]));                   
+                    } else {
+                        LOG.error("Got urecognized protocol version " + protocolVersion + " from " + sid);
+                        electionAddr = null;
+                    }
+                } else {
+                   LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
+                   electionAddr = null;                
+                }
+            } 
             if (sid == QuorumPeer.OBSERVER_ID) {
                 /*
                  * Choose identifier at random. We need a value to identify
@@ -263,7 +289,7 @@ public class QuorumCnxManager {
              */
             LOG.debug("Create new connection to server: " + sid);
             closeSocket(sock);
-            connectOne(sid);
+            connectOne(sid, electionAddr);
 
             // Otherwise start worker threads to receive data.
         } else {
@@ -329,47 +355,75 @@ public class QuorumCnxManager {
     }
     
     /**
+     * Try to establish a connection to server with id sid using its electionAddr.
+     * 
+     *  @param sid  server id
+     *  @return boolean success indication
+     */
+    synchronized boolean connectOne(long sid, InetSocketAddress electionAddr){
+        if (senderWorkerMap.get(sid) != null) {
+            LOG.debug("There is a connection already for server " + sid);
+            return true;
+        }
+        try {
+
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug("Opening channel to server " + sid);
+             }
+             Socket sock = new Socket();
+             setSockOpts(sock);
+             sock.connect(electionAddr, cnxTO);
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug("Connected to server " + sid);
+             }
+             initiateConnection(sock, sid);
+             return true;
+         } catch (UnresolvedAddressException e) {
+             // Sun doesn't include the address that causes this
+             // exception to be thrown, also UAE cannot be wrapped cleanly
+             // so we log the exception in order to capture this critical
+             // detail.
+             LOG.warn("Cannot open channel to " + sid
+                     + " at election address " + electionAddr, e);
+             throw e;
+         } catch (IOException e) {
+             LOG.warn("Cannot open channel to " + sid
+                     + " at election address " + electionAddr,
+                     e);
+             return false;
+         }
+   
+    }
+    
+    /**
      * Try to establish a connection to server with id sid.
      * 
      *  @param sid  server id
      */
     
     synchronized void connectOne(long sid){
-        if (senderWorkerMap.get(sid) == null){
-            InetSocketAddress electionAddr;
+        if (senderWorkerMap.get(sid) != null) {
+             LOG.debug("There is a connection already for server " + sid);
+             return;
+        }
+        synchronized(self) {
+           boolean knownId = false;
             if (self.getView().containsKey(sid)) {
-                electionAddr = self.getView().get(sid).electionAddr;
-            } else {
+               knownId = true;
+                if (connectOne(sid, self.getView().get(sid).electionAddr))
+                   return;
+            } 
+            if (self.getLastSeenQuorumVerifier()!=null && self.getLastSeenQuorumVerifier().getAllMembers().containsKey(sid)
+                   && (!knownId || (self.getLastSeenQuorumVerifier().getAllMembers().get(sid).electionAddr !=
+                   self.getView().get(sid).electionAddr))) {
+               knownId = true;
+                if (connectOne(sid, self.getLastSeenQuorumVerifier().getAllMembers().get(sid).electionAddr))
+                   return;
+            } 
+            if (!knownId) {
                 LOG.warn("Invalid server id: " + sid);
                 return;
             }
-            try {
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Opening channel to server " + sid);
-                }
-                Socket sock = new Socket();
-                setSockOpts(sock);
-                sock.connect(self.getView().get(sid).electionAddr, cnxTO);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Connected to server " + sid);
-                }
-                initiateConnection(sock, sid);
-            } catch (UnresolvedAddressException e) {
-                // Sun doesn't include the address that causes this
-                // exception to be thrown, also UAE cannot be wrapped cleanly
-                // so we log the exception in order to capture this critical
-                // detail.
-                LOG.warn("Cannot open channel to " + sid
-                        + " at election address " + electionAddr, e);
-                throw e;
-            } catch (IOException e) {
-                LOG.warn("Cannot open channel to " + sid
-                        + " at election address " + electionAddr,
-                        e);
-            }
-        } else {
-            LOG.debug("There is a connection already for server " + sid);
         }
     }
     

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Mar  7 06:01:49 2013
@@ -23,6 +23,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
 import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.io.StringWriter;
@@ -34,13 +36,20 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
 
+
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.common.AtomicFileOutputStream;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.DataNode;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -85,7 +94,7 @@ public class QuorumPeer extends Thread i
     QuorumBean jmxQuorumBean;
     LocalPeerBean jmxLocalPeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
-    QuorumCnxManager qcm;
+    QuorumCnxManager qcm = null;
 
     /* ZKDatabase is a top level member of quorumpeer
      * which will be used in all the zookeeperservers
@@ -291,7 +300,7 @@ public class QuorumPeer extends Thread i
     }
 
     /**
-     * Sets the LearnerType both in the QuorumPeer and in the peerMap
+     * Sets the LearnerType
      */
     public void setLearnerType(LearnerType p) {
         learnerType = p;
@@ -332,7 +341,11 @@ public class QuorumPeer extends Thread i
      * QuorumVerifier implementation; default (majority).
      */
 
-    private QuorumVerifier quorumVerifier;
+    //last committed quorum verifier
+    public QuorumVerifier quorumVerifier;
+    
+    //last proposed quorum verifier
+    public QuorumVerifier lastSeenQuorumVerifier = null;
 
     /**
      * My id
@@ -482,11 +495,21 @@ public class QuorumPeer extends Thread i
     }
 
     private ServerState state = ServerState.LOOKING;
+    
+    private boolean reconfigFlag = false; // indicates that a reconfig just committed
 
     public synchronized void setPeerState(ServerState newState){
         state=newState;
     }
-
+    public synchronized void reconfigFlagSet(){
+       reconfigFlag = true;
+    }
+    public synchronized void reconfigFlagClear(){
+       reconfigFlag = false;
+    }
+    public synchronized boolean isReconfigStateChange(){
+       return reconfigFlag;
+    }
     public synchronized ServerState getPeerState(){
         return state;
     }
@@ -574,6 +597,9 @@ public class QuorumPeer extends Thread i
 
     @Override
     public synchronized void start() {
+        if (!getView().containsKey(myid)) {
+            throw new RuntimeException("My id " + myid + " not in the peer list");
+         }
         loadDataBase();
         cnxnFactory.start();
         startLeaderElection();
@@ -630,22 +656,19 @@ public class QuorumPeer extends Thread i
         responder.interrupt();
     }
     synchronized public void startLeaderElection() {
-        try {
-            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
-        } catch(IOException e) {
-            RuntimeException re = new RuntimeException(e.getMessage());
-            re.setStackTrace(e.getStackTrace());
-            throw re;
-        }
-        for (QuorumServer p : getView().values()) {
-            if (p.id == myid) {
-                myQuorumAddr = p.addr;
-                break;
-            }
-        }
-        if (myQuorumAddr == null) {
-            throw new RuntimeException("My id " + myid + " not in the peer list");
-        }
+       try {
+           if (getPeerState() == ServerState.LOOKING) {
+               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
+           }
+       } catch(IOException e) {
+           RuntimeException re = new RuntimeException(e.getMessage());
+           re.setStackTrace(e.getStackTrace());
+           throw re;
+       }
+
+       // if (!getView().containsKey(myid)) {
+      //      throw new RuntimeException("My id " + myid + " not in the peer list");
+        //}
         if (electionType == 0) {
             try {
                 udpSocket = new DatagramSocket(myQuorumAddr.getPort());
@@ -795,6 +818,8 @@ public class QuorumPeer extends Thread i
         return null;
     }
 
+    boolean shuttingDownLE = false;
+    
     @Override
     public void run() {
         setName("QuorumPeer" + "[myid=" + getId() + "]" +
@@ -879,6 +904,11 @@ public class QuorumPeer extends Thread i
                         }
                     } else {
                         try {
+                           reconfigFlagClear();
+                            if (shuttingDownLE) {
+                               shuttingDownLE = false;
+                               startLeaderElection();
+                               }
                             setCurrentVote(makeLEStrategy().lookForLeader());
                         } catch (Exception e) {
                             LOG.warn("Unexpected exception", e);
@@ -895,21 +925,21 @@ public class QuorumPeer extends Thread i
                         LOG.warn("Unexpected exception",e );
                     } finally {
                         observer.shutdown();
-                        setObserver(null);
-                        setPeerState(ServerState.LOOKING);
+                        setObserver(null);  
+                       updateServerState();
                     }
                     break;
                 case FOLLOWING:
                     try {
-                        LOG.info("FOLLOWING");
+                       LOG.info("FOLLOWING");
                         setFollower(makeFollower(logFactory));
                         follower.followLeader();
                     } catch (Exception e) {
-                        LOG.warn("Unexpected exception",e);
+                       LOG.warn("Unexpected exception",e);
                     } finally {
-                        follower.shutdown();
-                        setFollower(null);
-                        setPeerState(ServerState.LOOKING);
+                       follower.shutdown();
+                       setFollower(null);
+                       updateServerState();
                     }
                     break;
                 case LEADING:
@@ -925,10 +955,11 @@ public class QuorumPeer extends Thread i
                             leader.shutdown("Forcing shutdown");
                             setLeader(null);
                         }
-                        setPeerState(ServerState.LOOKING);
+                        updateServerState();
                     }
                     break;
                 }
+                start_fle = System.currentTimeMillis();
             }
         } finally {
             LOG.warn("QuorumPeer main thread exited");
@@ -942,6 +973,29 @@ public class QuorumPeer extends Thread i
         }
     }
 
+    private synchronized void updateServerState(){
+       if (!reconfigFlag) {
+           setPeerState(ServerState.LOOKING);
+           LOG.warn("PeerState set to LOOKING");
+           return;
+       }
+       
+       if (getId() == getCurrentVote().getId()) {
+           setPeerState(ServerState.LEADING);
+           LOG.debug("PeerState set to LEADING");
+       } else if (getLearnerType() == LearnerType.PARTICIPANT) {
+           setPeerState(ServerState.FOLLOWING);
+           LOG.debug("PeerState set to FOLLOWING");
+       } else if (getLearnerType() == LearnerType.OBSERVER) {
+           setPeerState(ServerState.OBSERVING);
+           LOG.debug("PeerState set to OBSERVER");
+       } else { // currently shouldn't happen since there are only 2 learner types
+           setPeerState(ServerState.LOOKING);
+           LOG.debug("Shouldn't be here");
+       }       
+       reconfigFlag = false;   
+    }
+    
     public void shutdown() {
         running = false;
         if (leader != null) {
@@ -979,32 +1033,24 @@ public class QuorumPeer extends Thread i
      * PeerType=PARTICIPANT.
      */
     public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
-        Map<Long,QuorumPeer.QuorumServer> ret =
-            new HashMap<Long, QuorumPeer.QuorumServer>();
-        Map<Long,QuorumPeer.QuorumServer> view = getView();
-        for (QuorumServer server : view.values()) {
-            if (server.type == LearnerType.PARTICIPANT) {
-                ret.put(server.id, server);
-            }
-        }
-        return ret;
+        return getQuorumVerifier().getVotingMembers();
     }
 
     /**
      * Returns only observers, no followers.
      */
     public Map<Long,QuorumPeer.QuorumServer> getObservingView() {
-        Map<Long,QuorumPeer.QuorumServer> ret =
-            new HashMap<Long, QuorumPeer.QuorumServer>();
-        Map<Long,QuorumPeer.QuorumServer> view = getView();
-        for (QuorumServer server : view.values()) {
-            if (server.type == LearnerType.OBSERVER) {
-                ret.put(server.id, server);
-            }
-        }
-        return ret;
+       return getQuorumVerifier().getObservingMembers();
     }
 
+    public synchronized Set<Long> getAllKnownServerIds(){
+       Set<Long> tmp = new HashSet<Long>(getQuorumVerifier().getAllMembers().keySet());
+       if (getLastSeenQuorumVerifier()!=null) {
+           tmp.addAll(getLastSeenQuorumVerifier().getAllMembers().keySet());
+       }
+       return tmp;
+    }
+    
     /**
      * Check if a node is in the current view. With static membership, the
      * result of this check will never change; only when dynamic membership
@@ -1052,13 +1098,6 @@ public class QuorumPeer extends Thread i
 
 
     /**
-     * get the id of this quorum peer.
-     */
-    public long getMyid() {
-        return myid;
-    }
-
-    /**
      * set the id of this quorum peer.
      */
     public void setMyid(long myid) {
@@ -1138,13 +1177,13 @@ public class QuorumPeer extends Thread i
         props.load(new StringReader(s));
         
         QuorumPeerConfig config = new QuorumPeerConfig();
-        config.parseDynamicConfig(props, electionType);
+        config.parseDynamicConfig(props, electionType, false);
         
         return config.getQuorumVerifier();
     }
     
     /**
-     * Return QuorumVerifier object
+     * Return QuorumVerifier object for the last committed configuration
      */
 
     public synchronized QuorumVerifier getQuorumVerifier(){
@@ -1152,15 +1191,58 @@ public class QuorumPeer extends Thread i
 
     }
 
+    public synchronized QuorumVerifier getLastSeenQuorumVerifier(){
+        return lastSeenQuorumVerifier;        
+    }
+    
+    public synchronized void connectNewPeers(){
+       if (qcm!=null && getQuorumVerifier()!=null && getLastSeenQuorumVerifier()!=null) {
+           Map<Long, QuorumServer> committedView = getQuorumVerifier().getAllMembers();
+           for (Entry<Long, QuorumServer> e: getLastSeenQuorumVerifier().getAllMembers().entrySet()){
+               if (!committedView.containsKey(e.getKey())) 
+                   qcm.connectOne(e.getKey(), e.getValue().electionAddr);
+           }
+        }
+    }
+    
+    public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW){
+        if (qvOLD == null || !qvOLD.equals(qvNEW)) {
+            LOG.warn("Restarting Leader Election");
+            getElectionAlg().shutdown();
+            startLeaderElection();
+        }           
+    }
+    
+    public synchronized void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
+        if (lastSeenQuorumVerifier!=null && lastSeenQuorumVerifier.getVersion() >= qv.getVersion()) {
+           LOG.warn("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + 
+                   ". Current version: " + quorumVerifier.getVersion());
+          
+        }
+        lastSeenQuorumVerifier = qv;
+        connectNewPeers();
+        if (writeToDisk) {
+            try {
+               QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename + ".next", null, false, qv);
+           } catch(IOException e){
+                LOG.error("Error closing file: ", e.getMessage());
+            }
+        } 
+
+     }       
+    
     public synchronized QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
         if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
-           LOG.warn("setQuorumVerifier called with stale config " + qv.getVersion() + 
+            // this is normal. For example - server found out about new config through FastLeaderElection gossiping
+           // and then got the same config in UPTODATE message so its already known
+           LOG.debug(getId() + " setQuorumVerifier called with known or old config " + qv.getVersion() + 
                    ". Current version: " + quorumVerifier.getVersion());
            return quorumVerifier;  
         }
         QuorumVerifier prevQV = quorumVerifier;
-        quorumVerifier = qv;
-        
+       quorumVerifier = qv;
+       if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion()))
+           lastSeenQuorumVerifier = qv;
         if (writeToDisk) {
             try {
                 QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, configFilename, configBackwardCompatibility, qv);
@@ -1173,14 +1255,17 @@ public class QuorumPeer extends Thread i
             }
         }
 
-        QuorumServer qs = qv.getAllMembers().get(getId());
-        if (qs!=null){
-            setQuorumAddress(qs.addr);
-            setElectionAddress(qs.electionAddr);
-            setClientAddress(qs.clientAddr);
-        }
-        return prevQV;
-    }
+        if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()){
+           QuorumPeerConfig.deleteFile(dynamicConfigFilename + ".next");
+       }
+       QuorumServer qs = qv.getAllMembers().get(getId());
+       if (qs!=null){
+           setQuorumAddress(qs.addr);
+           setElectionAddress(qs.electionAddr);
+           setClientAddress(qs.clientAddr);
+       }
+       return prevQV;
+    }   
     /**
      * Get an instance of LeaderElection
      */
@@ -1229,9 +1314,6 @@ public class QuorumPeer extends Thread i
         return cnxnFactory.getLocalPort();
     }
 
-    public void setClientPortAddress(InetSocketAddress addr) {
-    }
-
     public void setTxnFactory(FileTxnSnapLog factory) {
         this.logFactory = factory;
     }
@@ -1247,7 +1329,11 @@ public class QuorumPeer extends Thread i
     public void setZKDatabase(ZKDatabase database) {
         this.zkDb = database;
     }
-
+    
+    public synchronized void initConfigInZKDatabase() {   
+        if (zkDb != null) zkDb.initConfigInZKDatabase(getQuorumVerifier());
+    }
+    
     public void setRunning(boolean running) {
         this.running = running;
     }
@@ -1340,4 +1426,98 @@ public class QuorumPeer extends Thread i
         acceptedEpoch = e;
         writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
     }
+   
+    public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE){
+       InetSocketAddress oldClientAddr = getClientAddress();
+
+       // update last committed quorum verifier, write the new config to disk
+       // and restart leader election if config changed
+       QuorumVerifier prevQV = setQuorumVerifier(qv, true);
+
+       // There is no log record for the initial config, thus after syncing
+       // with leader
+       // /zookeeper/config is empty! it is also possible that last committed
+       // config is propagated during leader election
+       // without the propagation the corresponding log records.
+       // so we should explicitly do this (this is not necessary when we're
+       // already a Follower/Observer, only
+       // for Learner):
+       initConfigInZKDatabase();
+
+       if (prevQV.getVersion() < qv.getVersion()) {
+           if (restartLE) restartLeaderElection(prevQV, qv);
+
+           QuorumServer myNewQS = qv.getAllMembers().get(getId());
+           if (myNewQS != null && myNewQS.clientAddr != null
+                   && !myNewQS.clientAddr.equals(oldClientAddr)) {
+               cnxnFactory.reconfigure(myNewQS.clientAddr);
+           }
+           
+            boolean roleChange = updateLearnerType(qv);
+           boolean leaderChange = false;
+           if (suggestedLeaderId != null) {
+               // zxid should be non-null too
+               leaderChange = updateVote(suggestedLeaderId, zxid);
+           } else {
+               long currentLeaderId = getCurrentVote().getId();
+               InetSocketAddress currentLeaderAddr = prevQV.getVotingMembers()
+                       .get(currentLeaderId).addr;
+               leaderChange = (!qv.getVotingMembers().containsKey(
+                       currentLeaderId))
+                       || (!qv.getVotingMembers().get(currentLeaderId).addr
+                               .equals(currentLeaderAddr));
+               // we don't have a designated leader - need to go into leader
+               // election
+               reconfigFlagClear();
+           }
+           
+           if (roleChange || leaderChange) {
+               return true;
+           }
+       }
+       return false;
+
+   }
+    
+   private boolean updateLearnerType(QuorumVerifier newQV) {        
+       //check if I'm an observer in new config
+       if (newQV.getObservingMembers().containsKey(getId())) {
+           if (getLearnerType()!=LearnerType.OBSERVER){
+               setLearnerType(LearnerType.OBSERVER);
+               LOG.info("Becoming an observer");
+               reconfigFlagSet();
+               return true;
+           } else {
+               return false;           
+           }
+       } else if (newQV.getVotingMembers().containsKey(getId())) {
+           if (getLearnerType()!=LearnerType.PARTICIPANT){
+               setLearnerType(LearnerType.PARTICIPANT);
+               LOG.info("Becoming a voting participant");
+               reconfigFlagSet();
+               return true;
+           } else {
+               return false;
+           }
+       }
+       // I'm not in the view
+      if (getLearnerType()!=LearnerType.PARTICIPANT){
+          setLearnerType(LearnerType.PARTICIPANT);
+          LOG.info("Becoming a non-voting participant");
+          reconfigFlagSet();
+          return true;
+      }
+      return false;
+   }
+   
+   private boolean updateVote(long designatedLeader, long zxid){       
+       Vote currentVote = getCurrentVote();
+       if (currentVote!=null && designatedLeader != currentVote.getId()) {
+           setCurrentVote(new Vote(designatedLeader, zxid));
+           reconfigFlagSet();
+           LOG.warn("Suggested leader: " + designatedLeader);
+           return true;
+       }
+       return false;
+   }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Thu Mar  7 06:01:49 2013
@@ -70,7 +70,7 @@ public class QuorumPeerConfig {
 
     protected long serverId;
 
-    protected QuorumVerifier quorumVerifier = null;
+    protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null;
     protected int snapRetainCount = 3;
     protected int purgeInterval = 0;
 
@@ -121,7 +121,7 @@ public class QuorumPeerConfig {
             if (dynamicConfigFileStr == null) {
                 configBackwardCompatibilityMode = true;
                 configFileStr = path;                
-                parseDynamicConfig(cfg, electionAlg);
+                parseDynamicConfig(cfg, electionAlg, true);
                 checkValidity();                
             }
 
@@ -140,14 +140,37 @@ public class QuorumPeerConfig {
                } finally {
                    inConfig.close();
                }
-               parseDynamicConfig(dynamicCfg, electionAlg);
+               parseDynamicConfig(dynamicCfg, electionAlg, true);
                checkValidity();
            
            } catch (IOException e) {
                throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
            } catch (IllegalArgumentException e) {
                throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
-           }                   
+           }        
+           File nextDynamicConfigFile = new File(dynamicConfigFileStr + ".next");
+           if (nextDynamicConfigFile.exists()) {
+               try {           
+                   Properties dynamicConfigNextCfg = new Properties();
+                   FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);       
+                   try {
+                       dynamicConfigNextCfg.load(inConfigNext);
+                   } finally {
+                       inConfigNext.close();
+                   }
+                   boolean isHierarchical = false;
+                   for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
+                       String key = entry.getKey().toString().trim();  
+                       if (key.startsWith("group") || key.startsWith("weight")) {
+                           isHierarchical = true;
+                           break;
+                       }
+                   }
+                   lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);    
+               } catch (IOException e) {
+                   LOG.warn("NextQuorumVerifier is initiated to null");
+               }
+           }
         }
     }
 
@@ -258,7 +281,7 @@ public class QuorumPeerConfig {
             boolean configBackwardCompatibilityMode, QuorumVerifier qv) throws IOException {                             
         FileOutputStream outConfig = null;
        try {
-           byte b[] = qv.toByteArray();                                            
+           byte b[] = qv.toString().getBytes();                                            
            if (configBackwardCompatibilityMode) {
                dynamicConfigFilename = configFileStr + ".dynamic";
            }
@@ -353,7 +376,7 @@ public class QuorumPeerConfig {
      * @throws IOException
      * @throws ConfigException
      */
-    public void parseDynamicConfig(Properties dynamicConfigProp, int eAlg)
+    public void parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings)
     throws IOException, ConfigException {
        boolean isHierarchical = false;
         for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
@@ -387,13 +410,15 @@ public class QuorumPeerConfig {
             // b/w compatibility reasons we need to keep this here.
             LOG.error("Invalid configuration, only one server specified (ignoring)");
             //servers.clear();
-        } else if (numParticipators > 1) {         
-            if (numParticipators == 2) {
-                LOG.warn("No server failure will be tolerated. " +
-                    "You need at least 3 servers.");
-            } else if (numParticipators % 2 == 0) {
-                LOG.warn("Non-optimial configuration, consider an odd number of servers.");
-            }
+        } else if (numParticipators > 1) {
+           if (warnings) {
+                if (numParticipators == 2) {
+                    LOG.warn("No server failure will be tolerated. " +
+                        "You need at least 3 servers.");
+                } else if (numParticipators % 2 == 0) {
+                    LOG.warn("Non-optimial configuration, consider an odd number of servers.");
+                }
+           }
             /*
              * If using FLE, then every server requires a separate election
              * port.
@@ -489,6 +514,10 @@ public class QuorumPeerConfig {
     public QuorumVerifier getQuorumVerifier() {   
         return quorumVerifier;
     }
+    
+    public QuorumVerifier getLastSeenQuorumVerifier() {   
+        return lastSeenQuorumVerifier;
+    }
 
     public Map<Long,QuorumServer> getServers() {
         // returns all configuration servers -- participants and observers

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Thu Mar  7 06:01:49 2013
@@ -151,6 +151,10 @@ public class QuorumPeerMain {
           quorumPeer.setConfigBackwardCompatibility(config.getConfigBackwardCompatibility());
           quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
           quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
+          if (config.getLastSeenQuorumVerifier()!=null) {
+              quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
+          }
+          quorumPeer.initConfigInZKDatabase();
           quorumPeer.setCnxnFactory(cnxnFactory);
           quorumPeer.setLearnerType(config.getPeerType());
           

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java Thu Mar  7 06:01:49 2013
@@ -28,7 +28,7 @@ import org.apache.zookeeper.server.persi
  * a quorum.
  */
 public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
-    protected final QuorumPeer self;
+    public final QuorumPeer self;
 
     protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
             int minSessionTimeout, int maxSessionTimeout,
@@ -54,5 +54,7 @@ public abstract class QuorumZooKeeperSer
         pwriter.println(self.getQuorumAddress().getPort());
         pwriter.print("peerType=");
         pwriter.println(self.getLearnerType().ordinal());
+        pwriter.println("membership: ");
+        pwriter.print(new String(self.getQuorumVerifier().toString().getBytes()));
     }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java Thu Mar  7 06:01:49 2013
@@ -80,6 +80,7 @@ public class ReadOnlyRequestProcessor ex
                 case OpCode.create:
                 case OpCode.delete:
                 case OpCode.setData:
+                case OpCode.reconfig:
                 case OpCode.setACL:
                 case OpCode.multi:
                 case OpCode.check:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java Thu Mar  7 06:01:49 2013
@@ -241,7 +241,7 @@ public class QuorumHierarchical implemen
     public Map<Long, QuorumServer> getAllMembers() { 
        return allMembers;
     }
-    public byte[] toByteArray(){
+    public String toString(){
        StringWriter sw = new StringWriter();
        
        for (QuorumServer member: getAllMembers().values()){            
@@ -286,7 +286,7 @@ public class QuorumHierarchical implemen
        
        sw.append("version=" + Long.toHexString(version));
        
-       return sw.toString().getBytes();        
+       return sw.toString();        
     }
     
     /**
@@ -371,4 +371,9 @@ public class QuorumHierarchical implemen
    public long getVersion() {
        return version;
    }          
+
+   public void setVersion(long ver) {
+       version = ver;
+   }
+
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java Thu Mar  7 06:01:49 2013
@@ -109,7 +109,7 @@ public class QuorumMaj implements Quorum
         return (long) 1;
     }
 
-    public byte[] toByteArray() {
+    public String toString() {
         StringBuilder sw = new StringBuilder();
 
         for (QuorumServer member : getAllMembers().values()) {
@@ -123,8 +123,8 @@ public class QuorumMaj implements Quorum
         String hexVersion = Long.toHexString(version);
         sw.append("version=");
         sw.append(hexVersion);
-        return sw.toString().getBytes();
-    }
+        return sw.toString();
+    }    
 
     /**
      * Verifies if a set is a majority. Assumes that ackSet contains acks only
@@ -149,5 +149,8 @@ public class QuorumMaj implements Quorum
     public long getVersion() {
         return version;
     }
-
+    
+    public void setVersion(long ver) {
+        version = ver;
+    }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java Thu Mar  7 06:01:49 2013
@@ -34,9 +34,10 @@ public interface QuorumVerifier {
     long getWeight(long id);
     boolean containsQuorum(Set<Long> set);
     long getVersion();
+    void setVersion(long ver);
     Map<Long, QuorumServer> getAllMembers();
     Map<Long, QuorumServer> getVotingMembers();
     Map<Long, QuorumServer> getObservingMembers();
     boolean equals(Object o);
-    byte[] toByteArray();
+    String toString();
 }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ConfigUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ConfigUtils.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ConfigUtils.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ConfigUtils.java Thu Mar  7 06:01:49 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+
+
+public class ConfigUtils {
+	static public String getClientConfigStr(String configData) {
+        Properties props = new Properties();    	
+        try {
+          props.load(new StringReader(configData));
+		} catch (IOException e) {
+            e.printStackTrace();
+            return "";
+        }
+        StringBuffer sb = new StringBuffer();
+        boolean first = true;
+        String version = "";
+        for (Entry<Object, Object> entry : props.entrySet()) {
+             String key = entry.getKey().toString().trim();
+             String value = entry.getValue().toString().trim();
+             if (key.equals("version")) version = value;
+             if (!key.startsWith("server.")) continue;	         
+             QuorumPeer.QuorumServer qs;
+             try {
+               qs = new QuorumPeer.QuorumServer(-1, value);
+             } catch (ConfigException e) {				
+                    e.printStackTrace();
+                    continue;
+             }
+             if (!first) sb.append(",");
+             else first = false;
+             sb.append(qs.clientAddr.getHostName() + ":" + qs.clientAddr.getPort());
+        }
+        return version + " " + sb.toString();
+    }
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java Thu Mar  7 06:01:49 2013
@@ -71,6 +71,7 @@ public class SerializeUtils {
         case OpCode.delete:
             txn = new DeleteTxn();
             break;
+        case OpCode.reconfig:
         case OpCode.setData:
             txn = new SetDataTxn();
             break;

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Thu Mar  7 06:01:49 2013
@@ -53,17 +53,40 @@ public class QuorumPeerTestBase extends 
             }
         }
     }
-
+    
+    public static class MainThreadReconfigRecovery extends MainThread {
+       final File nextDynamicConfigFile;
+       
+       public MainThreadReconfigRecovery(int myid, int clientPort,
+               String currentQuorumCfgSection, String nextQuorumCfgSection) 
+                       throws IOException {
+           super(myid, clientPort, currentQuorumCfgSection);
+           nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next");
+           FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
+            fwriter.write(nextQuorumCfgSection + "\n");
+            fwriter.flush();
+            fwriter.close();
+       }               
+    }
+    
     public static class MainThread implements Runnable {
         final File confFile;
         final File dynamicConfigFile;
+        final File tmpDir;
+        
         volatile TestQPMain main;
 
         public MainThread(int myid, int clientPort, String quorumCfgSection)
                 throws IOException {
-            File tmpDir = ClientBase.createTmpDir();
+            tmpDir = ClientBase.createTmpDir();
             LOG.info("id = " + myid + " tmpDir = " + tmpDir + " clientPort = "
                     + clientPort);
+
+            File dataDir = new File(tmpDir, "data");
+            if (!dataDir.mkdir()) {
+                throw new IOException("Unable to mkdir " + dataDir);
+            }
+
             confFile = new File(tmpDir, "zoo.cfg");
             dynamicConfigFile = new File(tmpDir, "zoo.dynamic");
 
@@ -72,11 +95,6 @@ public class QuorumPeerTestBase extends 
             fwriter.write("initLimit=10\n");
             fwriter.write("syncLimit=5\n");
 
-            File dataDir = new File(tmpDir, "data");
-            if (!dataDir.mkdir()) {
-                throw new IOException("Unable to mkdir " + dataDir);
-            }
-
             // Convert windows path to UNIX to avoid problems with "\"
             String dir = dataDir.toString();
             String dynamicConfigFilename = dynamicConfigFile.toString();
@@ -106,7 +124,7 @@ public class QuorumPeerTestBase extends 
             fwriter.flush();
             fwriter.close();
         }
-
+        
         Thread currentThread;
 
         synchronized public void start() {

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java Thu Mar  7 06:01:49 2013
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.util.ArrayList;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ReconfigTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ReconfigRecoveryTest extends QuorumPeerTestBase {
+    /**
+     * Reconfiguration recovery - test that a reconfiguration is completed 
+     * if leader has .next file during startup and new config is not running yet
+     */
+    @Test
+    public void testNextConfigCompletion() throws Exception {
+        ClientBase.setupTestEnv();
+        
+        // 2 servers in current config, 3 in next config
+        final int SERVER_COUNT = 3;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+        ArrayList<String> allServers = new ArrayList<String>();
+
+        String currentQuorumCfgSection = null, nextQuorumCfgSection;
+        
+        for(int i = 0; i < SERVER_COUNT; i++) {
+               clientPorts[i] = PortAssignment.unique();
+               server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
+                      ":participant;localhost:" + clientPorts[i];        
+               allServers.add(server);
+               sb.append(server +"\n");
+               if (i == 1) currentQuorumCfgSection = sb.toString();
+        }
+        sb.append("version=1\n"); // version of current config is 0
+        nextQuorumCfgSection = sb.toString();
+        
+        // Both servers 0 and 1 will have the .next config file, which means
+        // for them that a reconfiguration was in progress when they failed
+        // and the leader will complete it
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        for (int i = 0; i < SERVER_COUNT - 1; i++) {
+            mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+        }
+        
+        Assert.assertTrue("waiting for server 0 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0],
+                        CONNECTION_TIMEOUT));
+        Assert.assertTrue("waiting for server 1 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1],
+                        CONNECTION_TIMEOUT));
+         
+        int leader = mt[0].main.quorumPeer.leader == null ? 1: 0;
+              
+        // the new server's config is going to include itself and the current leader
+        sb = new StringBuilder();
+        sb.append(allServers.get(leader) + "\n");
+        sb.append(allServers.get(2) + "\n");
+        
+        // suppose that this new server never heard about the reconfig proposal
+        String newServerInitialConfig = sb.toString();
+        mt[2] = new MainThread(2, clientPorts[2], newServerInitialConfig);
+        mt[2].start();
+        zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
+
+       Assert.assertTrue("waiting for server 2 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
+                        CONNECTION_TIMEOUT));
+        
+        ReconfigTest.testServerHasConfig(zk[0], allServers, null);
+        ReconfigTest.testServerHasConfig(zk[1], allServers, null);
+        ReconfigTest.testServerHasConfig(zk[2], allServers, null);
+        
+        ReconfigTest.testNormalOperation(zk[0], zk[2]);
+        ReconfigTest.testNormalOperation(zk[2], zk[1]);
+
+        zk[2].close();
+        mt[2].shutdown();
+        
+        //now suppose that the new server heard the reconfig request
+        mt[2] = new MainThreadReconfigRecovery(2, clientPorts[2], newServerInitialConfig, nextQuorumCfgSection);
+        mt[2].start();
+        zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
+
+       Assert.assertTrue("waiting for server 2 being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
+                        CONNECTION_TIMEOUT));
+        
+        ReconfigTest.testServerHasConfig(zk[0], allServers, null);
+        ReconfigTest.testServerHasConfig(zk[1], allServers, null);
+        ReconfigTest.testServerHasConfig(zk[2], allServers, null);
+        
+        ReconfigTest.testNormalOperation(zk[0], zk[2]);
+        ReconfigTest.testNormalOperation(zk[2], zk[1]);
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            zk[i].close();
+        }
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].shutdown();
+        }
+    }
+
+    /**
+     * Reconfiguration recovery - current config servers discover .next file,
+     * but they're both observers and their ports change in next config. Suppose that next config wasn't activated yet.
+     * Should complete reconfiguration. 
+     */
+    @Test
+    public void testCurrentServersAreObserversInNextConfig() throws Exception {
+        ClientBase.setupTestEnv();
+        
+        // 2 servers in current config, 5 in next config
+        final int SERVER_COUNT = 5;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        final int oldClientPorts[] = new int[2];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        String currentQuorumCfgSection = null, nextQuorumCfgSection;
+        
+        ArrayList<String> allServersCurrent = new ArrayList<String>();
+        ArrayList<String> allServersNext = new ArrayList<String>();
+        
+        
+        for(int i = 0; i < 2; i++) {
+               oldClientPorts[i] = PortAssignment.unique();
+               server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
+                      ":participant;localhost:" + oldClientPorts[i];        
+               allServersCurrent.add(server);
+               sb.append(server +"\n");
+        }
+        
+        currentQuorumCfgSection = sb.toString();
+        sb = new StringBuilder();
+        String role;
+        for (int i=0; i<SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            if (i < 2) role = "observer";
+            else role = "participant";
+            server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
+                  ":" + role + ";localhost:" + clientPorts[i];        
+            allServersNext.add(server);
+            sb.append(server +"\n");
+           
+        }
+        sb.append("version=1\n"); // version of current config is 0
+        nextQuorumCfgSection = sb.toString();
+        
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+
+        for (int i = 0; i < 2; i++) {
+            mt[i] = new MainThreadReconfigRecovery(i, oldClientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+        }
+
+        // new members are initialized with current config + the new server
+        for (int i = 2; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection + allServersNext.get(i));
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+        }
+
+        for (int i=0; i<SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server "+ i + " being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                        CONNECTION_TIMEOUT * 2));
+            ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);  
+        }
+        
+        ReconfigTest.testNormalOperation(zk[0], zk[2]);
+        ReconfigTest.testNormalOperation(zk[4], zk[1]);
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            zk[i].close();
+        }
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].shutdown();
+        }
+
+     }
+    
+    
+    
+    /**
+     * Reconfiguration recovery - test that if servers in old config have a .next file
+     * but no quorum of new config is up then no progress should be possible (no progress will happen
+     * to ensure safety as the new config might be actually up but partitioned from old config)
+     */
+    @Test
+    public void testNextConfigUnreachable() throws Exception {
+        ClientBase.setupTestEnv();
+        
+        // 2 servers in current config, 5 in next config
+        final int SERVER_COUNT = 5;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        String currentQuorumCfgSection = null, nextQuorumCfgSection;
+        
+        ArrayList<String> allServers = new ArrayList<String>();
+        for(int i = 0; i < SERVER_COUNT; i++) {
+               clientPorts[i] = PortAssignment.unique();
+               server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
+                      ":participant;localhost:" + clientPorts[i];        
+               allServers.add(server);
+               sb.append(server +"\n");
+               if (i == 1) currentQuorumCfgSection = sb.toString();
+        }
+        sb.append("version=1\n"); // version of current config is 0
+        nextQuorumCfgSection = sb.toString();
+        
+        // lets start servers 2, 3, 4 with the new config
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+
+        // Both servers 0 and 1 will have the .next config file, which means
+        // for them that a reconfiguration was in progress when they failed
+        // and the leader will complete it. 
+        for (int i = 0; i < 2; i++) {
+            mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+        }
+
+        Thread.sleep(CONNECTION_TIMEOUT*2);
+        
+        // make sure servers 0, 1 don't come online
+        for (int i = 0; i < 2; i++) {
+           Assert.assertFalse("server " +  i + " is up but shouldn't be",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                            CONNECTION_TIMEOUT / 10));
+        }
+ 
+        for (int i = 0; i < 2; i++) {
+            zk[i].close();
+        }
+        for (int i = 0; i < 2; i++) {
+            mt[i].shutdown();
+        }
+    }
+    
+    /**
+     * Reconfiguration recovery - test that old config members will join the new config 
+     * if its already active, and not try to complete the reconfiguration
+     */
+    @Test
+    public void testNextConfigAlreadyActive() throws Exception {
+        ClientBase.setupTestEnv();
+        
+        // 2 servers in current config, 5 in next config
+        final int SERVER_COUNT = 5;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        String currentQuorumCfgSection = null, nextQuorumCfgSection;
+        
+        ArrayList<String> allServers = new ArrayList<String>();
+        for(int i = 0; i < SERVER_COUNT; i++) {
+               clientPorts[i] = PortAssignment.unique();
+               server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
+                      ":participant;localhost:" + clientPorts[i];        
+               allServers.add(server);
+               sb.append(server +"\n");
+               if (i == 1) currentQuorumCfgSection = sb.toString();
+        }
+        sb.append("version=1\n"); // version of current config is 0
+        nextQuorumCfgSection = sb.toString();
+        
+        // lets start servers 2, 3, 4 with the new config
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        for (int i = 2; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], nextQuorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+        }
+        for (int i = 2; i < SERVER_COUNT; i++) {            
+            Assert.assertTrue("waiting for server " + i + " being up",
+                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                        CONNECTION_TIMEOUT));
+        }
+
+        ReconfigTest.testNormalOperation(zk[2], zk[3]);
+
+        long epoch = mt[2].main.quorumPeer.getAcceptedEpoch();
+        
+        // Both servers 0 and 1 will have the .next config file, which means
+        // for them that a reconfiguration was in progress when they failed
+        // and the leader will complete it. 
+        for (int i = 0; i < 2; i++) {
+            mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+        }
+
+        
+        // servers 0 and 1 should connect to all servers, including the one in their
+        // .next file during startup, and will find the next config and join it
+        for (int i = 0; i < 2; i++) {
+           Assert.assertTrue("waiting for server " +  i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                            CONNECTION_TIMEOUT*2));
+        }
+        
+        // make sure they joined the new config without any change to it
+        Assert.assertEquals(epoch, mt[0].main.quorumPeer.getAcceptedEpoch());
+        Assert.assertEquals(epoch, mt[1].main.quorumPeer.getAcceptedEpoch());
+        Assert.assertEquals(epoch, mt[2].main.quorumPeer.getAcceptedEpoch());
+        
+        ReconfigTest.testServerHasConfig(zk[0], allServers, null);
+        ReconfigTest.testServerHasConfig(zk[1], allServers, null);
+        
+        ReconfigTest.testNormalOperation(zk[0], zk[2]);
+        ReconfigTest.testNormalOperation(zk[4], zk[1]);
+
+       
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            zk[i].close();
+        }
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].shutdown();
+        }
+    }
+    
+ 
+    
+}

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Thu Mar  7 06:01:49 2013
@@ -62,6 +62,7 @@ import org.apache.zookeeper.txn.SetDataT
 import org.apache.zookeeper.txn.TxnHeader;
 import org.junit.Assert;
 import org.junit.Test;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -300,6 +301,9 @@ public class Zab1_0Test {
         public int getNumAliveConnections() {
             return 0;
         }
+		@Override
+		public void reconfigure(InetSocketAddress addr) {			
+		}
     }
     static Socket[] getSocketPair() throws IOException {
         ServerSocket ss = new ServerSocket();
@@ -496,41 +500,41 @@ public class Zab1_0Test {
     @Test
     public void testUnnecessarySnap() throws Exception {
         testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
-           @Override
-           public void converseWithLeader(InputArchive ia, OutputArchive oa,
+            @Override
+            public void converseWithLeader(InputArchive ia, OutputArchive oa,
                     Leader l, long zxid) throws Exception {
-               
-               Assert.assertEquals(1, l.self.getAcceptedEpoch());
-               Assert.assertEquals(1, l.self.getCurrentEpoch());
-               
-               /* we test a normal run. everything should work out well. */
-               LearnerInfo li = new LearnerInfo(1, 0x10000);
-               byte liBytes[] = new byte[12];
-               ByteBufferOutputStream.record2ByteBuffer(li,
-                       ByteBuffer.wrap(liBytes));
-               QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1,
-                       liBytes, null);
-               oa.writeRecord(qp, null);
-               
-               readPacketSkippingPing(ia, qp);
-               Assert.assertEquals(Leader.LEADERINFO, qp.getType());
-               Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
-               Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
-                       0x10000);
-               Assert.assertEquals(2, l.self.getAcceptedEpoch());
-               Assert.assertEquals(1, l.self.getCurrentEpoch());
-               
-               byte epochBytes[] = new byte[4];
-               final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
-               wrappedEpochBytes.putInt(1);
-               qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
-               oa.writeRecord(qp, null);
-               
-               readPacketSkippingPing(ia, qp);
-               Assert.assertEquals(Leader.DIFF, qp.getType());
-           
-           }
-       }, 2);
+
+                Assert.assertEquals(1, l.self.getAcceptedEpoch());
+                Assert.assertEquals(1, l.self.getCurrentEpoch());
+
+                /* we test a normal run. everything should work out well. */
+                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+                byte liBytes[] = new byte[20];
+                ByteBufferOutputStream.record2ByteBuffer(li,
+                        ByteBuffer.wrap(liBytes));
+                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1,
+                        liBytes, null);
+                oa.writeRecord(qp, null);
+
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
+                Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+                        0x10000);
+                Assert.assertEquals(2, l.self.getAcceptedEpoch());
+                Assert.assertEquals(1, l.self.getCurrentEpoch());
+
+                byte epochBytes[] = new byte[4];
+                final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
+                wrappedEpochBytes.putInt(1);
+                qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
+                oa.writeRecord(qp, null);
+
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.DIFF, qp.getType());
+
+            }
+        }, 2);
     }
     
     @Test
@@ -796,8 +800,8 @@ public class Zab1_0Test {
                 Assert.assertEquals(0, l.self.getCurrentEpoch());
                 
                 /* we test a normal run. everything should work out well. */
-                LearnerInfo li = new LearnerInfo(1, 0x10000);
-                byte liBytes[] = new byte[12];
+                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+                byte liBytes[] = new byte[20];
                 ByteBufferOutputStream.record2ByteBuffer(li,
                         ByteBuffer.wrap(liBytes));
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
@@ -856,8 +860,8 @@ public class Zab1_0Test {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException {
                 /* we test a normal run. everything should work out well. */
-                LearnerInfo li = new LearnerInfo(1, 0x10000);
-                byte liBytes[] = new byte[12];
+                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+                byte liBytes[] = new byte[20];
                 ByteBufferOutputStream.record2ByteBuffer(li,
                         ByteBuffer.wrap(liBytes));
                 /* we are going to say we last acked epoch 20 */
@@ -905,8 +909,8 @@ public class Zab1_0Test {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException, InterruptedException {
                 /* we test a normal run. everything should work out well. */            	
-                LearnerInfo li = new LearnerInfo(1, 0x10000);
-                byte liBytes[] = new byte[12];
+                LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+                byte liBytes[] = new byte[20];
                 ByteBufferOutputStream.record2ByteBuffer(li,
                         ByteBuffer.wrap(liBytes));
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Thu Mar  7 06:01:49 2013
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.test;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -232,16 +233,18 @@ public class CnxManagerTest extends ZKTe
         SocketChannel sc = SocketChannel.open();
         sc.socket().connect(peers.get(1L).electionAddr, 5000);
 
-        /*
-         * Write id first then negative length.
-         */
-        byte[] msgBytes = new byte[8];
-        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-        msgBuffer.putLong(2L);
-        msgBuffer.position(0);
-        sc.write(msgBuffer);
+        InetSocketAddress otherAddr = peers.get(new Long(2)).electionAddr;
+        DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream());
+        dout.writeLong(0xffff0000);
+        dout.writeLong(new Long(2));
+        String addr = otherAddr.getHostName()+ ":" + otherAddr.getPort();
+        byte[] addr_bytes = addr.getBytes();
+        dout.writeInt(addr_bytes.length);
+        dout.write(addr_bytes);
+        dout.flush();
+        
 
-        msgBuffer = ByteBuffer.wrap(new byte[4]);
+        ByteBuffer msgBuffer = ByteBuffer.wrap(new byte[4]);
         msgBuffer.putInt(-20);
         msgBuffer.position(0);
         sc.write(msgBuffer);

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumMajorityTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumMajorityTest.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumMajorityTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumMajorityTest.java Thu Mar  7 06:01:49 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+import java.util.HashSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QuorumMajorityTest extends QuorumBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(QuorumMajorityTest.class);
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    /***************************************************************/
+    /* Test that the majority quorum verifier only counts votes from */
+    /* followers in its view                                    */
+    /***************************************************************/
+    @Test
+    public void testMajQuorums() throws Throwable {
+       
+       //setup servers 1-5 to be followers
+       setUp(false);
+        
+       Proposal p = new Proposal();
+       
+        p.addQuorumVerifier(s1.getQuorumVerifier());
+        
+        // 2 followers out of 5 is not a majority
+        p.addAck(Long.valueOf(1));
+        p.addAck(Long.valueOf(2));        
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 6 is not in the view - its vote shouldn't count
+        p.addAck(Long.valueOf(6));  
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 3 followers out of 5 are a majority of the voting view
+        p.addAck(Long.valueOf(3));  
+        Assert.assertEquals(true, p.hasAllQuorums());
+        
+       //setup servers 1-3 to be followers and 4 and 5 to be observers
+       setUp(true);
+       
+       p = new Proposal();
+       p.addQuorumVerifier(s1.getQuorumVerifier());
+        
+        // 1 follower out of 3 is not a majority
+       p.addAck(Long.valueOf(1));      
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 4 and 5 are observers, their vote shouldn't count
+        p.addAck(Long.valueOf(4));
+        p.addAck(Long.valueOf(5));
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 6 is not in the view - its vote shouldn't count
+        p.addAck(Long.valueOf(6));
+        Assert.assertEquals(false, p.hasAllQuorums());
+        
+        // 2 followers out of 3 are a majority of the voting view
+        p.addAck(Long.valueOf(2));
+        Assert.assertEquals(true, p.hasAllQuorums());
+    }
+}

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Mar  7 06:01:49 2013
@@ -122,6 +122,9 @@ public class QuorumUtil {
         return peers.get(id);
     }
 
+    // This was added to avoid running into the problem of ZOOKEEPER-1539
+    public boolean disableJMXTest = false;
+    
     public void startAll() throws IOException {
         shutdownAll();
         for (int i = 1; i <= ALL; ++i) {
@@ -136,6 +139,9 @@ public class QuorumUtil {
             LOG.info(hp + " is accepting client connections");
         }
 
+        // This was added to avoid running into the problem of ZOOKEEPER-1539
+        if (disableJMXTest) return;
+        
         // interesting to see what's there...
         try {
             JMXEnv.dump();