You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2012/04/23 01:20:15 UTC

svn commit: r1328991 [1/2] - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/main/org/apache/zookeeper/server/quorum/flexible/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/serve...

Author: breed
Date: Sun Apr 22 23:20:13 2012
New Revision: 1328991

URL: http://svn.apache.org/viewvc?rev=1328991&view=rev
Log:
ZOOKEEPER-1411. Consolidate membership management, distinguish between static and dynamic configuration parameters

Added:
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/util/DynamicConfigBCTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StandaloneTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun Apr 22 23:20:13 2012
@@ -276,6 +276,8 @@ IMPROVEMENTS:
   ZOOKEEPER-1432. Add javadoc and debug logging for checkACL() method in 
   PrepRequestProcessor (Eugene Koontz via michim)
 
+  ZOOKEEPER-1411. Consolidate membership management, distinguish between static and dynamic configuration parameters (Alex Shraer via breed)
+
 Release 3.4.0 - 
 
 Non-backward compatible changes:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Sun Apr 22 23:20:13 2012
@@ -585,14 +585,14 @@ public class FastLeaderElection implemen
          * different zxids for a server depending on timing.
          */
         for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
-            if (vote.equals(entry.getValue())){
+            if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey())
+                    && vote.equals(entry.getValue())){
                 set.add(entry.getKey());
             }
         }
 
         return self.getQuorumVerifier().containsQuorum(set);
     }
-
     /**
      * In the case there is a leader elected, and a quorum supporting
      * this leader, we have to check if the leader has voted and acked
@@ -665,7 +665,7 @@ public class FastLeaderElection implemen
      * @return long
      */
     private long getInitId(){
-        if(self.getLearnerType() == LearnerType.PARTICIPANT)
+        if(self.getQuorumVerifier().getVotingMembers().containsKey(self.getId()))       
             return self.getId();
         else return Long.MIN_VALUE;
     }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Sun Apr 22 23:20:13 2012
