You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2013/03/07 07:01:50 UTC

svn commit: r1453693 [3/5] - in /zookeeper/trunk: ./ src/ src/c/ src/c/include/ src/c/src/ src/c/tests/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/cli/ src/java/main/org/apache/zookeeper/common/ src/java/main/org/apache/zook...

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Thu Mar  7 06:01:49 2013
@@ -185,7 +185,8 @@ public class NIOServerCnxnFactory extend
         private final RateLogger acceptErrorLogger = new RateLogger(LOG);
         private final Collection<SelectorThread> selectorThreads;
         private Iterator<SelectorThread> selectorIterator;
-
+        private volatile boolean reconfiguring = false;
+        
         public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
                 Set<SelectorThread> selectorThreads) throws IOException {
             super("NIOServerCxnFactory.AcceptThread:" + addr);
@@ -212,10 +213,16 @@ public class NIOServerCnxnFactory extend
                 closeSelector();
                 // This will wake up the selector threads, and tell the
                 // worker thread pool to begin shutdown.
-                NIOServerCnxnFactory.this.stop();
+            	if (!reconfiguring) {                    
+                    NIOServerCnxnFactory.this.stop();
+                }
                 LOG.info("accept thread exitted run method");
             }
         }
+        
+        public void setReconfiguring() {
+        	reconfiguring = true;
+        }
 
         private void select() {
             try {
@@ -678,7 +685,31 @@ public class NIOServerCnxnFactory extend
         ss.configureBlocking(false);
         acceptThread = new AcceptThread(ss, addr, selectorThreads);
     }
-
+   
+    @Override
+    public void reconfigure(InetSocketAddress addr){
+        ServerSocketChannel oldSS = ss;        
+        try {
+           this.ss = ServerSocketChannel.open();
+           ss.socket().setReuseAddress(true);
+           LOG.info("binding to port " + addr);
+           ss.socket().bind(addr);
+           ss.configureBlocking(false);
+           acceptThread.setReconfiguring();
+           oldSS.close();           
+           acceptThread.wakeupSelector();
+           try {
+			  acceptThread.join();
+		   } catch (InterruptedException e) {
+			   LOG.error("Error joining old acceptThread when reconfiguring client port " + e.getMessage());
+		   }
+           acceptThread = new AcceptThread(ss, addr, selectorThreads);
+           acceptThread.start();
+        } catch(IOException e) {
+           LOG.error("Error reconfiguring client port to " + addr + " " + e.getMessage());
+        }
+    }
+    
     /** {@inheritDoc} */
     public int getMaxClientCnxnsPerHost() {
         return maxClientCnxns;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Thu Mar  7 06:01:49 2013
@@ -360,7 +360,16 @@ public class NettyServerCnxnFactory exte
         LOG.info("binding to port " + localAddress);
         parentChannel = bootstrap.bind(localAddress);
     }
-
+    
+    public void reconfigure(InetSocketAddress addr) 
+    {  
+       Channel oldChannel = parentChannel;
+       LOG.info("binding to port " + addr);
+        parentChannel = bootstrap.bind(addr);
+        localAddress = addr;  
+        oldChannel.close();
+    }
+    
     @Override
     public void startup(ZooKeeperServer zks) throws IOException,
             InterruptedException {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Thu Mar  7 06:01:49 2013
@@ -20,13 +20,17 @@ package org.apache.zookeeper.server;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.StringReader;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -46,18 +50,26 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.common.StringUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.StatPersisted;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.Create2Request;
 import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.ReconfigRequest;
 import org.apache.zookeeper.proto.SetACLRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.proto.CheckVersionRequest;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.server.auth.AuthenticationProvider;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTxn;
@@ -481,6 +493,114 @@ public class PrepRequestProcessor extend
                 nodeRecord.stat.setVersion(newVersion);
                 addChangeRecord(nodeRecord);
                 break;
+            case OpCode.reconfig:
+                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
+                ReconfigRequest reconfigRequest = (ReconfigRequest)record;                             
+                LeaderZooKeeperServer lzks = (LeaderZooKeeperServer)zks;
+                QuorumVerifier lastSeenQV = lzks.self.getLastSeenQuorumVerifier();                                                                                 
+                // check that there's no reconfig in progress
+                if (lastSeenQV.getVersion()!=lzks.self.getQuorumVerifier().getVersion()) {
+                       throw new KeeperException.ReconfigInProgress(); 
+                }
+                long configId = reconfigRequest.getCurConfigId();
+  
+                if (configId != -1 && configId!=lzks.self.getLastSeenQuorumVerifier().getVersion()){
+                   String msg = "Reconfiguration from version " + configId + " failed -- last seen version is " + lzks.self.getLastSeenQuorumVerifier().getVersion();
+                   throw new KeeperException.BadVersionException(msg);
+                }
+
+                String newMembers = reconfigRequest.getNewMembers();
+                
+                if (newMembers != null) { //non-incremental membership change                  
+                   LOG.info("Non-incremental reconfig");
+                
+                   // Input may be delimited by either commas or newlines so convert to common newline separated format
+                   newMembers = newMembers.replaceAll(",", "\n");
+                   
+                   try{
+                       Properties props = new Properties();                        
+                       props.load(new StringReader(newMembers));
+                       QuorumPeerConfig config = new QuorumPeerConfig();
+                       config.parseDynamicConfig(props, lzks.self.getElectionType(), true);
+                       request.qv = config.getQuorumVerifier();
+                       request.qv.setVersion(request.getHdr().getZxid());
+                   } catch (IOException e) {
+                       throw new KeeperException.BadArgumentsException(e.getMessage());
+                   } catch (ConfigException e) {
+                       throw new KeeperException.BadArgumentsException(e.getMessage());
+                   }                   
+                } else { //incremental change - must be a majority quorum system   
+                   LOG.info("Incremental reconfig");
+                   
+                   List<String> joiningServers = null; 
+                   String joiningServersString = reconfigRequest.getJoiningServers();
+                   if (joiningServersString != null)
+                   {
+                       joiningServers = StringUtils.split(joiningServersString,",");
+                   }
+                   
+                   List<String> leavingServers = null;
+                   String leavingServersString = reconfigRequest.getLeavingServers();
+                   if (leavingServersString != null)
+                   {
+                       leavingServers = StringUtils.split(leavingServersString, ",");
+                   }
+                   
+                   if (!(lastSeenQV instanceof QuorumMaj)) {
+                           String msg = "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system";
+                           LOG.warn(msg);
+                           throw new KeeperException.BadArgumentsException(msg);               
+                   }
+                   Map<Long, QuorumServer> nextServers = new HashMap<Long, QuorumServer>(lastSeenQV.getAllMembers());
+                   try {                           
+                       if (leavingServers != null) {
+                           for (String leaving: leavingServers){
+                               long sid = Long.parseLong(leaving);
+                               nextServers.remove(sid);
+                           } 
+                       }
+                       if (joiningServers != null) {
+                           for (String joiner: joiningServers){
+                        	   // joiner should have the following format: server.x = server_spec;client_spec               
+                        	   String[] parts = StringUtils.split(joiner, "=").toArray(new String[0]);
+                               if (parts.length != 2) {
+                                   throw new KeeperException.BadArgumentsException("Wrong format of server string");
+                               }
+                               // extract server id x from first part of joiner: server.x
+                               Long sid = Long.parseLong(parts[0].substring(parts[0].lastIndexOf('.') + 1));
+                               QuorumServer qs = new QuorumServer(sid, parts[1]);
+                               if (qs.clientAddr == null || qs.electionAddr == null || qs.addr == null) {
+                                   throw new KeeperException.BadArgumentsException("Wrong format of server string - each server should have 3 ports specified"); 	   
+                               }
+                               nextServers.remove(qs.id);
+                               nextServers.put(Long.valueOf(qs.id), qs);
+                           }  
+                       }
+                   } catch (ConfigException e){
+                       throw new KeeperException.BadArgumentsException("Reconfiguration failed");
+                   }
+                   request.qv = new QuorumMaj(nextServers);
+                   request.qv.setVersion(request.getHdr().getZxid());
+                }                             
+                if (request.qv.getVotingMembers().size() < 2){
+                   String msg = "Reconfig failed - new configuration must include at least 2 followers";
+                   LOG.warn(msg);
+                   throw new KeeperException.BadArgumentsException(msg);
+               }
+                   
+                if (!lzks.getLeader().isQuorumSynced(request.qv)) {
+                   String msg2 = "Reconfig failed - there must be a connected and synced quorum in new configuration";
+                   LOG.warn(msg2);             
+                   throw new KeeperException.NewConfigNoQuorum();
+                }
+                
+                nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);               
+                checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);                  
+                request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1));    
+                nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
+                nodeRecord.stat.setVersion(-1);                
+                addChangeRecord(nodeRecord);
+                break;                         
             case OpCode.setACL:
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                 SetACLRequest setAclRequest = (SetACLRequest)record;
@@ -585,6 +705,11 @@ public class PrepRequestProcessor extend
                 SetDataRequest setDataRequest = new SetDataRequest();                
                 pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                 break;