@@ -396,7 +396,9 @@ public class Leader {
                     HashSet<Long> followerSet = new HashSet<Long>();
 
                     for(LearnerHandler f : getLearners()) {
-                        followerSet.add(f.getSid());
+                        if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
+                            followerSet.add(f.getSid());
+                        }
                     }
 
                     if (self.getQuorumVerifier().containsQuorum(followerSet)) {
@@ -455,7 +457,7 @@ public class Leader {
                 for (LearnerHandler f : getLearners()) {
                     // Synced set is used to check we have a supporting quorum, so only
                     // PARTICIPANT, not OBSERVER, learners should be used
-                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
+                    if (f.synced() && self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
                         syncedSet.add(f.getSid());
                     }
                     f.ping();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sun Apr 22 23:20:13 2012
@@ -261,8 +261,12 @@ public class LearnerHandler extends Thre
             	this.sid = leader.followerCounter.getAndDecrement();
             }
 
-            LOG.info("Follower sid: " + sid + " : info : "
-                    + leader.self.quorumPeers.get(sid));
+            if (leader.self.getView().containsKey(this.sid)) {
+                LOG.info("Follower sid: " + this.sid + " : info : "
+                        + leader.self.getView().get(this.sid).toString());
+            } else {
+                LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
+            }
                         
             if (qp.getType() == Leader.OBSERVERINFO) {
                   learnerType = LearnerType.OBSERVER;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Sun Apr 22 23:20:13 2012
@@ -337,8 +337,8 @@ public class QuorumCnxManager {
     synchronized void connectOne(long sid){
         if (senderWorkerMap.get(sid) == null){
             InetSocketAddress electionAddr;
-            if (self.quorumPeers.containsKey(sid)) {
-                electionAddr = self.quorumPeers.get(sid).electionAddr;
+            if (self.getView().containsKey(sid)) {
+                electionAddr = self.getView().get(sid).electionAddr;
             } else {
                 LOG.warn("Invalid server id: " + sid);
                 return;
@@ -479,12 +479,10 @@ public class QuorumCnxManager {
                 try {
                     ss = new ServerSocket();
                     ss.setReuseAddress(true);
-                    int port = self.quorumPeers.get(self.getId()).electionAddr
-                            .getPort();
-                    InetSocketAddress addr = new InetSocketAddress(port);
+                    InetSocketAddress addr = self.getElectionAddress();
                     LOG.info("My election bind port: " + addr.toString());
-                    setName(self.quorumPeers.get(self.getId()).electionAddr
-                            .toString());
+                    setName(addr.toString());
+                    // we used to bind to the * address but this seems more correct
                     ss.bind(addr);
                     while (!shutdown) {
                         Socket client = ss.accept();
@@ -513,7 +511,7 @@ public class QuorumCnxManager {
                 LOG.error("As I'm leaving the listener thread, "
                         + "I won't be able to participate in leader "
                         + "election any longer: "
-                        + self.quorumPeers.get(self.getId()).electionAddr);
+                        + self.getElectionAddress());
             }
         }
         
@@ -684,7 +682,7 @@ public class QuorumCnxManager {
                         self.getId() + " error = " + e);
             }
             this.finish();
-            LOG.warn("Send worker leaving thread");
+            LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
         }
     }
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Sun Apr 22 23:20:13 2012
@@ -25,6 +25,8 @@ import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.io.StringReader;
+import java.io.StringWriter;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
@@ -35,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
@@ -42,6 +45,7 @@ import org.apache.zookeeper.server.Serve
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.ZxidUtils;
@@ -92,17 +96,97 @@ public class QuorumPeer extends Thread i
     private ZKDatabase zkDb;
 
     public static class QuorumServer {
+        public InetSocketAddress addr = null;
+
+        public InetSocketAddress electionAddr = null;
+        
+        public InetSocketAddress clientAddr = null;
+        
+        public long id;
+        
+        public LearnerType type = LearnerType.PARTICIPANT;
+        
+        
+        public QuorumServer(long id, InetSocketAddress addr,
+                InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
+            this.id = id;
+            this.addr = addr;
+            this.electionAddr = electionAddr;
+            this.clientAddr = clientAddr;
+        }
+
+        
         public QuorumServer(long id, InetSocketAddress addr,
                 InetSocketAddress electionAddr) {
             this.id = id;
             this.addr = addr;
             this.electionAddr = electionAddr;
+            this.clientAddr = null;
         }
 
         public QuorumServer(long id, InetSocketAddress addr) {
             this.id = id;
             this.addr = addr;
             this.electionAddr = null;
+            this.clientAddr = null;
+        }
+        
+        
+        private void setType(String s) throws ConfigException {
+            if (s.toLowerCase().equals("observer")) {
+               type = LearnerType.OBSERVER;
+           } else if (s.toLowerCase().equals("participant")) {
+               type = LearnerType.PARTICIPANT;
+            } else {
+               throw new ConfigException("Unrecognised peertype: " + s);
+            }   
+        }       
+
+        private static final String wrongFormat = " does not have the form server_cofig or server_config;client_config"+
+        " where server_config is host:port:port or host:port:port:type and client_config is port or host:port";
+        
+        public QuorumServer(long sid, String addressStr) throws ConfigException {
+            // LOG.warn("sid = " + sid + " addressStr = " + addressStr);
+            this.id = sid;
+            String serverClientParts[] = addressStr.split(";");
+            String serverParts[] = serverClientParts[0].split(":");
+            if ((serverClientParts.length > 2) || (serverParts.length < 3)
+                    || (serverParts.length > 4)) {
+                throw new ConfigException(addressStr + wrongFormat);
+            }
+
+            if (serverClientParts.length == 2) {
+                //LOG.warn("ClientParts: " + serverClientParts[1]);
+                String clientParts[] = serverClientParts[1].split(":");
+                if (clientParts.length > 2) {
+                    throw new ConfigException(addressStr + wrongFormat);
+                }
+
+                // is client_config a host:port or just a port
+                String hostname = (clientParts.length == 2) ? clientParts[0] : serverParts[0];
+                try {
+                    clientAddr = new InetSocketAddress(hostname,
+                            Integer.parseInt(clientParts[clientParts.length - 1]));
+                    //LOG.warn("Set clientAddr to " + clientAddr);
+                } catch (NumberFormatException e) {
+                    throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
+                }
+            }
+
+            // server_config should be either host:port:port or host:port:port:type
+            try {
+                addr = new InetSocketAddress(serverParts[0],
+                        Integer.parseInt(serverParts[1]));
+            } catch (NumberFormatException e) {
+                throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]);
+            }
+            try {
+                electionAddr = new InetSocketAddress(serverParts[0], 
+                        Integer.parseInt(serverParts[2]));
+            } catch (NumberFormatException e) {
+                throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]);
+            }
+            if (serverParts.length == 4) setType(serverParts[3]);
         }
 
         public QuorumServer(long id, InetSocketAddress addr,
@@ -111,17 +195,63 @@ public class QuorumPeer extends Thread i
             this.addr = addr;
             this.electionAddr = electionAddr;
             this.type = type;
+            this.clientAddr = null;
         }
 
-        public InetSocketAddress addr;
-
-        public InetSocketAddress electionAddr;
-
-        public long id;
+    public QuorumServer(long id, InetSocketAddress addr,
+                InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
+        this.id = id;
+        this.addr = addr;
+        this.electionAddr = electionAddr;
+        this.type = type;
+        this.clientAddr = clientAddr;
+    }
+
+        public String toString(){
+            StringWriter sw = new StringWriter();            
+            //addr should never be null, but just to make sure
+            if (addr !=null) { 
+                sw.append(addr.getHostName());
+                sw.append(":");
+                sw.append(String.valueOf(addr.getPort()));
+            }
+            if (electionAddr!=null){
+                sw.append(":");
+                sw.append(String.valueOf(electionAddr.getPort()));
+            }           
+            if (type == LearnerType.OBSERVER) sw.append(":observer");
+            else if (type == LearnerType.PARTICIPANT) sw.append(":participant");            
+            if (clientAddr!=null){
+                sw.append(";");
+                sw.append(clientAddr.getHostName() + ":" + String.valueOf(clientAddr.getPort()));
+            }
+            return sw.toString();       
+        }
 
-        public LearnerType type = LearnerType.PARTICIPANT;
+        public int hashCode() {
+          assert false : "hashCode not designed";
+          return 42; // any arbitrary constant will do 
+        }
+        
+        private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2){
+            if ((addr1 == null && addr2!=null) ||
+                (addr1!=null && addr2==null) ||
+                (addr1!=null && addr2!=null && !addr1.equals(addr2))) return false;
+            return true;
+        }
+        
+        public boolean equals(Object o){
+            if (!(o instanceof QuorumServer)) return false;
+            QuorumServer qs = (QuorumServer)o;          
+            if ((qs.id != id) || (qs.type != type)) return false;   
+            if (!checkAddressesEqual(addr, qs.addr)) return false;
+            if (!checkAddressesEqual(electionAddr, qs.electionAddr)) return false;
+            if (!checkAddressesEqual(clientAddr, qs.clientAddr)) return false;                    
+            return true;
+        }
     }
 
+
     public enum ServerState {
         LOOKING, FOLLOWING, LEADING, OBSERVING;
     }
@@ -165,18 +295,35 @@ public class QuorumPeer extends Thread i
      */
     public void setLearnerType(LearnerType p) {
         learnerType = p;
-        if (quorumPeers.containsKey(this.myid)) {
-            this.quorumPeers.get(myid).type = p;
-        } else {
-            LOG.error("Setting LearnerType to " + p + " but " + myid
-                    + " not in QuorumPeers. ");
-        }
+    }
 
+       
+    protected synchronized void setDynamicConfigFilename(String s) {
+        dynamicConfigFilename = s;
     }
-    /**
-     * The servers that make up the cluster
-     */
-    protected Map<Long, QuorumServer> quorumPeers;
+
+    protected synchronized String getDynamicConfigFilename() {
+        return dynamicConfigFilename;
+    }
+
+    protected synchronized void setConfigFileName(String s) {
+        configFilename = s;
+    }
+
+    protected synchronized void setConfigBackwardCompatibility(boolean bc) {
+        configBackwardCompatibility = bc;
+    }
+    
+    protected synchronized boolean getConfigBackwardCompatibility() {
+        return configBackwardCompatibility;
+    }
+    
+    private String dynamicConfigFilename = null;
+    
+    private String configFilename = null;
+    
+    private boolean configBackwardCompatibility = false;
+
     public int getQuorumSize(){
         return getVotingView().size();
     }
@@ -185,7 +332,7 @@ public class QuorumPeer extends Thread i
      * QuorumVerifier implementation; default (majority).
      */
 
-    private QuorumVerifier quorumConfig;
+    private QuorumVerifier quorumVerifier;
 
     /**
      * My id
@@ -347,11 +494,33 @@ public class QuorumPeer extends Thread i
     DatagramSocket udpSocket;
 
     private InetSocketAddress myQuorumAddr;
+    private InetSocketAddress myElectionAddr = null;
+    private InetSocketAddress myClientAddr = null;
 
-    public InetSocketAddress getQuorumAddress(){
+    public synchronized InetSocketAddress getQuorumAddress(){
         return myQuorumAddr;
     }
+    
+    public synchronized void setQuorumAddress(InetSocketAddress addr){
+        myQuorumAddr = addr;
+    }
+    
+    public InetSocketAddress getElectionAddress(){
+        return myElectionAddr;
+    }
 
+    public void setElectionAddress(InetSocketAddress addr){
+        myElectionAddr = addr;
+    }
+    
+    public InetSocketAddress getClientAddress(){
+        return myClientAddr;
+    }
+    
+    public void setClientAddress(InetSocketAddress addr){
+        myClientAddr = addr;
+    }
+    
     private int electionType;
 
     Election electionAlg;
@@ -375,19 +544,18 @@ public class QuorumPeer extends Thread i
             File dataLogDir, int electionType,
             long myid, int tickTime, int initLimit, int syncLimit,
             ServerCnxnFactory cnxnFactory) throws IOException {
-        this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
-                initLimit, syncLimit, cnxnFactory,
-                new QuorumMaj(countParticipants(quorumPeers)));
+        this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, 
+                initLimit, syncLimit, cnxnFactory, 
+                new QuorumMaj(quorumPeers), null);
     }
 
     public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
             File dataLogDir, int electionType,
             long myid, int tickTime, int initLimit, int syncLimit,
             ServerCnxnFactory cnxnFactory,
-            QuorumVerifier quorumConfig) throws IOException {
+            QuorumVerifier quorumConfig, String memFilename) throws IOException {
         this();
         this.cnxnFactory = cnxnFactory;
-        this.quorumPeers = quorumPeers;
         this.electionType = electionType;
         this.myid = myid;
         this.tickTime = tickTime;
@@ -395,9 +563,9 @@ public class QuorumPeer extends Thread i
         this.syncLimit = syncLimit;
         this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
         this.zkDb = new ZKDatabase(this.logFactory);
-        if(quorumConfig == null)
-            this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
-        else this.quorumConfig = quorumConfig;
+        this.dynamicConfigFilename = (memFilename != null) ? memFilename : "zoo_replicated" + myid + ".dynamic";
+        if(quorumConfig == null) quorumConfig = new QuorumMaj(quorumPeers);
+        setQuorumVerifier(quorumConfig, false);
     }
 
     QuorumStats quorumStats() {
@@ -506,6 +674,7 @@ public class QuorumPeer extends Thread i
     }
 
     /**
+
      * This constructor is only used by the existing unit test code.
      * It defaults to FileLogProvider persistence provider.
      */
@@ -517,7 +686,7 @@ public class QuorumPeer extends Thread i
         this(quorumPeers, snapDir, logDir, electionAlg,
                 myid,tickTime, initLimit,syncLimit,
                 ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
-                new QuorumMaj(countParticipants(quorumPeers)));
+                new QuorumMaj(quorumPeers), null);
     }
 
     /**
@@ -533,7 +702,7 @@ public class QuorumPeer extends Thread i
         this(quorumPeers, snapDir, logDir, electionAlg,
                 myid,tickTime, initLimit,syncLimit,
                 ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
-                quorumConfig);
+                quorumConfig, null);
     }
 
     /**
@@ -802,7 +971,7 @@ public class QuorumPeer extends Thread i
      * ensemble.
      */
     public Map<Long,QuorumPeer.QuorumServer> getView() {
-        return Collections.unmodifiableMap(this.quorumPeers);
+        return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers());
     }
 
     /**
@@ -842,7 +1011,7 @@ public class QuorumPeer extends Thread i
      * is introduced will this be more useful.
      */
     public boolean viewContains(Long sid) {
-        return this.quorumPeers.containsKey(sid);
+        return this.getView().containsKey(sid);
     }
 
     /**
@@ -964,19 +1133,54 @@ public class QuorumPeer extends Thread i
         return tick;
     }
 
+    public QuorumVerifier configFromString(String s) throws IOException, ConfigException{
+        Properties props = new Properties();        
+        props.load(new StringReader(s));
+        
+        QuorumPeerConfig config = new QuorumPeerConfig();
+        config.parseDynamicConfig(props, electionType);
+        
+        return config.getQuorumVerifier();
+    }
+    
     /**
      * Return QuorumVerifier object
      */
 