+            case OpCode.reconfig:
+                ReconfigRequest reconfigRequest = new ReconfigRequest();
+                ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
+                break;
             case OpCode.setACL:
                 SetACLRequest setAclRequest = new SetACLRequest();                
                 pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Thu Mar  7 06:01:49 2013
@@ -8,9 +8,9 @@
  * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
+ * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
@@ -25,6 +25,7 @@ import org.apache.jute.Record;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -80,6 +81,8 @@ public class Request {
 
     private KeeperException e;
 
+    public QuorumVerifier qv = null;
+    
     public Object getOwner() {
         return owner;
     }
@@ -133,6 +136,7 @@ public class Request {
         case OpCode.ping:
         case OpCode.closeSession:
         case OpCode.setWatches:
+        case OpCode.reconfig:
             return true;
         default:
             return false;
@@ -157,6 +161,7 @@ public class Request {
         case OpCode.setData:
         case OpCode.check:
         case OpCode.multi:
+        case OpCode.reconfig:
             return true;
         default:
             return false;
@@ -203,6 +208,8 @@ public class Request {
             return "closeSession";
         case OpCode.error:
             return "error";
+        case OpCode.reconfig:
+           return "reconfig";
         default:
             return "unknown " + op;
         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java Thu Mar  7 06:01:49 2013
@@ -65,7 +65,10 @@ public abstract class ServerCnxnFactory 
 
     public abstract void configure(InetSocketAddress addr,
                                    int maxClientCnxns) throws IOException;
-
+    
+    public abstract void reconfigure(InetSocketAddress addr);
+    
+    
     protected SaslServerCallbackHandler saslServerCallbackHandler;
     public Login login;
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java Thu Mar  7 06:01:49 2013
@@ -61,6 +61,8 @@ public class TraceFormatter {
             return "closeSession";
         case OpCode.error:
             return "error";
+        case OpCode.reconfig:
+           return "reconfig";
         default:
             return "unknown " + op;
         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Thu Mar  7 06:01:49 2013
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
@@ -47,6 +48,7 @@ import org.apache.zookeeper.server.persi
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
 
@@ -487,4 +489,18 @@ public class ZKDatabase {
         this.snapLog.close();
     }
 
+    public synchronized void initConfigInZKDatabase(QuorumVerifier qv) {   
+    	if (qv == null) return; // only happens during tests
+        try {
+             if (this.dataTree.getNode(ZooDefs.CONFIG_NODE) == null) {
+            	 // should only happen during upgrade
+            	 LOG.warn("configuration znode missing (hould only happen during upgrade), creating the node");
+                 this.dataTree.addConfigNode();
+             }
+             this.dataTree.setData(ZooDefs.CONFIG_NODE, qv.toString().getBytes(), -1, qv.getVersion(), System.currentTimeMillis());           
+        } catch (NoNodeException e) {
+           System.out.println("configuration node missing - should not happen");         
+        }
+    }
+ 
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Thu Mar  7 06:01:49 2013
@@ -131,6 +131,7 @@ public class CommitProcessor extends Thr
             case OpCode.create:
             case OpCode.delete:
             case OpCode.setData:
+            case OpCode.reconfig:
             case OpCode.multi:
             case OpCode.setACL:
             case OpCode.createSession:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Thu Mar  7 06:01:49 2013
@@ -30,8 +30,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,13 +109,16 @@ public class FastLeaderElection implemen
          * Address of sender
          */
         long sid;
-
+        
+        QuorumVerifier qv;
         /*
          * epoch of the proposed leader
          */
         long peerEpoch;
     }
-
+    
+    static byte[] dummyData = new byte[0];
+    
     /**
      * Messages that a peer wants to send to other peers.
      * These messages can be both Notifications and Acks
@@ -129,14 +133,17 @@ public class FastLeaderElection implemen
                 long electionEpoch,
                 ServerState state,
                 long sid,
-                long peerEpoch) {
+                long peerEpoch,                
+                byte[] configData) {
+
 
             this.leader = leader;
             this.zxid = zxid;
             this.electionEpoch = electionEpoch;
             this.state = state;
             this.sid = sid;
-            this.peerEpoch = peerEpoch;
+            this.peerEpoch = peerEpoch;            
+            this.configData = configData;
         }
 
         /*
@@ -163,6 +170,11 @@ public class FastLeaderElection implemen
          * Address of recipient
          */
         long sid;
+
+        /*
+         * Used to send a QuorumVerifier (configuration info)
+         */
+        byte[] configData = dummyData;
         
         /*
          * Leader epoch
@@ -204,24 +216,83 @@ public class FastLeaderElection implemen
                     try{
                         response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                         if(response == null) continue;
-
+                        
+                        // The current protocol and two previous generations all send at least 28 bytes
+                        if (response.buffer.capacity() < 28) {
+                            LOG.error("Got a short response: " + response.buffer.capacity());
+                            continue;
+                        }
+                        
+                        // this is the backwardCompatibility mode in place before ZK-107
+                        // It is for a version of the protocol in which we didn't send peer epoch
+                        // With peer epoch the message became 36 bytes
+                        boolean backCompatibility28 = (response.buffer.capacity() == 28);
+                        
+                        // ZK-107 sends the configuration info in every message.
+                        // So messages are 36 bytes + size of configuration info 
+                        // (variable length, should be at the end of the message).
+                        boolean backCompatibility36 = (response.buffer.capacity() == 36);
+
+                        response.buffer.clear();
+                        int rstate = response.buffer.getInt();
+                        long rleader = response.buffer.getLong();
+                        long rzxid = response.buffer.getLong();
+                        long relectionEpoch = response.buffer.getLong();
+                        long rpeerepoch;
+                        
+                        if(!backCompatibility28){
+                           rpeerepoch = response.buffer.getLong();
+                        } else {
+                            if(LOG.isInfoEnabled()){
+                                LOG.info("Backward compatibility mode (28 bits), server id: " + response.sid);
+                            }
+                            rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
+                        }
+                        
+                        QuorumVerifier rqv = null;
+                        
+                        // check if we have more than 36 bytes. If so extract config info from message.
+                        if(!backCompatibility28 && !backCompatibility36){
+                           byte b[] = new byte[response.buffer.remaining()];
+                           response.buffer.get(b);
+                                                       
+                           synchronized(self){                             
+                               try {
+                                   rqv = self.configFromString(new String(b));                                             
+                                   if (rqv.getVersion() > self.getQuorumVerifier().getVersion()) {
+                                       LOG.info(self.getId() + " Received version: " + Long.toHexString(rqv.getVersion()) + " my version: " + Long.toHexString(self.getQuorumVerifier().getVersion()));
+                                       self.processReconfig(rqv, null, null, false);
+                                       LOG.info("restarting leader election");
+                                       self.shuttingDownLE = true;
+                                       self.getElectionAlg().shutdown();
+                                   }           
+                               } catch (IOException e) {                         
+                                   LOG.error("Something went wrong while processing config received from " + response.sid);
+                               } catch (ConfigException e) {
+                                   LOG.error("Something went wrong while processing config received from " + response.sid);
+							   } 
+                          }                                                       
+                        } else {
+                            if(LOG.isInfoEnabled()){
+                                LOG.info("Backward compatibility mode (before reconfig), server id: " + response.sid);
+                            }
+                        }
+                       
                         /*
-                         * If it is from an observer, respond right away.
-                         * Note that the following predicate assumes that
-                         * if a server is not a follower, then it must be
-                         * an observer. If we ever have any other type of
-                         * learner in the future, we'll have to change the
-                         * way we check for observers.
+                         * If it is from a non-voting server (such as an observer or 
+                         * a non-voting follower), respond right away. 
                          */
                         if(!self.getVotingView().containsKey(response.sid)){
                             Vote current = self.getCurrentVote();
+                            QuorumVerifier qv = self.getQuorumVerifier();
                             ToSend notmsg = new ToSend(ToSend.mType.notification,
                                     current.getId(),
                                     current.getZxid(),
                                     logicalclock,
                                     self.getPeerState(),
                                     response.sid,
-                                    current.getPeerEpoch());
+                                    current.getPeerEpoch(),
+                                    qv.toString().getBytes());
 
                             sendqueue.offer(notmsg);
                         } else {
@@ -231,20 +302,9 @@ public class FastLeaderElection implemen
                                         + self.getId());
                             }
 
-                            /*
-                             * We check for 28 bytes for backward compatibility
-                             */
-                            if (response.buffer.capacity() < 28) {
-                                LOG.error("Got a short response: "
-                                        + response.buffer.capacity());
-                                continue;
-                            }
-                            boolean backCompatibility = (response.buffer.capacity() == 28);
-                            response.buffer.clear();
-
                             // State of peer that sent this message
                             QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
-                            switch (response.buffer.getInt()) {
+                            switch (rstate) {
                             case 0:
                                 ackstate = QuorumPeer.ServerState.LOOKING;
                                 break;
@@ -261,20 +321,13 @@ public class FastLeaderElection implemen
 
                             // Instantiate Notification and set its attributes
                             Notification n = new Notification();
-                            n.leader = response.buffer.getLong();
-                            n.zxid = response.buffer.getLong();
-                            n.electionEpoch = response.buffer.getLong();
+                            n.leader = rleader;
+                            n.zxid = rzxid;
+                            n.electionEpoch = relectionEpoch;
                             n.state = ackstate;
-                            n.sid = response.sid;
-                            if(!backCompatibility){
-                                n.peerEpoch = response.buffer.getLong();
-                            } else {
-                                if(LOG.isInfoEnabled()){
-                                    LOG.info("Backward compatibility mode, server id=" + n.sid);
-                                }
-                                n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
-                            }
-
+                            n.sid = response.sid;                            
+                            n.peerEpoch = rpeerepoch;
+                            n.qv = rqv;                              
                             /*
                              * Print notification info
                              */
@@ -297,13 +350,15 @@ public class FastLeaderElection implemen
                                 if((ackstate == QuorumPeer.ServerState.LOOKING)
                                         && (n.electionEpoch < logicalclock)){
                                     Vote v = getVote();
+                                    QuorumVerifier qv = self.getQuorumVerifier();
                                     ToSend notmsg = new ToSend(ToSend.mType.notification,
                                             v.getId(),
                                             v.getZxid(),
                                             logicalclock,
                                             self.getPeerState(),
                                             response.sid,
-                                            v.getPeerEpoch());
+                                            v.getPeerEpoch(),
+                                            qv.toString().getBytes());
                                     sendqueue.offer(notmsg);
                                 }
                             } else {
@@ -318,8 +373,11 @@ public class FastLeaderElection implemen
                                                 self.getId() + " recipient=" +
                                                 response.sid + " zxid=0x" +
                                                 Long.toHexString(current.getZxid()) +
-                                                " leader=" + current.getId());
+                                                " leader=" + current.getId() + " config version = " + 
+                                                Long.toHexString(self.getQuorumVerifier().getVersion()));
                                     }
+                                    
+                                    QuorumVerifier qv = self.getQuorumVerifier();
                                     ToSend notmsg = new ToSend(
                                             ToSend.mType.notification,
                                             current.getId(),
@@ -327,13 +385,14 @@ public class FastLeaderElection implemen
                                             logicalclock,
                                             self.getPeerState(),
                                             response.sid,
-                                            current.getPeerEpoch());
+                                            current.getPeerEpoch(), 
+                                            qv.toString().getBytes());
                                     sendqueue.offer(notmsg);
                                 }
                             }
                         }
                     } catch (InterruptedException e) {
-                        System.out.println("Interrupted Exception while waiting for new message" +
+                        LOG.warn("Interrupted Exception while waiting for new message" +
                                 e.toString());
                     }
                 }
@@ -341,6 +400,8 @@ public class FastLeaderElection implemen
             }
         }
 
+    
+  
 
         /**
          * This worker simply dequeues a message to send and
@@ -376,7 +437,7 @@ public class FastLeaderElection implemen
              * @param m     message to send
              */
             private void process(ToSend m) {
-                byte requestBytes[] = new byte[36];
+                byte requestBytes[] = new byte[36 + m.configData.length];
                 ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
 
                 /*
@@ -389,6 +450,7 @@ public class FastLeaderElection implemen
                 requestBuffer.putLong(m.zxid);
                 requestBuffer.putLong(m.electionEpoch);
                 requestBuffer.putLong(m.peerEpoch);
+                requestBuffer.put(m.configData);
 
                 manager.toSend(m.sid, requestBuffer);
 
@@ -503,22 +565,21 @@ public class FastLeaderElection implemen
         messenger.halt();
         LOG.debug("FLE is down");
     }
-
+    
 
     /**
      * Send notifications to all peers upon a change in our vote
      */
     private void sendNotifications() {
-        for (QuorumServer server : self.getVotingView().values()) {
-            long sid = server.id;
-
+        for (long sid : self.getAllKnownServerIds()) {
+            QuorumVerifier qv = self.getQuorumVerifier();
             ToSend notmsg = new ToSend(ToSend.mType.notification,
                     proposedLeader,
                     proposedZxid,
                     logicalclock,
                     QuorumPeer.ServerState.LOOKING,
                     sid,
-                    proposedEpoch);
+                    proposedEpoch, qv.toString().getBytes());
             if(LOG.isDebugEnabled()){
                 LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                       Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
@@ -529,15 +590,15 @@ public class FastLeaderElection implemen
         }
     }
 
-
     private void printNotification(Notification n){
         LOG.info("Notification: " + n.leader + " (n.leader), 0x"
                 + Long.toHexString(n.zxid) + " (n.zxid), 0x"
                 + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
                 + " (n.state), " + n.sid + " (n.sid), 0x"
                 + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
-                + self.getPeerState() + " (my state)");
+                + self.getPeerState() + " (my state)" + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):""));
     }