-    public QuorumVerifier getQuorumVerifier(){
-        return quorumConfig;
+    public synchronized QuorumVerifier getQuorumVerifier(){
+        return quorumVerifier;
 
     }
 
-    public void setQuorumVerifier(QuorumVerifier quorumConfig){
-       this.quorumConfig = quorumConfig;
-    }
+    public synchronized QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
+        if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
+           LOG.warn("setQuorumVerifier called with stale config " + qv.getVersion() + 
+                   ". Current version: " + quorumVerifier.getVersion());
+           return quorumVerifier;  
+        }
+        QuorumVerifier prevQV = quorumVerifier;
+        quorumVerifier = qv;
+        
+        if (writeToDisk) {
+            try {
+                QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, configFilename, configBackwardCompatibility, qv);
+                if (configBackwardCompatibility) {
+                    dynamicConfigFilename = configFilename + ".dynamic";
+                    configBackwardCompatibility = false;
+                }
+            } catch(IOException e){
+                LOG.error("Error closing file: ", e.getMessage());     
+            }
+        }
 
+        QuorumServer qs = qv.getAllMembers().get(getId());
+        if (qs!=null){
+            setQuorumAddress(qs.addr);
+            setElectionAddress(qs.electionAddr);
+            setClientAddress(qs.clientAddr);
+        }
+        return prevQV;
+    }
     /**
      * Get an instance of LeaderElection
      */
@@ -1021,10 +1225,6 @@ public class QuorumPeer extends Thread i
         this.cnxnFactory = cnxnFactory;
     }
 
-    public void setQuorumPeers(Map<Long,QuorumServer> quorumPeers) {
-        this.quorumPeers = quorumPeers;
-    }
-
     public int getClientPort() {
         return cnxnFactory.getLocalPort();
     }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Sun Apr 22 23:20:13 2012
@@ -19,15 +19,15 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Map.Entry;
 
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -43,12 +44,16 @@ import org.apache.zookeeper.server.quoru
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.VerifyingFileFactory;
 
+
 public class QuorumPeerConfig {
     private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerConfig.class);
 
     protected InetSocketAddress clientPortAddress;
     protected File dataDir;
     protected File dataLogDir;
+    protected boolean configBackwardCompatibilityMode = false;
+    protected String dynamicConfigFileStr = null;
+    protected String configFileStr = null;
     protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
     protected int maxClientCnxns = 60;
     /** defaults to -1 if not set explicitly */
@@ -60,16 +65,10 @@ public class QuorumPeerConfig {
     protected int syncLimit;
     protected int electionAlg = 3;
     protected int electionPort = 2182;
-    protected final HashMap<Long,QuorumServer> servers =
-        new HashMap<Long, QuorumServer>();
-    protected final HashMap<Long,QuorumServer> observers =
-        new HashMap<Long, QuorumServer>();
 
     protected long serverId;
-    protected HashMap<Long, Long> serverWeight = new HashMap<Long, Long>();
-    protected HashMap<Long, Long> serverGroup = new HashMap<Long, Long>();
-    protected int numGroups = 0;
-    protected QuorumVerifier quorumVerifier;
+
+    protected QuorumVerifier quorumVerifier = null;
     protected int snapRetainCount = 3;
     protected int purgeInterval = 0;
 
@@ -98,13 +97,13 @@ public class QuorumPeerConfig {
      */
     public void parse(String path) throws ConfigException {
         LOG.info("Reading configuration from: " + path);
-
+       
         try {
             File configFile = (new VerifyingFileFactory.Builder(LOG)
                 .warnForRelativePath()
                 .failForNonExistingPath()
                 .build()).create(path);
-
+                
             Properties cfg = new Properties();
             FileInputStream in = new FileInputStream(configFile);
             try {
@@ -112,12 +111,41 @@ public class QuorumPeerConfig {
             } finally {
                 in.close();
             }
-
+            
             parseProperties(cfg);
+            
+            // backward compatibility - dynamic configuration in the same file as static configuration params
+            // see writeDynamicConfig() - we change the config file to new format if reconfig happens
+            if (dynamicConfigFileStr == null) {
+                configBackwardCompatibilityMode = true;
+                configFileStr = path;                
+                parseDynamicConfig(cfg, electionAlg);
+                checkValidity();                
+            }
+
         } catch (IOException e) {
             throw new ConfigException("Error processing " + path, e);
         } catch (IllegalArgumentException e) {
             throw new ConfigException("Error processing " + path, e);
+        }   
+        
+        if (dynamicConfigFileStr!=null) {
+           try {           
+               Properties dynamicCfg = new Properties();
+               FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
+               try {
+                   dynamicCfg.load(inConfig);
+               } finally {
+                   inConfig.close();
+               }
+               parseDynamicConfig(dynamicCfg, electionAlg);
+               checkValidity();
+           
+           } catch (IOException e) {
+               throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
+           } catch (IllegalArgumentException e) {
+               throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
+           }                   
         }
     }
 
@@ -166,63 +194,14 @@ public class QuorumPeerConfig {
                 {
                     throw new ConfigException("Unrecognised peertype: " + value);
                 }
+            } else if (key.equals("dynamicConfigFile")){
+               dynamicConfigFileStr = value;
             } else if (key.equals("autopurge.snapRetainCount")) {
                 snapRetainCount = Integer.parseInt(value);
             } else if (key.equals("autopurge.purgeInterval")) {
                 purgeInterval = Integer.parseInt(value);
-            } else if (key.startsWith("server.")) {
-                int dot = key.indexOf('.');
-                long sid = Long.parseLong(key.substring(dot + 1));
-                String parts[] = value.split(":");
-                if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {
-                    LOG.error(value
-                       + " does not have the form host:port or host:port:port " +
-                       " or host:port:port:type");
-                }
-                InetSocketAddress addr = new InetSocketAddress(parts[0],
-                        Integer.parseInt(parts[1]));
-                if (parts.length == 2) {
-                    servers.put(Long.valueOf(sid), new QuorumServer(sid, addr));
-                } else if (parts.length == 3) {
-                    InetSocketAddress electionAddr = new InetSocketAddress(
-                            parts[0], Integer.parseInt(parts[2]));
-                    servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
-                            electionAddr));
-                } else if (parts.length == 4) {
-                    InetSocketAddress electionAddr = new InetSocketAddress(
-                            parts[0], Integer.parseInt(parts[2]));
-                    LearnerType type = LearnerType.PARTICIPANT;
-                    if (parts[3].equalsIgnoreCase("observer")) {
-                        type = LearnerType.OBSERVER;
-                        observers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
-                                electionAddr,type));
-                    } else if (parts[3].equalsIgnoreCase("participant")) {
-                        type = LearnerType.PARTICIPANT;
-                        servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
-                                electionAddr,type));
-                    } else {
-                        throw new ConfigException("Unrecognised peertype: " + value);
-                    }
-                }
-            } else if (key.startsWith("group")) {
-                int dot = key.indexOf('.');
-                long gid = Long.parseLong(key.substring(dot + 1));
-
-                numGroups++;
-
-                String parts[] = value.split(":");
-                for(String s : parts){
-                    long sid = Long.parseLong(s);
-                    if(serverGroup.containsKey(sid))
-                        throw new ConfigException("Server " + sid + "is in multiple groups");
-                    else
-                        serverGroup.put(sid, gid);
-                }
-
-            } else if(key.startsWith("weight")) {
-                int dot = key.indexOf('.');
-                long sid = Long.parseLong(key.substring(dot + 1));
-                serverWeight.put(sid, Long.parseLong(value));
+            } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.entrySet().contains("dynamicConfigFile")){                
+               throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
             } else {
                 System.setProperty("zookeeper." + key, value);
             }