+ 
 
     /**
      * Check if a pair (server id, zxid) succeeds our
@@ -571,8 +632,7 @@ public class FastLeaderElection implemen
      * have sufficient to declare the end of the election round.
      *
      *  @param votes    Set of votes
-     *  @param l        Identifier of the vote received last
-     *  @param zxid     zxid of the the vote received last
+     *  @param vote        Identifier of the vote received last
      */
     private boolean termPredicate(
             HashMap<Long, Vote> votes,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Thu Mar  7 06:01:49 2013
@@ -20,12 +20,16 @@ package org.apache.zookeeper.server.quor
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 
 import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
-
 /**
  * This class has the control logic for the Follower.
  */
@@ -68,7 +72,8 @@ public class Follower extends Learner{
             try {
                 connectToLeader(addr);
                 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
-
+                if (self.isReconfigStateChange())
+                   throw new Exception("learned about role change");
                 //check to see if the leader zxid is lower than ours
                 //this should never happen but is just a safety check
                 long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
@@ -83,7 +88,7 @@ public class Follower extends Learner{
                     readPacket(qp);
                     processPacket(qp);
                 }
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOG.warn("Exception when following the leader", e);
                 try {
                     sock.close();
@@ -104,12 +109,12 @@ public class Follower extends Learner{
      * @param qp
      * @throws IOException
      */
-    protected void processPacket(QuorumPacket qp) throws IOException{
+    protected void processPacket(QuorumPacket qp) throws Exception{
         switch (qp.getType()) {
         case Leader.PING:            
             ping(qp);            
             break;
-        case Leader.PROPOSAL:            
+        case Leader.PROPOSAL:           
             TxnHeader hdr = new TxnHeader();
             Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
             if (hdr.getZxid() != lastQueued + 1) {
@@ -119,11 +124,36 @@ public class Follower extends Learner{
                         + Long.toHexString(lastQueued + 1));
             }
             lastQueued = hdr.getZxid();
+            
+            if (hdr.getType() == OpCode.reconfig){
+               SetDataTxn setDataTxn = (SetDataTxn) txn;       
+               QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
+               self.setLastSeenQuorumVerifier(qv, true);                               
+            }
+            
             fzk.logRequest(hdr, txn);
             break;
         case Leader.COMMIT:
             fzk.commit(qp.getZxid());
             break;
+            
+        case Leader.COMMITANDACTIVATE:
+           // get the new configuration from the request
+           Request request = fzk.pendingTxns.element();
+           SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();                                                                                                      
+           QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));                                
+ 
+           // get new designated leader from (current) leader's message
+           ByteBuffer buffer = ByteBuffer.wrap(qp.getData());    
+           long suggestedLeaderId = buffer.getLong();
+            boolean majorChange = 
+                   self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+           // commit (writes the new config to the ZK tree at /zookeeper/config)
+           fzk.commit(qp.getZxid());
+            if (majorChange) {
+               throw new Exception("changes proposed in reconfig");
+           }
+           break;
         case Leader.UPTODATE:
             LOG.error("Received an UPTODATE message after Follower started");
             break;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Thu Mar  7 06:01:49 2013
@@ -81,6 +81,7 @@ public class FollowerRequestProcessor ex
                 case OpCode.create:
                 case OpCode.delete:
                 case OpCode.setData:
+                case OpCode.reconfig:
                 case OpCode.setACL:
                 case OpCode.createSession:
                 case OpCode.closeSession:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Mar  7 06:01:49 2013
@@ -26,6 +26,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.SocketException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentLi
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
@@ -47,6 +49,7 @@ import org.apache.zookeeper.server.util.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * This class has the control logic for the Leader.
  */
@@ -61,7 +64,7 @@ public class Leader {
     static public class Proposal {
         public QuorumPacket packet;
 
-        public HashSet<Long> ackSet = new HashSet<Long>();
+        private ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();
 
         public Request request;
 
@@ -69,12 +72,55 @@ public class Leader {
         public String toString() {
             return packet.getType() + ", " + packet.getZxid() + ", " + request;
         }
+
+        public void addQuorumVerifier(QuorumVerifier qv) {
+            qvAcksetPairs.add(new QuorumVerifierAcksetPair(qv,
+                    new HashSet<Long>(qv.getVotingMembers().size())));
+        }
+
+        public boolean addAck(Long sid) {
+            boolean change = false;
+            for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
+                if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
+                    qvAckset.getAckset().add(sid);
+                    change = true;
+                }
+            }
+            return change;
+        }
+
+        public boolean hasAllQuorums() {
+            for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
+                if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
+                    return false;
+            }
+            return true;
+        }
+
+        public static class QuorumVerifierAcksetPair {
+            private final QuorumVerifier _qv;
+            private final HashSet<Long> _ackset;
+
+            public QuorumVerifierAcksetPair(QuorumVerifier qv, HashSet<Long> ackset) {                
+                _qv = qv;
+                _ackset = ackset;
+            }
+
+            public QuorumVerifier getQuorumVerifier() {
+                return _qv;
+            }
+
+            public HashSet<Long> getAckset() {
+                return _ackset;
+            }
+        }
     }
 
     final LeaderZooKeeperServer zk;
 
     final QuorumPeer self;
 
+    
     // the follower acceptor thread
     volatile LearnerCnxAcceptor cnxAcceptor = null;
 
@@ -174,6 +220,27 @@ public class Leader {
         }
     }
 
+
+    /**
+     * Returns true if a quorum in qv is connected and synced with the leader
+     * and false otherwise
+     *  
+     * @param qv a QuorumVerifier
+     */
+    public boolean isQuorumSynced(QuorumVerifier qv) {
+       HashSet<Long> ids = new HashSet<Long>();
+       if (qv.getVotingMembers().containsKey(self.getId()))
+           ids.add(self.getId());
+       synchronized (forwardingFollowers) {
+           for (LearnerHandler learnerHandler: forwardingFollowers){
+               if (learnerHandler.synced() && qv.getVotingMembers().containsKey(learnerHandler.getSid())){
+                   ids.add(learnerHandler.getSid());
+               }
+           }
+       }
+       return qv.containsQuorum(ids);
+    }
+    
     private final ServerSocket ss;
 
     Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
@@ -282,7 +349,17 @@ public class Leader {
      * This message type informs observers of a committed proposal.
      */
     final static int INFORM = 8;
-
+    
+    /**
+     * Similar to COMMIT, only for a reconfig operation.
+     */
+    final static int COMMITANDACTIVATE = 9;
+    
+    /**
+     * Similar to INFORM, only for a reconfig operation.
+     */
+    final static int INFORMANDACTIVATE = 19;
+    
     final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
 
     private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
@@ -333,6 +410,9 @@ public class Leader {
     long epoch = -1;
     boolean waitingForNewEpoch = true;
 
+    // when a reconfig occurs where the leader is removed or becomes an observer, 
+   // it does not commit ops after committing the reconfig
+    boolean allowedToCommit = true;     
     /**
      * This method is main function that is called to lead
      *
@@ -369,32 +449,45 @@ public class Leader {
             }
 
             newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
-                    null, null);
+                   null, null);
 
 
             if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                 LOG.info("NEWLEADER proposal has Zxid of "
                         + Long.toHexString(newLeaderProposal.packet.getZxid()));
             }
-            outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
-            newLeaderProposal.ackSet.add(self.getId());
-
-            waitForEpochAck(self.getId(), leaderStateSummary);
-            self.setCurrentEpoch(epoch);
 
+            newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
+            if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
+               newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
+            }
+            newLeaderProposal.addAck(self.getId());
+            
+            outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);                                  
+            LOG.debug("put newleader into outstanding proposals");
             // We have to get at least a majority of servers in sync with
             // us. We do this by waiting for the NEWLEADER packet to get
             // acknowledged
-            while (!self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
+                       
+             waitForEpochAck(self.getId(), leaderStateSummary);
+             self.setCurrentEpoch(epoch);    
+            
+            while (!newLeaderProposal.hasAllQuorums()){           
             //while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
                 if (self.tick > self.initLimit) {
                     // Followers aren't syncing fast enough,
                     // renounce leadership!
                     StringBuilder ackToString = new StringBuilder();
-                    for(Long id : newLeaderProposal.ackSet)
-                        ackToString.append(id + ": ");
-
-                    shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
+                    
+                    for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
+                       if (ackToString.length() > 0) ackToString.append('\n');
+                       if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) {
+                           ackToString.append("Configuration " + qvAckset.getQuorumVerifier().getVersion() + ": waiting for a quorum of followers, only synced with: ");
+                           for(Long id : qvAckset.getAckset())
+                                ackToString.append(id + " ");
+                       }
+                    }
+                    shutdown(ackToString.toString());
                     HashSet<Long> followerSet = new HashSet<Long>();
 
                     for(LearnerHandler f : getLearners()) {
@@ -402,9 +495,15 @@ public class Leader {
                             followerSet.add(f.getSid());
                         }
                     }
-
-                    if (self.getQuorumVerifier().containsQuorum(followerSet)) {
-                    //if (followers.size() >= self.quorumPeers.size() / 2) {
+                    
+                    boolean initTicksShouldBeIncreased = true;
+                    for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
+                       if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
+                           initTicksShouldBeIncreased = false;
+                           break;
+                       }
+                    }                  
+                    if (initTicksShouldBeIncreased) {
                         LOG.warn("Enough followers present. "+
                                 "Perhaps the initTicks need to be increased.");
                     }
@@ -469,7 +568,7 @@ public class Leader {
                 //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                     // Lost quorum, shutdown
                   // TODO: message is wrong unless majority quorums used
-                    shutdown("Only " + syncedSet.size() + " followers, need "
+                    shutdown("Only " + syncedSet.size() + " voting followers, need "
                             + (self.getVotingView().size() / 2));
                     // make sure the order is the same!
                     // the leader goes to looking
@@ -525,15 +624,161 @@ public class Leader {
         isShutdown = true;
     }
 
+    /** In a reconfig operation, this method attempts to find the best leader for the next configuration.
+     *  If the current leader is a voter in the next configuration, then it remains the leader.
+     *  Otherwise, choose one of the new voters that acked the reconfiguration, such that it is as
+     *  up-to-date as possible, i.e., acked as many outstanding proposals as possible.
+     *
+     * @param reconfigProposal the reconfig proposal
+     * @param zxid zxid of the reconfigProposal
+     * @return server id of the designated leader
+     */
+    
+    private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
+       //new configuration
+       Proposal.QuorumVerifierAcksetPair newQVAcksetPair = reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size()-1);        
+       
+       //check if I'm in the new configuration with the same quorum address - 
+       // if so, I'll remain the leader    
+       if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) && 
+               newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getId()).addr.equals(self.getQuorumAddress())){  
+           return self.getId();
+       }
+       // start with an initial set of candidates that are voters from new config that 
+       // acknowledged the reconfig op (there must be a quorum). Choose one of them as 
+       // current leader candidate
+       HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
+       candidates.remove(self.getId()); // if we're here, I shouldn't be the leader
+       long curCandidate = candidates.iterator().next();
+       
+       //go over outstanding ops in order, and try to find a candidate that acked the most ops.
+       //this way it will be the most up-to-date and we'll minimize the number of ops that get dropped
+       
+       long curZxid = zxid + 1;
+       Proposal p = outstandingProposals.get(curZxid);
+               
+       while (p!=null && !candidates.isEmpty()) {                              
+           for (Proposal.QuorumVerifierAcksetPair qvAckset: p.qvAcksetPairs){ 
+               //reduce the set of candidates to those that acknowledged p
+               candidates.retainAll(qvAckset.getAckset());
+               //no candidate acked p, return the best candidate found so far
+               if (candidates.isEmpty()) return curCandidate;
+               //update the current candidate, and if it is the only one remaining, return it
+               curCandidate = candidates.iterator().next();
+               if (candidates.size() == 1) return curCandidate;
+           }      
+           curZxid++;
+           p = outstandingProposals.get(curZxid);
+       }
+       
+       return curCandidate;
+    }
+
+    /**
+     * @return True if committed, otherwise false.
+     * @param p a proposal
+     **/
+    synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {       
+       // make sure that ops are committed in order. With reconfigurations it is now possible
+       // that different operations wait for different sets of acks, and we still want to enforce
+       // that they are committed in order. Currently we only permit one outstanding reconfiguration
+       // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
+       // pending all wait for a quorum of old and new config, so it's not possible to get enough acks
+       // for an operation without getting enough acks for preceding ops. But in the future if multiple
+       // concurrent reconfigs are allowed, this can happen.
+       if (outstandingProposals.containsKey(zxid - 1)) return false;
+       
+       // getting a quorum from all necessary configurations
+        if (!p.hasAllQuorums()) {
+           return false;                 
+        }
+        
+        // commit proposals in order
+        if (zxid != lastCommitted+1) {    
+           LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
+                    + " from " + followerAddr + " not first!");
+            LOG.warn("First is "
+                    + (lastCommitted+1));
+        }     
+        
+        // in order to be committed, a proposal must be accepted by a quorum              
+        
+        outstandingProposals.remove(zxid);
+        
+        if (p.request != null) {
+             toBeApplied.add(p);
+        }
+     
+        // We don't commit the new leader proposal
+        if ((zxid & 0xffffffffL) != 0) {
+           if (p.request == null) {
+                LOG.warn("Going to commmit null: " + p);
+           } else if (p.request.getHdr().getType() == OpCode.reconfig) {                                   
+                   LOG.debug("Committing a reconfiguration! " + outstandingProposals.size()); 
+                 
+                   //if this server is voter in new config with the same quorum address, 
+                   //then it will remain the leader
+                   //otherwise an up-to-date follower will be designated as leader. This saves
+                   //leader election time, unless the designated leader fails                             
+                    Long designatedLeader = getDesignatedLeader(p, zxid);
+                    //LOG.warn("designated leader is: " + designatedLeader);
+                    
+                    QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
+                   
+                    self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
+                   
+                   if (designatedLeader != self.getId()) {
+                       allowedToCommit = false;
+                   }
+                   
+                   // we're sending the designated leader, and if the leader is changing the followers are 
+                   // responsible for closing the connection - this way we are sure that at least a majority of them 
+                   // receive the commit message.
+                   commitAndActivate(zxid, designatedLeader);
+                   informAndActivate(p, designatedLeader);
+                   //turnOffFollowers();
+               } else {
+                   commit(zxid);
+                   inform(p);
+               }
+               zk.commitProcessor.commit(p.request);
+               if(pendingSyncs.containsKey(zxid)){
+                   for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
+                      sendSync(r);
+                   }
+               }                
+        } else {
+                lastCommitted = zxid;                
+                if(LOG.isInfoEnabled()){
+                    LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid());
+                }
+                QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
+               
+               Long designatedLeader = getDesignatedLeader(p, zxid);                                         
+               
+               self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
+               if (designatedLeader != self.getId()) {
+                   allowedToCommit = false;
+               }
+               LOG.debug("GOT QUORUM of ACKS FOR NEWLEADER msg " + allowedToCommit);       
+                zk.startup();
+                zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
+                
+        }
+        return  true;   
+    }
+    
     /**
      * Keep a count of acks that are received by the leader for a particular
      * proposal
      *
-     * @param zxid
-     *                the zxid of the proposal sent out
+     * @param zxid the zxid of the proposal sent out
+     * @param sid the id of the server that sent the ack
      * @param followerAddr
      */
-    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
+    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        
+        if (!allowedToCommit) return; // last op committed was a leader change - from now on 
+                                     // the new leader should commit        
         if (LOG.isTraceEnabled()) {
             LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
             for (Proposal p : outstandingProposals.values()) {
@@ -543,7 +788,6 @@ public class Leader {
             }
             LOG.trace("outstanding proposals all");
         }
-
         if (outstandingProposals.size() == 0) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("outstanding is 0");
@@ -564,46 +808,34 @@ public class Leader {
                     Long.toHexString(zxid), followerAddr);
             return;
         }
-
-        p.ackSet.add(sid);
-        if (LOG.isDebugEnabled()) {
+        
+        p.addAck(sid);        
+        /*if (LOG.isDebugEnabled()) {
             LOG.debug("Count for zxid: 0x{} is {}",
                     Long.toHexString(zxid), p.ackSet.size());
-        }
-        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
-            if (zxid != lastCommitted+1) {
-                LOG.warn("Commiting zxid 0x{} from {} not first!",
-                        Long.toHexString(zxid), followerAddr);
-                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
-            }
-            outstandingProposals.remove(zxid);
-            if (p.request != null) {
-                toBeApplied.add(p);
-            }
-            // We don't commit the new leader proposal
-            if ((zxid & 0xffffffffL) != 0) {
-                if (p.request == null) {
-                    LOG.warn("Going to commmit null request for proposal: {}", p);
-                }
-                commit(zxid);
-                inform(p);
-                zk.commitProcessor.commit(p.request);
-                if(pendingSyncs.containsKey(zxid)){
-                    for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
-                        sendSync(r);
-                    }
-                }
-                return;
-            } else {
-                lastCommitted = zxid;
-                LOG.info("Have quorum of supporters; starting up and setting last processed zxid: 0x{}",
-                        Long.toHexString(zk.getZxid()));
-                zk.startup();
-                zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
-            }
+        }*/
+        
+        boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
+
+        // If p is a reconfiguration, multiple other operations may be ready to be committed,
+        // since operations wait for different sets of acks.
+        // Currently we only permit one outstanding reconfiguration at a time, so the
+        // reconfiguration and the subsequent ops proposed while it is pending all wait for a
+        // quorum of both the old and the new config; hence it's not possible to get enough acks
+        // for an operation without getting enough acks for the preceding ops. But in the future, if
+        // multiple concurrent reconfigs are allowed, this can happen, and then we need to check whether
+        // some pending ops already have enough acks and can be committed, which is what this code does.
+
+        if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){
+               long curZxid = zxid;
+           while (allowedToCommit && hasCommitted && p!=null){
+               curZxid++;
+               p = outstandingProposals.get(curZxid);
+               if (p !=null) hasCommitted = tryToCommit(p, curZxid, null);             
+           }
         }
     }