@@ -248,97 +227,197 @@ public class QuorumPeerConfig {
                         + " is missing.");
             }
         }
-        if (clientPort == 0) {
-            throw new IllegalArgumentException("clientPort is not set");
-        }
-        if (clientPortAddress == null) {
-            this.clientPortAddress = new InetSocketAddress(clientPort);
-        } else {
-            this.clientPortAddress = new InetSocketAddress(
-                    InetAddress.getByName(clientPortAddress), clientPort);
-        }
-
+        if (clientPortAddress != null) {
+           if (clientPort == 0) {
+               throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set");
+        }
+             this.clientPortAddress = new InetSocketAddress(
+                      InetAddress.getByName(clientPortAddress), clientPort);
+        } else if (clientPort!=0){
+             this.clientPortAddress = new InetSocketAddress(clientPort);
+        }    
         if (tickTime == 0) {
             throw new IllegalArgumentException("tickTime is not set");
         }
         if (minSessionTimeout > maxSessionTimeout) {
             throw new IllegalArgumentException(
                     "minSessionTimeout must not be larger than maxSessionTimeout");
-        }
-        if (servers.size() == 0) {
-            if (observers.size() > 0) {
+        }          
+    }
+    
+    /**
+     * Writes dynamic configuration file, updates static config file if needed. 
+     * @param dynamicConfigFilename
+     * @param configFileStr
+     * @param configBackwardCompatibilityMode
+     * @param qv
+     */
+    public static void writeDynamicConfig(String dynamicConfigFilename, String configFileStr, 
+            boolean configBackwardCompatibilityMode, QuorumVerifier qv) throws IOException {                             
+        FileOutputStream outConfig = null;
+       try {
+           byte b[] = qv.toByteArray();                                            
+           if (configBackwardCompatibilityMode) {
+               dynamicConfigFilename = configFileStr + ".dynamic";
+           }
+           String tmpFilename = dynamicConfigFilename + ".tmp";
+           outConfig = new FileOutputStream(tmpFilename);
+           
+           outConfig.write(b);
+           outConfig.close();
+           File curFile = new File(dynamicConfigFilename);
+           File tmpFile = new File(tmpFilename);
+           if (!tmpFile.renameTo(curFile)) {
+               throw new IOException("renaming " + tmpFile.toString() + " to " + curFile.toString() + " failed!");
+           }
+       } finally{
+           if (outConfig!=null) { 
+               outConfig.close();
+           }
+       }
+       // the following is for users who run without a dynamic config file (old config file)
+       // if the configuration changes (reconfiguration executes), we create a dynamic config
+       // file, remove all the dynamic definitions from the config file and add a pointer
+       // to the config file. The dynamic config file's name will be the same as the config file's
+       // with ".dynamic" appended to it
+       
+        if (configBackwardCompatibilityMode) {
+           BufferedWriter out = null;
+               try {
+                   File configFile = (new VerifyingFileFactory.Builder(LOG)
+                       .warnForRelativePath()
+                       .failForNonExistingPath()
+                       .build()).create(configFileStr);
+                       
+                   Properties cfg = new Properties();
+                   FileInputStream in = new FileInputStream(configFile);
+                   try {
+                       cfg.load(in);
+                   } finally {
+                       in.close();
+                   }
+                   String tmpFilename = configFileStr + ".tmp";                    
+                   FileWriter fstream = new FileWriter(tmpFilename);
+                   out = new BufferedWriter(fstream);                 
+                   
+                   for (Entry<Object, Object> entry : cfg.entrySet()) {
+                       String key = entry.getKey().toString().trim();
+                       String value = entry.getValue().toString().trim();    
+                       if (!key.startsWith("server.") && !key.startsWith("group") 
+                               && !key.startsWith("weight") && !key.equals("clientPort") && !key.equals("clientPortAddress")){
+                           out.write(key.concat("=").concat(value).concat("\n"));
+                       }
+                   }                      
+                   out.write("dynamicConfigFile=".concat(dynamicConfigFilename).concat("\n"));
+                   out.close();
+                   File tmpFile = new File(tmpFilename);
+                   if (!tmpFile.renameTo(configFile)) {
+                       throw new IOException("renaming " + tmpFile.toString() + " to " + configFile.toString() + " failed!");
+                   }
+               } finally{
+                   if (out!=null) {
+                           out.close();
+                   }
+               }
+           }
+   } 
+    public static void deleteFile(String filename){        
+       File f = new File(filename);
+       if (f.exists()) {
+           try{ 
+               f.delete();
+           } catch (Exception e) {
+               LOG.warn("deleting " + filename + " failed");
+           }
+       }                   
+    }
+    
+    
+    private QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException{
+       if(isHierarchical){
+            return new QuorumHierarchical(dynamicConfigProp);
+        } else {
+           /*
+             * The default QuorumVerifier is QuorumMaj
+             */        
+            //LOG.info("Defaulting to majority quorums");
+            return new QuorumMaj(dynamicConfigProp);            
+        }          
+    }
+    
+    /**
+     * Parse dynamic configuration file.
+     * @param zkProp Properties to parse from.
+     * @throws IOException
+     * @throws ConfigException
+     */
+    public void parseDynamicConfig(Properties dynamicConfigProp, int eAlg)
+    throws IOException, ConfigException {
+       boolean isHierarchical = false;
+        for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
+            String key = entry.getKey().toString().trim();                    
+            if (key.startsWith("group") || key.startsWith("weight")) {
+               isHierarchical = true;
+            } else if (!configBackwardCompatibilityMode && !key.startsWith("server.") && !key.equals("version")){ 
+               LOG.info(dynamicConfigProp.toString());
+               throw new ConfigException("Unrecognised parameter: " + key);                
+            }
+        }
+        
+        quorumVerifier = createQuorumVerifier(dynamicConfigProp, isHierarchical);                      
+               
+        int numParticipators = quorumVerifier.getVotingMembers().size();
+        int numObservers = quorumVerifier.getObservingMembers().size();        
+        if (numParticipators == 0) {
+            if (numObservers > 0) {
                 throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
             }
             // Not a quorum configuration so return immediately - not an error
             // case (for b/w compatibility), server will default to standalone
             // mode.
             return;
-        } else if (servers.size() == 1) {
-            if (observers.size() > 0) {
+        } else if (numParticipators == 1) {            
+            if (numObservers > 0) {
                 throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
             }
 
             // HBase currently adds a single server line to the config, for
             // b/w compatibility reasons we need to keep this here.
             LOG.error("Invalid configuration, only one server specified (ignoring)");
-            servers.clear();
-        } else if (servers.size() > 1) {
-            if (servers.size() == 2) {
+            //servers.clear();
+        } else if (numParticipators > 1) {         
+            if (numParticipators == 2) {
                 LOG.warn("No server failure will be tolerated. " +
                     "You need at least 3 servers.");
-            } else if (servers.size() % 2 == 0) {
+            } else if (numParticipators % 2 == 0) {
                 LOG.warn("Non-optimial configuration, consider an odd number of servers.");
             }
-            if (initLimit == 0) {
-                throw new IllegalArgumentException("initLimit is not set");
-            }
-            if (syncLimit == 0) {
-                throw new IllegalArgumentException("syncLimit is not set");
-            }
             /*
              * If using FLE, then every server requires a separate election
              * port.
-             */
-            if (electionAlg != 0) {
-                for (QuorumServer s : servers.values()) {
-                    if (s.electionAddr == null)
-                        throw new IllegalArgumentException(
-                                "Missing election port for server: " + s.id);
-                }
-            }
-
-            /*
-             * Default of quorum config is majority
-             */
-            if(serverGroup.size() > 0){
-                if(servers.size() != serverGroup.size())
-                    throw new ConfigException("Every server must be in exactly one group");
-                /*
-                 * The deafult weight of a server is 1
-                 */
-                for(QuorumServer s : servers.values()){
-                    if(!serverWeight.containsKey(s.id))
-                        serverWeight.put(s.id, (long) 1);
-                }
-
-                /*
-                 * Set the quorumVerifier to be QuorumHierarchical
-                 */
-                quorumVerifier = new QuorumHierarchical(numGroups,
-                        serverWeight, serverGroup);
-            } else {
-                /*
-                 * The default QuorumVerifier is QuorumMaj
-                 */
-
-                LOG.info("Defaulting to majority quorums");
-                quorumVerifier = new QuorumMaj(servers.size());
-            }
+             */            
+           if (eAlg != 0) {
+               for (QuorumServer s : quorumVerifier.getVotingMembers().values()) {
+                   if (s.electionAddr == null)
+                       throw new IllegalArgumentException(
+                               "Missing election port for server: " + s.id);
+               }
+           }   
+        }
+    }
+    
 
-            // Now add observers to servers, once the quorums have been
-            // figured out
-            servers.putAll(observers);
+    public void checkValidity() throws IOException, ConfigException{
 
+       if (quorumVerifier.getVotingMembers().size() > 1) {
+           if (initLimit == 0) {
+               throw new IllegalArgumentException("initLimit is not set");
+           }
+           if (syncLimit == 0) {
+               throw new IllegalArgumentException("syncLimit is not set");
+           }
+            
+                                     
             File myIdFile = new File(dataDir, "myid");
             if (!myIdFile.exists()) {
                 throw new IllegalArgumentException(myIdFile.toString()
@@ -359,8 +438,18 @@ public class QuorumPeerConfig {
                         + " is not a number");
             }
 
+            QuorumServer qs = quorumVerifier.getAllMembers().get(serverId);
+            if (clientPortAddress!=null && qs!=null && qs.clientAddr!=null){ 
+               if ((!clientPortAddress.getAddress().isAnyLocalAddress()
+                       && !clientPortAddress.equals(qs.clientAddr)) || 
+                   (clientPortAddress.getAddress().isAnyLocalAddress() 
+                       && clientPortAddress.getPort()!=qs.clientAddr.getPort())) 
+               throw new ConfigException("client address for this server (id = " + serverId + ") in static config file is " + clientPortAddress + " is different from client address found in dynamic file: " + qs.clientAddr);                    
+           } 
+            if (qs!=null) clientPortAddress = qs.clientAddr;                       
+            
             // Warn about inconsistent peer type
-            LearnerType roleByServersList = observers.containsKey(serverId) ? LearnerType.OBSERVER
+            LearnerType roleByServersList = quorumVerifier.getObservingMembers().containsKey(serverId) ? LearnerType.OBSERVER
                     : LearnerType.PARTICIPANT;
             if (roleByServersList != peerType) {
                 LOG.warn("Peer type from servers list (" + roleByServersList
@@ -369,9 +458,11 @@ public class QuorumPeerConfig {
 
                 peerType = roleByServersList;
             }
-        }
+           
+       }
+       
     }
-
+    
     public InetSocketAddress getClientPortAddress() { return clientPortAddress; }
     public File getDataDir() { return dataDir; }
     public File getDataLogDir() { return dataLogDir; }
@@ -397,15 +488,24 @@ public class QuorumPeerConfig {
         return quorumVerifier;
     }
 
-    public Map<Long,QuorumServer> getServers() {
-        return Collections.unmodifiableMap(servers);
-    }
-
     public long getServerId() { return serverId; }
 
-    public boolean isDistributed() { return servers.size() > 1; }
+    public boolean isDistributed() { return (quorumVerifier!=null && quorumVerifier.getVotingMembers().size() > 1); }
 
     public LearnerType getPeerType() {
         return peerType;
     }
+    
+    public String getDynamicConfigFilename() {
+       return dynamicConfigFileStr;
+    }
+    
+    public String getConfigFilename(){
+        return configFileStr;
+    }
+    
+    public boolean getConfigBackwardCompatibility(){
+        return configBackwardCompatibilityMode;
+    }
+    
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Sun Apr 22 23:20:13 2012
@@ -110,8 +110,8 @@ public class QuorumPeerMain {
                 .getDataDir(), config.getDataLogDir(), config
                 .getSnapRetainCount(), config.getPurgeInterval());
         purgeMgr.start();
-
-        if (args.length == 1 && config.servers.size() > 0) {
+        
+        if (args.length == 1 && config.isDistributed()) {
             runFromConfig(config);
         } else {
             LOG.warn("Either no config or no quorum defined in config, running "
@@ -134,12 +134,11 @@ public class QuorumPeerMain {
           cnxnFactory.configure(config.getClientPortAddress(),
                                 config.getMaxClientCnxns());
   
-          quorumPeer = new QuorumPeer();
-          quorumPeer.setClientPortAddress(config.getClientPortAddress());
+          quorumPeer = new QuorumPeer();          
           quorumPeer.setTxnFactory(new FileTxnSnapLog(
                       config.getDataLogDir(),
                       config.getDataDir()));
-          quorumPeer.setQuorumPeers(config.getServers());
+          //quorumPeer.setQuorumPeers(config.getAllMembers());
           quorumPeer.setElectionType(config.getElectionAlg());
           quorumPeer.setMyid(config.getServerId());
           quorumPeer.setTickTime(config.getTickTime());
@@ -147,11 +146,14 @@ public class QuorumPeerMain {
           quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
           quorumPeer.setInitLimit(config.getInitLimit());
           quorumPeer.setSyncLimit(config.getSyncLimit());
-          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
-          quorumPeer.setCnxnFactory(cnxnFactory);
+          quorumPeer.setDynamicConfigFilename(config.getDynamicConfigFilename());
+          quorumPeer.setConfigFileName(config.getConfigFilename());
+          quorumPeer.setConfigBackwardCompatibility(config.getConfigBackwardCompatibility());
           quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
+          quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
+          quorumPeer.setCnxnFactory(cnxnFactory);
           quorumPeer.setLearnerType(config.getPeerType());
-  
+          
           quorumPeer.start();
           quorumPeer.join();
       } catch (InterruptedException e) {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java Sun Apr 22 23:20:13 2012
@@ -49,10 +49,9 @@ public abstract class QuorumZooKeeperSer
         pwriter.print("electionAlg=");
         pwriter.println(self.getElectionType());
         pwriter.print("electionPort=");
-        pwriter.println(self.quorumPeers.get(self.getId()).electionAddr
-                .getPort());
+        pwriter.println(self.getElectionAddress().getPort());
         pwriter.print("quorumPort=");
-        pwriter.println(self.quorumPeers.get(self.getId()).addr.getPort());
+        pwriter.println(self.getQuorumAddress().getPort());
         pwriter.print("peerType=");
         pwriter.println(self.getLearnerType().ordinal());
     }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java Sun Apr 22 23:20:13 2012
@@ -21,8 +21,10 @@ package org.apache.zookeeper.server.quor
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.HashSet;
+import java.io.StringWriter;
+import java.util.Set;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Map.Entry;
 
@@ -30,6 +32,8 @@ import java.util.Map.Entry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 
 
@@ -65,12 +69,53 @@ import org.apache.zookeeper.server.quoru
 public class QuorumHierarchical implements QuorumVerifier {
     private static final Logger LOG = LoggerFactory.getLogger(QuorumHierarchical.class);
 
-    HashMap<Long, Long> serverWeight;
-    HashMap<Long, Long> serverGroup;
-    HashMap<Long, Long> groupWeight;
+    private HashMap<Long, Long> serverWeight = new HashMap<Long, Long>();
+    private HashMap<Long, Long> serverGroup = new HashMap<Long, Long>();
+    private HashMap<Long, Long> groupWeight = new HashMap<Long, Long>();
     
-    int numGroups;
+    private int numGroups = 0;
    
+    private Map<Long, QuorumServer> allMembers = new HashMap<Long, QuorumServer>();
+    private Map<Long, QuorumServer> participatingMembers = new HashMap<Long, QuorumServer>();
+    private Map<Long, QuorumServer> observingMembers = new HashMap<Long, QuorumServer>();
+    
+    private long version = 0;
+    
+    public int hashCode() {
+         assert false : "hashCode not designed";
+         return 42; // any arbitrary constant will do 
+    }
+    
+   public boolean equals(Object o){
+       if (!(o instanceof QuorumHierarchical)) {
+           return false;           
+       }       
+       QuorumHierarchical qm = (QuorumHierarchical)o;
+       if (qm.getVersion() == version) return true;
+       if ((allMembers.size()!=qm.getAllMembers().size()) ||
+           (serverWeight.size() != qm.serverWeight.size()) ||
+           (groupWeight.size() != qm.groupWeight.size()) ||
+            (serverGroup.size() != qm.serverGroup.size())) {
+           return false;
+       }   
+       for (QuorumServer qs: allMembers.values()){
+           QuorumServer qso = qm.getAllMembers().get(qs.id);
+           if (qso == null || !qs.equals(qso)) return false;
+       }
+       for (Long sid: serverWeight.keySet()){
+           if (serverWeight.get(sid) != qm.serverWeight.get(sid))
+               return false;
+       }
+       for (Long sid: groupWeight.keySet()){
+           if (groupWeight.get(sid) != qm.groupWeight.get(sid))
+               return false;
+       }
+       for (Long sid: serverGroup.keySet()){
+           if (serverGroup.get(sid) != qm.serverGroup.get(sid))
+               return false;
+       }
+       return true;
+   }
     /**
      * This contructor requires the quorum configuration
      * to be declared in a separate file, and it takes the
@@ -78,12 +123,7 @@ public class QuorumHierarchical implemen
      */
     public QuorumHierarchical(String filename)
     throws ConfigException {
-        this.serverWeight = new HashMap<Long, Long>();
-        this.serverGroup = new HashMap<Long, Long>();
-        this.groupWeight = new HashMap<Long, Long>();
-        this.numGroups = 0;
-        
-        readConfigFile(filename);
+        readConfigFile(filename);    
     }
     
     /**
@@ -91,40 +131,11 @@ public class QuorumHierarchical implemen
      * it in the unit test for this feature.
      */
     
-    public QuorumHierarchical(Properties qp)
-    throws ConfigException {
-        this.serverWeight = new HashMap<Long, Long>();
-        this.serverGroup = new HashMap<Long, Long>();
-        this.groupWeight = new HashMap<Long, Long>();
-        this.numGroups = 0;
-        
+    public QuorumHierarchical(Properties qp) throws ConfigException {
         parse(qp);
-        
         LOG.info(serverWeight.size() + ", " + serverGroup.size() + ", " + groupWeight.size());
     }
-    
-   /**
-    *  This contructor takes the two hash maps needed to enable 
-    *  validating quorums. We use it with QuorumPeerConfig. That is,
-    *  we declare weights and groups in the server configuration
-    *  file along with the other parameters.
-    * @param numGroups
-    * @param serverWeight
-    * @param serverGroup
-    */
-    public QuorumHierarchical(int numGroups,
-            HashMap<Long, Long> serverWeight,
-            HashMap<Long, Long> serverGroup)
-    {
-        this.serverWeight = serverWeight;
-        this.serverGroup = serverGroup;
-        this.groupWeight = new HashMap<Long, Long>();
-        
-        this.numGroups = numGroups;
-        computeGroupWeight();   
-    }
-    
-    
+  
     /**
      * Returns the weight of a server.
      * 
@@ -170,12 +181,25 @@ public class QuorumHierarchical implemen
     
     /**
      * Parse properties if configuration given in a separate file.
+     * Assumes that allMembers has been already assigned
+     * @throws ConfigException 
      */
-    private void parse(Properties quorumProp){
+    private void parse(Properties quorumProp) throws ConfigException{
         for (Entry<Object, Object> entry : quorumProp.entrySet()) {
             String key = entry.getKey().toString();
             String value = entry.getValue().toString(); 
-            if (key.startsWith("group")) {
+            
+            if (key.startsWith("server.")) {
+                int dot = key.indexOf('.');
+                long sid = Long.parseLong(key.substring(dot + 1));
+                QuorumServer qs = new QuorumServer(sid, value);
+                allMembers.put(Long.valueOf(sid), qs);  
+                if (qs.type == LearnerType.PARTICIPANT) 
+                   participatingMembers.put(Long.valueOf(sid), qs);
+                else {
+                   observingMembers.put(Long.valueOf(sid), qs);
+                }
+            } else if (key.startsWith("group")) {
                 int dot = key.indexOf('.');
                 long gid = Long.parseLong(key.substring(dot + 1));
                 
@@ -184,7 +208,10 @@ public class QuorumHierarchical implemen
                 String parts[] = value.split(":");
                 for(String s : parts){
                     long sid = Long.parseLong(s);
-                    serverGroup.put(sid, gid);
+                    if(serverGroup.containsKey(sid))
+                        throw new ConfigException("Server " + sid + "is in multiple groups");
+                    else
+                        serverGroup.put(sid, gid);
                 }
                     
                 
@@ -192,12 +219,76 @@ public class QuorumHierarchical implemen
                 int dot = key.indexOf('.');
                 long sid = Long.parseLong(key.substring(dot + 1));
                 serverWeight.put(sid, Long.parseLong(value));
-            }
+            } else if (key.equals("version")){
+               version = Long.parseLong(value, 16);
+            }        
         }
         
+        for (QuorumServer qs: allMembers.values()){
+           Long id = qs.id;
+           if (qs.type == LearnerType.PARTICIPANT){
+               if (!serverGroup.containsKey(id)) 
+                   throw new ConfigException("Server " + id + "is not in a group");
+               if (!serverWeight.containsKey(id))
+                   serverWeight.put(id, (long) 1);
+            }
+        }
+           
+           
         computeGroupWeight();
     }
     
+    public Map<Long, QuorumServer> getAllMembers() { 
+       return allMembers;
+    }
+    public byte[] toByteArray(){
+       StringWriter sw = new StringWriter();
+       
+       for (QuorumServer member: getAllMembers().values()){            
+               String key = "server." + member.id;
+            String value = member.toString();
+            sw.append(key);
+            sw.append('=');
+            sw.append(value);
+            sw.append('\n');                       
+       }
+       
+       Map<Long, String> groups = new HashMap<Long, String>();
+       for (Entry<Long, Long> pair: serverGroup.entrySet()) {
+           Long sid = pair.getKey();
+           Long gid = pair.getValue();
+           String str = groups.get(gid);
+           if (str == null) str = sid.toString();
+           else str = str.concat(":").concat(sid.toString());
+           groups.put(gid, str);
+       }
+       
+       for (Entry<Long, String> pair: groups.entrySet()) {
+           Long gid = pair.getKey();
+           String key = "group." + gid.toString();
+            String value = pair.getValue();
+            sw.append(key);
+            sw.append('=');
+            sw.append(value);
+            sw.append('\n');           
+       }
+   
+   
+       for (Entry<Long, Long> pair: serverWeight.entrySet()) {
+           Long sid = pair.getKey();
+           String key = "weight." + sid.toString();
+            String value = pair.getValue().toString();
+            sw.append(key);
+            sw.append('=');
+            sw.append(value);
+            sw.append('\n');           
+       }
+       
+       sw.append("version=" + Long.toHexString(version));
+       
+       return sw.toString().getBytes();        
+    }
+    
     /**
      * This method pre-computes the weights of groups to speed up processing
      * when validating a given set. We compute the weights of groups in 
@@ -229,7 +320,7 @@ public class QuorumHierarchical implemen
     /**
      * Verifies if a given set is a quorum.
      */
-    public boolean containsQuorum(HashSet<Long> set){
+    public boolean containsQuorum(Set<Long> set){
         HashMap<Long, Long> expansion = new HashMap<Long, Long>();
         
         /*
@@ -240,6 +331,7 @@ public class QuorumHierarchical implemen
         
         for(long sid : set){
             Long gid = serverGroup.get(sid);
+            if (gid == null) continue;
             if(!expansion.containsKey(gid))
                 expansion.put(gid, serverWeight.get(sid));
             else {
@@ -267,5 +359,16 @@ public class QuorumHierarchical implemen
             LOG.debug("Negative set size: " + set.size());
             return false;
         }
-    }
+    }  
+    public Map<Long, QuorumServer> getVotingMembers() {        
+       return participatingMembers;
+   }
+
+   public Map<Long, QuorumServer> getObservingMembers() {      
+       return observingMembers;
+   }
+
+   public long getVersion() {
+       return version;
+   }          
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java Sun Apr 22 23:20:13 2012
@@ -18,40 +18,136 @@
 
 package org.apache.zookeeper.server.quorum.flexible;
 
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
 
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 
 /**
- * This class implements a validator for majority quorums. The 
- * implementation is straightforward.
- *
+ * This class implements a validator for majority quorums. The implementation is
+ * straightforward.
+ * 
  */
 public class QuorumMaj implements QuorumVerifier {
-    int half;
-    
+    private Map<Long, QuorumServer> allMembers = new HashMap<Long, QuorumServer>();
+    private HashMap<Long, QuorumServer> votingMembers = new HashMap<Long, QuorumServer>();
+    private HashMap<Long, QuorumServer> observingMembers = new HashMap<Long, QuorumServer>();
+    private long version = 0;
+    private int half;
+
+    public int hashCode() {
+        assert false : "hashCode not designed";
+        return 42; // any arbitrary constant will do
+    }
+
+    public boolean equals(Object o) {
+        if (!(o instanceof QuorumMaj)) {
+            return false;
+        }
+        QuorumMaj qm = (QuorumMaj) o;
+        if (qm.getVersion() == version)
+            return true;
+        if (allMembers.size() != qm.getAllMembers().size())
+            return false;
+        for (QuorumServer qs : allMembers.values()) {
+            QuorumServer qso = qm.getAllMembers().get(qs.id);
+            if (qso == null || !qs.equals(qso))
+                return false;
+        }
+        return true;
+    }
+
     /**
      * Defines a majority to avoid computing it every time.
      * 
-     * @param n number of servers
      */
-    public QuorumMaj(int n){
-        this.half = n/2;
+    public QuorumMaj(Map<Long, QuorumServer> allMembers) {
+        this.allMembers = allMembers;
+        for (QuorumServer qs : allMembers.values()) {
+            if (qs.type == LearnerType.PARTICIPANT) {
+                votingMembers.put(Long.valueOf(qs.id), qs);
+            } else {
+                observingMembers.put(Long.valueOf(qs.id), qs);
+            }
+        }
+        half = votingMembers.size() / 2;
     }
-    
+
+    public QuorumMaj(Properties props) throws ConfigException {
+        for (Entry<Object, Object> entry : props.entrySet()) {
+            String key = entry.getKey().toString();
+            String value = entry.getValue().toString();
+
+            if (key.startsWith("server.")) {
+                int dot = key.indexOf('.');
+                long sid = Long.parseLong(key.substring(dot + 1));
+                QuorumServer qs = new QuorumServer(sid, value);
+                allMembers.put(Long.valueOf(sid), qs);
+                if (qs.type == LearnerType.PARTICIPANT)
+                    votingMembers.put(Long.valueOf(sid), qs);
+                else {
+                    observingMembers.put(Long.valueOf(sid), qs);
+                }
+            } else if (key.equals("version")) {
+                version = Long.parseLong(value, 16);
+            }
+        }
+        half = votingMembers.size() / 2;
+    }
+
     /**
      * Returns weight of 1 by default.
      * 
-     * @param id 
+     * @param id
      */
-    public long getWeight(long id){
+    public long getWeight(long id) {
         return (long) 1;
     }
-    
+
+    public byte[] toByteArray() {
+        StringBuilder sw = new StringBuilder();
+
+        for (QuorumServer member : getAllMembers().values()) {
+            String key = "server." + member.id;
+            String value = member.toString();
+            sw.append(key);
+            sw.append('=');
+            sw.append(value);
+            sw.append('\n');
+        }
+        String hexVersion = Long.toHexString(version);
+        sw.append("version=");
+        sw.append(hexVersion);
+        return sw.toString().getBytes();
+    }
+
     /**
-     * Verifies if a set is a majority.
+     * Verifies if a set is a majority. Assumes that ackSet contains acks only
+     * from votingMembers
      */
-    public boolean containsQuorum(HashSet<Long> set){
-        return (set.size() > half);
+    public boolean containsQuorum(Set<Long> ackSet) {
+        return (ackSet.size() > half);
+    }
+
+    public Map<Long, QuorumServer> getAllMembers() {
+        return allMembers;
+    }
+
+    public Map<Long, QuorumServer> getVotingMembers() {
+        return votingMembers;
     }
-    
+
+    public Map<Long, QuorumServer> getObservingMembers() {
+        return observingMembers;
+    }
+
+    public long getVersion() {
+        return version;
+    }
+
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java Sun Apr 22 23:20:13 2012
@@ -18,7 +18,10 @@
 
 package org.apache.zookeeper.server.quorum.flexible;
 
-import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 
 /**
  * All quorum validators have to implement a method called
@@ -29,5 +32,11 @@ import java.util.HashSet;
 
 public interface QuorumVerifier {
     long getWeight(long id);
-    boolean containsQuorum(HashSet<Long> set);
+    boolean containsQuorum(Set<Long> set);
+    long getVersion();
+    Map<Long, QuorumServer> getAllMembers();
+    Map<Long, QuorumServer> getVotingMembers();
+    Map<Long, QuorumServer> getObservingMembers();
+    boolean equals(Object o);
+    byte[] toByteArray();
 }

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Sun Apr 22 23:20:13 2012
@@ -63,10 +63,10 @@ public class QuorumPeerMainTest extends 
         final int CLIENT_PORT_QP2 = PortAssignment.unique();
 
         String quorumCfgSection =
-                "server.1=127.0.0.1:" + PortAssignment.unique()
-                + ":" + PortAssignment.unique()
-                + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
-                + ":" + PortAssignment.unique();
+            "server.1=127.0.0.1:" + PortAssignment.unique()
+            + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+            + "\nserver.2=127.0.0.1:" + PortAssignment.unique() 
+            + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2;
 
         MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
         MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
@@ -117,9 +117,9 @@ public class QuorumPeerMainTest extends 
         final int SERVER_COUNT = 3;
         final int clientPorts[] = new int[SERVER_COUNT];
         StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            clientPorts[i] = PortAssignment.unique();
-            sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\n");
+        for(int i = 0; i < SERVER_COUNT; i++) {
+               clientPorts[i] = PortAssignment.unique();
+               sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
         }
         String quorumCfgSection = sb.toString();
 
@@ -351,7 +351,7 @@ public class QuorumPeerMainTest extends 
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < SERVER_COUNT; i++) {
             clientPorts[i] = PortAssignment.unique();
-            sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\n");
+            sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
         }
         String quorumCfgSection = sb.toString();
 
@@ -388,12 +388,13 @@ public class QuorumPeerMainTest extends 
 
         try {
             final int CLIENT_PORT_QP1 = PortAssignment.unique();
+            final int CLIENT_PORT_QP2 = PortAssignment.unique();
 
             String quorumCfgSection =
-                    "server.1=127.0.0.1:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique()
-                    + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique();
+                "server.1=127.0.0.1:" + PortAssignment.unique()
+                + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+                + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique()
+                + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2;
 
             MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
             q1.start();
@@ -452,12 +453,12 @@ public class QuorumPeerMainTest extends 
             final int CLIENT_PORT_QP3 = PortAssignment.unique();
 
             String quorumCfgSection =
-                    "server.1=127.0.0.1:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique()
-                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique()
-                    + "\nserver.3=127.0.0.1:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique() + ":observer";
+                "server.1=127.0.0.1:" + PortAssignment.unique()
+                + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+                + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+                + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2
+                + "\nserver.3=127.0.0.1:" + PortAssignment.unique()
+                + ":" + PortAssignment.unique() + ":observer" + ";" + CLIENT_PORT_QP3;
 
             MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
             MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
@@ -529,11 +530,11 @@ public class QuorumPeerMainTest extends 
         int electionPort1 = PortAssignment.unique();
         int electionPort2 = PortAssignment.unique();
         String quorumCfgSection =
-                "server.1=127.0.0.1:" + PortAssignment.unique()
-                + ":" + electionPort1
-                + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
-                + ":" + electionPort2;
-
+            "server.1=127.0.0.1:" + PortAssignment.unique()
+            + ":" + electionPort1 + ";" + CLIENT_PORT_QP1
+            + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+            + ":" +  electionPort2 + ";" + CLIENT_PORT_QP2;
+        
         MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
         MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
         q1.start();
@@ -593,10 +594,10 @@ public class QuorumPeerMainTest extends 
             final int CLIENT_PORT_QP2 = PortAssignment.unique();
 
             String quorumCfgSection =
-                    "server.1=127.0.0.1:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique()
-                    + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
-                    + ":" + PortAssignment.unique();
+                "server.1=127.0.0.1:" + PortAssignment.unique()
+                + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+                + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+                + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2;
 
             MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
             MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
@@ -646,10 +647,10 @@ public class QuorumPeerMainTest extends 
         long maxwait = 3000;
         final int CLIENT_PORT_QP1 = PortAssignment.unique();
         String quorumCfgSection =
-                "server.1=127.0.0.1:" + PortAssignment.unique()
-                + ":" + PortAssignment.unique()
-                + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
-                + ":" + PortAssignment.unique();
+            "server.1=127.0.0.1:" + PortAssignment.unique()
+            + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+            + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+            + ":" + PortAssignment.unique() + ";" + PortAssignment.unique();
         MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
         q1.start();
         // Let the notifications timeout

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Sun Apr 22 23:20:13 2012
@@ -56,14 +56,17 @@ public class QuorumPeerTestBase extends 
 
     public static class MainThread implements Runnable {
         final File confFile;
+        final File dynamicConfigFile;
         volatile TestQPMain main;
 
         public MainThread(int myid, int clientPort, String quorumCfgSection)
             throws IOException
         {
             File tmpDir = ClientBase.createTmpDir();
+            LOG.info("id = " + myid + " tmpDir = " + tmpDir);
             confFile = new File(tmpDir, "zoo.cfg");
-
+            dynamicConfigFile = new File(tmpDir, "zoo.dynamic");
+            
             FileWriter fwriter = new FileWriter(confFile);
             fwriter.write("tickTime=4000\n");
             fwriter.write("initLimit=10\n");
@@ -76,13 +79,22 @@ public class QuorumPeerTestBase extends 
 
             // Convert windows path to UNIX to avoid problems with "\"
             String dir = dataDir.toString();
+            String dynamicConfigFilename = dynamicConfigFile.toString();
             String osname = java.lang.System.getProperty("os.name");
             if (osname.toLowerCase().contains("windows")) {
                 dir = dir.replace('\\', '/');
+                dynamicConfigFilename = dynamicConfigFilename.replace('\\', '/');
             }
             fwriter.write("dataDir=" + dir + "\n");
             
             fwriter.write("clientPort=" + clientPort + "\n");
+            
+            fwriter.write("dynamicConfigFile=" + dynamicConfigFilename + "\n");
+            
+            fwriter.flush();
+            fwriter.close();
+
+            fwriter = new FileWriter(dynamicConfigFile);
             fwriter.write(quorumCfgSection + "\n");
             fwriter.flush();
             fwriter.close();

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Sun Apr 22 23:20:13 2012
@@ -859,16 +859,18 @@ public class Zab1_0Test {
         return new ConversableFollower(peer, zk);
     }
 
-    private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
-            FileNotFoundException {
+    private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException {
+        HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
         QuorumPeer peer = new QuorumPeer();
         peer.syncLimit = 2;
         peer.initLimit = 2;
         peer.tickTime = 2000;
-        peer.quorumPeers = new HashMap<Long, QuorumServer>();
-        peer.quorumPeers.put(1L, new QuorumServer(0, new InetSocketAddress(33221)));
-        peer.quorumPeers.put(1L, new QuorumServer(1, new InetSocketAddress(33223)));
-        peer.setQuorumVerifier(new QuorumMaj(3));
+        
+        peers.put(0L, new QuorumServer(0, new InetSocketAddress(33221), new InetSocketAddress(33231), new InetSocketAddress(33241)));
+        peers.put(1L, new QuorumServer(1, new InetSocketAddress(33223), new InetSocketAddress(33233), new InetSocketAddress(33243))); 
+        peers.put(2L, new QuorumServer(2, new InetSocketAddress(33224), new InetSocketAddress(33234), new InetSocketAddress(33245))); 
+        
+        peer.setQuorumVerifier(new QuorumMaj(peers), false);
         peer.setCnxnFactory(new NullServerCnxnFactory());
         File version2 = new File(tmpDir, "version-2");
         version2.mkdir();