-
+    
     static class ToBeAppliedRequestProcessor implements RequestProcessor {
         private final RequestProcessor next;
 
@@ -707,6 +939,20 @@ public class Leader {
         sendPacket(qp);
     }
 
+    /**
+     * Records zxid as lastCommitted (under this object's lock) and sends a
+     * COMMITANDACTIVATE packet whose 8-byte payload is designatedLeader.
+     */
+    public void commitAndActivate(long zxid, long designatedLeader) {
+        synchronized(this){
+            lastCommitted = zxid;
+        }
+        byte[] payload = new byte[8];
+        ByteBuffer.wrap(payload).putLong(designatedLeader);
+        QuorumPacket activatePacket = new QuorumPacket(Leader.COMMITANDACTIVATE, zxid, payload, null);
+        sendPacket(activatePacket);
+    }
+
     /**
      * Create an inform packet and send it to all observers.
      * @param zxid
@@ -718,6 +964,23 @@ public class Leader {
         sendObserverPacket(qp);
     }
 
+    
+    /**
+     * Create an inform&activate packet and send it to all observers.
+     * @param proposal the proposal whose packet data and request zxid are forwarded
+     * @param designatedLeader server id written as 8 bytes ahead of the proposal data
+     */
+    public void informAndActivate(Proposal proposal, long designatedLeader) {
+        byte[] proposalData = proposal.packet.getData();
+        byte[] data = new byte[proposalData.length + 8];
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        buffer.putLong(designatedLeader);
+        buffer.put(proposalData);
+
+        QuorumPacket qp = new QuorumPacket(Leader.INFORMANDACTIVATE, proposal.request.zxid, data, null);
+        sendObserverPacket(qp);
+    }
+
     long lastProposed;
 
 
@@ -771,8 +1034,19 @@ public class Leader {
 
         Proposal p = new Proposal();
         p.packet = pp;
-        p.request = request;
-        synchronized (this) {
+        p.request = request;                
+        
+        synchronized(this) {
+           p.addQuorumVerifier(self.getQuorumVerifier());
+                   
+           if (request.getHdr().getType() == OpCode.reconfig){
+               self.setLastSeenQuorumVerifier(request.qv, true);                       
+           }
+           
+           if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
+               p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
+           }
+                   
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Proposing:: " + request);
             }
@@ -821,6 +1095,7 @@ public class Leader {
      *
      * @param handler handler of the follower
      * @return last proposed zxid
+     * @throws InterruptedException 
      */
     synchronized public long startForwarding(LearnerHandler handler,
             long lastSeenZxid) {
@@ -855,7 +1130,6 @@ public class Leader {
 
         return lastProposed;
     }
-
     private final HashSet<Long> connectingFollowers = new HashSet<Long>();
     public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
         synchronized(connectingFollowers) {
@@ -957,6 +1231,8 @@ public class Leader {
             return "ACK";
         case COMMIT:
             return "COMMIT";
+        case COMMITANDACTIVATE:
+            return "COMMITANDACTIVATE";           
         case PING:
             return "PING";
         case REVALIDATE:
@@ -965,6 +1241,8 @@ public class Leader {
             return "SYNC";
         case INFORM:
             return "INFORM";
+        case INFORMANDACTIVATE:
+            return "INFORMANDACTIVATE";
         default:
             return "UNKNOWN";
         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Thu Mar  7 06:01:49 2013
@@ -40,12 +40,19 @@ import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -262,7 +269,7 @@ public class Learner {       
         /*
          * Add sid to payload
          */
-        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
+        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
         ByteArrayOutputStream bsid = new ByteArrayOutputStream();
         BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
         boa.writeRecord(li, "LearnerInfo");
@@ -309,11 +316,13 @@ public class Learner {       
      * @throws IOException
      * @throws InterruptedException
      */
-    protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
+    protected void syncWithLeader(long newLeaderZxid) throws Exception{
         QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
         QuorumPacket qp = new QuorumPacket();
         long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
         
+        QuorumVerifier newLeaderQV = null;
+        
         readPacket(qp);   
         LinkedList<Long> packetsCommitted = new LinkedList<Long>();
         LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
@@ -351,6 +360,7 @@ public class Learner {       
                 System.exit(13);
 
             }
+            zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
             zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
             zk.createSessionTracker();            
             
@@ -376,14 +386,30 @@ public class Learner {       
                             + Long.toHexString(lastQueued + 1));
                     }
                     lastQueued = pif.hdr.getZxid();
+                    
+                    if (pif.hdr.getType() == OpCode.reconfig){                
+                        SetDataTxn setDataTxn = (SetDataTxn) pif.rec;       
+                       QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
+                       self.setLastSeenQuorumVerifier(qv, true);                               
+                    }
+                    
                     packetsNotCommitted.add(pif);
                     break;
                 case Leader.COMMIT:
+                case Leader.COMMITANDACTIVATE:
                     if (!snapshotTaken) {
                         pif = packetsNotCommitted.peekFirst();
                         if (pif.hdr.getZxid() != qp.getZxid()) {
                             LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                         } else {
+                           if (qp.getType() == Leader.COMMITANDACTIVATE) {
+                               QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)pif.rec).getData()));
+                               boolean majorChange =
+                                       self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), true);
+                                if (majorChange) {
+                                   throw new Exception("changes proposed in reconfig");
+                                }
+                           }
                             zk.processTxn(pif.hdr, pif.rec);
                             packetsNotCommitted.remove();
                         }
@@ -392,18 +418,53 @@ public class Learner {       
                     }
                     break;
                 case Leader.INFORM:
+                case Leader.INFORMANDACTIVATE:
                     TxnHeader hdr = new TxnHeader();
-                    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
+                    Record txn;
+                    if (qp.getType() == Leader.INFORMANDACTIVATE) { // payload: 8-byte leader id, then txn
+                        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
+                        long suggestedLeaderId = buffer.getLong();
+                        byte[] remainingdata = new byte[buffer.remaining()];
+                        buffer.get(remainingdata);
+                        txn = SerializeUtils.deserializeTxn(remainingdata, hdr);
+                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)txn).getData()));
+                        boolean majorChange =
+                                self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+                        if (majorChange) {
+                            throw new Exception("changes proposed in reconfig");
+                        }
+                    } else { // plain INFORM: the payload is the serialized txn only
+                        txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
+                    }
                     zk.processTxn(hdr, txn);
-                    break;
+                    break;                
                 case Leader.UPTODATE:
+                    LOG.info("Learner received UPTODATE message");                                      
+                    if (newLeaderQV!=null) {
+                       boolean majorChange =
+                           self.processReconfig(newLeaderQV, null, null, true);
+                       if (majorChange) {
+                           throw new Exception("changes proposed in reconfig");
+                       }
+                    }
                     if (!snapshotTaken) { // true for the pre v1.0 case
-                        zk.takeSnapshot();
+                       zk.takeSnapshot();
                         self.setCurrentEpoch(newEpoch);
                     }
-                    self.cnxnFactory.setZooKeeperServer(zk);                
+                    self.cnxnFactory.setZooKeeperServer(zk);
                     break outerLoop;
-                case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
+                case Leader.NEWLEADER: // it will be NEWLEADER in v1.0        
+                   LOG.info("Learner received NEWLEADER message");
+                   if (qp.getData()!=null && qp.getData().length > 1) {
+                       try {
+                           QuorumVerifier qv = self.configFromString(new String(qp.getData()));
+                           self.setLastSeenQuorumVerifier(qv, true);
+                           newLeaderQV = qv;
+                       } catch (Exception e) { // not rethrown: sync goes on without the leader's config
+                           LOG.warn("Failed to process the config received with NEWLEADER", e);
+                       }
+                   }
+                   
                     zk.takeSnapshot();
                     self.setCurrentEpoch(newEpoch);
                     snapshotTaken = true;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Mar  7 06:01:49 2013
@@ -245,19 +245,24 @@ public class LearnerHandler extends Thre
                         + " is not FOLLOWERINFO or OBSERVERINFO!");
                 return;
             }
+ 
             byte learnerInfoData[] = qp.getData();
             if (learnerInfoData != null) {
-            	if (learnerInfoData.length == 8) {
-            		ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
-            		this.sid = bbsid.getLong();
-            	} else {
-            		LearnerInfo li = new LearnerInfo();
-            		ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
-            		this.sid = li.getServerid();
-            		this.version = li.getProtocolVersion();
-            	}
+                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
+                if (learnerInfoData.length >= 8) {
+                    this.sid = bbsid.getLong();
+                } 
+                if (learnerInfoData.length >= 12) {
+                    this.version = bbsid.getInt(); // protocolVersion
+                }
+                if (learnerInfoData.length >= 20) {
+                    long configVersion = bbsid.getLong();
+                    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
+                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");                     
+                    }
+                }
             } else {
-            	this.sid = leader.followerCounter.getAndDecrement();
+                this.sid = leader.followerCounter.getAndDecrement();
             }
 
             if (leader.self.getView().containsKey(this.sid)) {
@@ -277,6 +282,7 @@ public class LearnerHandler extends Thre
             StateSummary ss = null;
             long zxid = qp.getZxid();
             long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
+            long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
 
             if (this.getVersion() < 0x10000) {
                 // we are going to have to extrapolate the epoch information
@@ -287,7 +293,7 @@ public class LearnerHandler extends Thre
             } else {
                 byte ver[] = new byte[4];
                 ByteBuffer.wrap(ver).putInt(0x10000);
-                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
+                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                 oa.writeRecord(newEpochPacket, "packet");
                 bufferedOutput.flush();
                 QuorumPacket ackEpochPacket = new QuorumPacket();
@@ -395,19 +401,25 @@ public class LearnerHandler extends Thre
                     // just let the state transfer happen
                     LOG.debug("proposals is empty");
                 }               
-
                 LOG.info("Sending " + Leader.getPacketType(packetToSend));
                 leaderLastZxid = leader.startForwarding(this, updates);
 
             } finally {
                 rl.unlock();
             }
-
-             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                    ZxidUtils.makeZxid(newEpoch, 0), null, null);
+          
+            LOG.debug("Sending NEWLEADER message to " + sid);
+            // the version of this quorumVerifier will be set by leader.lead() in case
+            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
+            // we got here, so the version was set
+            
              if (getVersion() < 0x10000) {
+            	 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                 		newLeaderZxid, null, null);
                 oa.writeRecord(newLeaderQP, "packet");
             } else {
+            	QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                		newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
                 queuedPackets.add(newLeaderQP);
             }
             bufferedOutput.flush();
@@ -456,6 +468,7 @@ public class LearnerHandler extends Thre
                 LOG.error("Next packet was supposed to be an ACK");
                 return;
             }
+            LOG.debug("Received NEWLEADER-ACK message from " + sid);   
             leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
             
             // now that the ack has been processed expect the syncLimit
@@ -473,6 +486,7 @@ public class LearnerHandler extends Thre
             // so we need to mark when the peer can actually start
             // using the data
             //
+            LOG.debug("Sending UPTODATE message to " + sid);      
             queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
 
             while (true) {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java Thu Mar  7 06:01:49 2013
@@ -20,11 +20,19 @@ package org.apache.zookeeper.server.quor
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 
 import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.ObserverBean;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -54,10 +62,9 @@ public class Observer extends Learner{
 
     /**
      * the main method called by the observer to observe the leader
-     *
-     * @throws InterruptedException
+     * @throws Exception 
      */
-    void observeLeader() throws InterruptedException {
+    void observeLeader() throws Exception {
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
 
         try {
@@ -66,7 +73,9 @@ public class Observer extends Learner{
             try {
                 connectToLeader(addr);
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
-
+                if (self.isReconfigStateChange())
+                   throw new Exception("learned about role change");
+ 
                 syncWithLeader(newLeaderZxid);
                 QuorumPacket qp = new QuorumPacket();
                 while (self.isRunning()) {
@@ -92,9 +101,9 @@ public class Observer extends Learner{
     /**
      * Controls the response of an observer to the receipt of a quorumpacket
      * @param qp
-     * @throws IOException
+     * @throws Exception 
      */
-    protected void processPacket(QuorumPacket qp) throws IOException{
+    protected void processPacket(QuorumPacket qp) throws Exception{
         switch (qp.getType()) {
         case Leader.PING:
             ping(qp);
@@ -121,6 +130,30 @@ public class Observer extends Learner{
             ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
             obs.commitRequest(request);
             break;
+        case Leader.INFORMANDACTIVATE:
+            // Payload layout: the 8-byte id of the new designated leader taken from
+            // the (current) leader's message, followed by the serialized transaction.
+            ByteBuffer payload = ByteBuffer.wrap(qp.getData());
+            long designatedLeaderId = payload.getLong();
+            byte[] txnBytes = new byte[payload.remaining()];
+            payload.get(txnBytes);
+
+            hdr = new TxnHeader();
+            txn = SerializeUtils.deserializeTxn(txnBytes, hdr);
+            byte[] configBytes = ((SetDataTxn)txn).getData();
+            QuorumVerifier newConfig = self.configFromString(new String(configBytes));
+
+            request = new Request (hdr.getClientId(),  hdr.getCxid(), hdr.getType(), hdr, txn, 0);
+            obs = (ObserverZooKeeperServer)zk;
+
+            boolean majorChange =
+                self.processReconfig(newConfig, designatedLeaderId, qp.getZxid(), true);
+            // The request is committed before the change is reported to the caller.
+            obs.commitRequest(request);
+            if (majorChange) {
+                throw new Exception("changes proposed in reconfig");
+            }
+            break;
         }
     }
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Thu Mar  7 06:01:49 2013
@@ -89,6 +89,7 @@ public class ObserverRequestProcessor ex
                 case OpCode.create:
                 case OpCode.delete:
                 case OpCode.setData:
+                case OpCode.reconfig:
                 case OpCode.setACL:
                 case OpCode.createSession:
                 case OpCode.closeSession:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java Thu Mar  7 06:01:49 2013
@@ -27,7 +27,7 @@ public class QuorumBean implements Quoru
     
     public QuorumBean(QuorumPeer peer){
         this.peer = peer;
-        name = "ReplicatedServer_id" + peer.getMyid();
+        name = "ReplicatedServer_id" + peer.getId();
     }
     
     public String getName() {