You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/18 20:48:57 UTC

svn commit: r1611765 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/

Author: fpj
Date: Fri Jul 18 18:48:56 2014
New Revision: 1611765

URL: http://svn.apache.org/r1611765
Log:
ZOOKEEPER-1807. Observers spam each other creating connections to the election addr (Alex Shraer via fpj)


Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Jul 18 18:48:56 2014
@@ -708,6 +708,9 @@ BUGFIXES:
   ZOOKEEPER-1851. Follower and Observer Request Processors Do Not Forward
   create2 Requests (Chris Chen via rakeshr)
 
+  ZOOKEEPER-1807. Observers spam each other creating connections to the
+  election addr (Alex Shraer via fpj)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Jul 18 18:48:56 2014
@@ -315,7 +315,7 @@ public class FastLeaderElection implemen
                          * If it is from a non-voting server (such as an observer or
                          * a non-voting follower), respond right away.
                          */
-                        if(!self.getVotingView().containsKey(response.sid)){
+                        if(!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
                             Vote current = self.getCurrentVote();
                             QuorumVerifier qv = self.getQuorumVerifier();
                             ToSend notmsg = new ToSend(ToSend.mType.notification,
@@ -658,7 +658,7 @@ public class FastLeaderElection implemen
      * Send notifications to all peers upon a change in our vote
      */
     private void sendNotifications() {
-        for (long sid : self.getAllKnownServerIds()) {
+        for (long sid : self.getCurrentAndNextConfigVoters()) {
             QuorumVerifier qv = self.getQuorumVerifier();
             ToSend notmsg = new ToSend(ToSend.mType.notification,
                     proposedLeader,
@@ -718,31 +718,36 @@ public class FastLeaderElection implemen
     }
 
     /**
-     * Termination predicate. Given a set of votes, determines if
-     * have sufficient to declare the end of the election round.
-     *
-     *  @param votes    Set of votes
-     *  @param vote        Identifier of the vote received last
-     */
-    private boolean termPredicate(
-            HashMap<Long, Vote> votes,
-            Vote vote) {
-
-        HashSet<Long> set = new HashSet<Long>();
+     * Termination predicate. Given a set of votes, determines if have
+     * sufficient to declare the end of the election round.
+     * 
+     * @param votes
+     *            Set of votes
+     * @param vote
+     *            Identifier of the vote received last
+     */
+    private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
+        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
+        voteSet.addQuorumVerifier(self.getQuorumVerifier());
+        if (self.getLastSeenQuorumVerifier() != null
+                && self.getLastSeenQuorumVerifier().getVersion() > self
+                        .getQuorumVerifier().getVersion()) {
+            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
+        }
 
         /*
-         * First make the views consistent. Sometimes peers will have
-         * different zxids for a server depending on timing.
+         * First make the views consistent. Sometimes peers will have different
+         * zxids for a server depending on timing.
          */
-        for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
-            if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey())
-                    && vote.equals(entry.getValue())){
-                set.add(entry.getKey());
+        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
+            if (vote.equals(entry.getValue())) {
+                voteSet.addAck(entry.getKey());
             }
         }
 
-        return self.getQuorumVerifier().containsQuorum(set);
+        return voteSet.hasAllQuorums();
     }
+
     /**
      * In the case there is a leader elected, and a quorum supporting
      * this leader, we have to check if the leader has voted and acked
@@ -914,10 +919,10 @@ public class FastLeaderElection implemen
                     notTimeout = (tmpTimeOut < maxNotificationInterval?
                             tmpTimeOut : maxNotificationInterval);
                     LOG.info("Notification time out: " + notTimeout);
-                }
-                else if(self.getVotingView().containsKey(n.sid)) {
+                } 
+                else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                     /*
-                     * Only proceed if the vote comes from a replica in the
+                     * Only proceed if the vote comes from a replica in the current or next
                      * voting view.
                      */
                     switch (n.state) {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Jul 18 18:48:56 2014
@@ -1107,12 +1107,14 @@ public class QuorumPeer extends ZooKeepe
        return getQuorumVerifier().getObservingMembers();
     }
 
-    public synchronized Set<Long> getAllKnownServerIds(){
-       Set<Long> tmp = new HashSet<Long>(getQuorumVerifier().getAllMembers().keySet());
-       if (getLastSeenQuorumVerifier()!=null) {
-           tmp.addAll(getLastSeenQuorumVerifier().getAllMembers().keySet());
-       }
-       return tmp;
+    public synchronized Set<Long> getCurrentAndNextConfigVoters() {
+        Set<Long> voterIds = new HashSet<Long>(getQuorumVerifier()
+                .getVotingMembers().keySet());
+        if (getLastSeenQuorumVerifier() != null) {
+            voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers()
+                    .keySet());
+        }
+        return voterIds;
     }
     
     /**

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Fri Jul 18 18:48:56 2014
@@ -55,21 +55,6 @@ public class QuorumPeerTestBase extends 
         }
     }
     
-    public static class MainThreadReconfigRecovery extends MainThread {
-       final File nextDynamicConfigFile;
-       
-       public MainThreadReconfigRecovery(int myid, int clientPort,
-               String currentQuorumCfgSection, String nextQuorumCfgSection) 
-                       throws IOException {
-           super(myid, clientPort, currentQuorumCfgSection);
-           nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next");
-           FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
-            fwriter.write(nextQuorumCfgSection + "\n");
-            fwriter.flush();
-            fwriter.close();
-       }               
-    }
-    
     public static class MainThread implements Runnable {
         final File confFile;
         final File dynamicConfigFile;
@@ -141,7 +126,16 @@ public class QuorumPeerTestBase extends 
             fwriter.flush();
             fwriter.close();
         }
-        
+
+        public void writeTempDynamicConfigFile(String nextQuorumCfgSection)
+                throws IOException {
+            File nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next");
+            FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
+            fwriter.write(nextQuorumCfgSection + "\n");
+            fwriter.flush();
+            fwriter.close();
+        }
+
         Thread currentThread;
 
         synchronized public void start() {

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java Fri Jul 18 18:48:56 2014
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.quor
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZooKeeper;
@@ -31,13 +32,13 @@ import org.junit.Test;
 
 public class ReconfigRecoveryTest extends QuorumPeerTestBase {
     /**
-     * Reconfiguration recovery - test that a reconfiguration is completed 
-     * if leader has .next file during startup and new config is not running yet
+     * Reconfiguration recovery - test that a reconfiguration is completed if
+     * leader has .next file during startup and new config is not running yet
      */
     @Test
     public void testNextConfigCompletion() throws Exception {
         ClientBase.setupTestEnv();
-        
+
         // 2 servers in current config, 3 in next config
         final int SERVER_COUNT = 3;
         final int clientPorts[] = new int[SERVER_COUNT];
@@ -46,96 +47,84 @@ public class ReconfigRecoveryTest extend
         ArrayList<String> allServers = new ArrayList<String>();
 
         String currentQuorumCfgSection = null, nextQuorumCfgSection;
-        
-        for(int i = 0; i < SERVER_COUNT; i++) {
-               clientPorts[i] = PortAssignment.unique();
-               server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
-                      ":participant;localhost:" + clientPorts[i];        
-               allServers.add(server);
-               sb.append(server +"\n");
-               if (i == 1) currentQuorumCfgSection = sb.toString();
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=localhost:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;localhost:"
+                    + clientPorts[i];
+            allServers.add(server);
+            sb.append(server + "\n");
+            if (i == 1)
+                currentQuorumCfgSection = sb.toString() + "version=100000000\n";
         }
         sb.append("version=200000000\n"); // version of current config is 100000000
         nextQuorumCfgSection = sb.toString();
-        
+
         // Both servers 0 and 1 will have the .next config file, which means
         // for them that a reconfiguration was in progress when they failed
         // and the leader will complete it
         MainThread mt[] = new MainThread[SERVER_COUNT];
         ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
         for (int i = 0; i < SERVER_COUNT - 1; i++) {
-            mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
+            // note that we should run the server, shut it down and only then
+            // simulate a reconfig in progress by writing the temp file, but here no
+            // other server is competing with them in FLE, so we can skip this step
+            // (server 2 is booted after FLE ends)
+            mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
             mt[i].start();
-            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
         }
-        
-        Assert.assertTrue("waiting for server 0 being up",
-                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0],
+
+        Assert.assertTrue("waiting for server 0 being up", ClientBase
+                .waitForServerUp("127.0.0.1:" + clientPorts[0],
                         CONNECTION_TIMEOUT));
-        Assert.assertTrue("waiting for server 1 being up",
-                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1],
+        Assert.assertTrue("waiting for server 1 being up", ClientBase
+                .waitForServerUp("127.0.0.1:" + clientPorts[1],
                         CONNECTION_TIMEOUT));
-         
-        int leader = mt[0].main.quorumPeer.leader == null ? 1: 0;
-              
+
+        int leader = mt[0].main.quorumPeer.leader == null ? 1 : 0;
+
         // the new server's config is going to include itself and the current leader
         sb = new StringBuilder();
         sb.append(allServers.get(leader) + "\n");
         sb.append(allServers.get(2) + "\n");
-        
+
         // suppose that this new server never heard about the reconfig proposal
         String newServerInitialConfig = sb.toString();
         mt[2] = new MainThread(2, clientPorts[2], newServerInitialConfig);
         mt[2].start();
-        zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
+        zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2],
+                ClientBase.CONNECTION_TIMEOUT, this);
 
-       Assert.assertTrue("waiting for server 2 being up",
-                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
+        Assert.assertTrue("waiting for server 2 being up", ClientBase
+                .waitForServerUp("127.0.0.1:" + clientPorts[2],
                         CONNECTION_TIMEOUT));
-        
-        ReconfigTest.testServerHasConfig(zk[0], allServers, null);
-        ReconfigTest.testServerHasConfig(zk[1], allServers, null);
-        ReconfigTest.testServerHasConfig(zk[2], allServers, null);
-        
-        ReconfigTest.testNormalOperation(zk[0], zk[2]);
-        ReconfigTest.testNormalOperation(zk[2], zk[1]);
-
-        zk[2].close();
-        mt[2].shutdown();
-        
-        //now suppose that the new server heard the reconfig request
-        mt[2] = new MainThreadReconfigRecovery(2, clientPorts[2], newServerInitialConfig, nextQuorumCfgSection);
-        mt[2].start();
-        zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
 
-       Assert.assertTrue("waiting for server 2 being up",
-                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
-                        CONNECTION_TIMEOUT));
-        
         ReconfigTest.testServerHasConfig(zk[0], allServers, null);
         ReconfigTest.testServerHasConfig(zk[1], allServers, null);
         ReconfigTest.testServerHasConfig(zk[2], allServers, null);
-        
+
         ReconfigTest.testNormalOperation(zk[0], zk[2]);
         ReconfigTest.testNormalOperation(zk[2], zk[1]);
 
         for (int i = 0; i < SERVER_COUNT; i++) {
-            zk[i].close();
-        }
-        for (int i = 0; i < SERVER_COUNT; i++) {
             mt[i].shutdown();
+            zk[i].close();
         }
     }
 
     /**
      * Reconfiguration recovery - current config servers discover .next file,
-     * but they're both observers and their ports change in next config. Suppose that next config wasn't activated yet.
-     * Should complete reconfiguration. 
+     * but they're both observers and their ports change in next config. Suppose
+     * that next config wasn't activated yet. Should complete reconfiguration.
      */
     @Test
     public void testCurrentServersAreObserversInNextConfig() throws Exception {
         ClientBase.setupTestEnv();
-        
+
         // 2 servers in current config, 5 in next config
         final int SERVER_COUNT = 5;
         final int clientPorts[] = new int[SERVER_COUNT];
@@ -143,82 +132,118 @@ public class ReconfigRecoveryTest extend
         StringBuilder sb = new StringBuilder();
         String server;
 
-        String currentQuorumCfgSection = null, nextQuorumCfgSection;
-        
+        String currentQuorumCfg = null, currentQuorumCfgSection = null, nextQuorumCfgSection = null;
+
         ArrayList<String> allServersCurrent = new ArrayList<String>();
         ArrayList<String> allServersNext = new ArrayList<String>();
-        
-        
-        for(int i = 0; i < 2; i++) {
-               oldClientPorts[i] = PortAssignment.unique();
-               server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
-                      ":participant;localhost:" + oldClientPorts[i];        
-               allServersCurrent.add(server);
-               sb.append(server +"\n");
+
+        for (int i = 0; i < 2; i++) {
+            oldClientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=localhost:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;localhost:"
+                    + oldClientPorts[i];
+            allServersCurrent.add(server);
+            sb.append(server + "\n");
         }
-        
+
+        currentQuorumCfg = sb.toString();
+        sb.append("version=100000000\n");
         currentQuorumCfgSection = sb.toString();
+
         sb = new StringBuilder();
         String role;
-        for (int i=0; i<SERVER_COUNT; i++) {
+        for (int i = 0; i < SERVER_COUNT; i++) {
             clientPorts[i] = PortAssignment.unique();
-            if (i < 2) role = "observer";
-            else role = "participant";
-            server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
-                  ":" + role + ";localhost:" + clientPorts[i];        
+            if (i < 2) {
+                role = "observer";
+            } else {
+                role = "participant";
+            }
+            server = "server." + i + "=localhost:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":" + role
+                    + ";localhost:" + clientPorts[i];
             allServersNext.add(server);
-            sb.append(server +"\n");
-           
+            sb.append(server + "\n");
         }
         sb.append("version=200000000\n"); // version of current config is 100000000
         nextQuorumCfgSection = sb.toString();
-        
+
         MainThread mt[] = new MainThread[SERVER_COUNT];
         ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
 
+        // run servers 0 and 1 normally
         for (int i = 0; i < 2; i++) {
-            mt[i] = new MainThreadReconfigRecovery(i, oldClientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+            mt[i] = new MainThread(i, oldClientPorts[i],
+                    currentQuorumCfgSection);
             mt[i].start();
-            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            zk[i] = new ZooKeeper("127.0.0.1:" + oldClientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+        }
+
+        for (int i = 0; i < 2; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp(
+                            "127.0.0.1:" + oldClientPorts[i],
+                            CONNECTION_TIMEOUT * 2));
+        }
+
+        ReconfigTest.testNormalOperation(zk[0], zk[1]);
+
+        // shut them down and then simulate a reboot with a reconfig in progress
+        for (int i = 0; i < 2; i++) {
+            mt[i].shutdown();
+            zk[i].close();
+        }
+
+        for (int i = 0; i < 2; i++) {
+            Assert.assertTrue(
+                    "waiting for server " + i + " being up",
+                    ClientBase.waitForServerDown("127.0.0.1:"
+                            + oldClientPorts[i], CONNECTION_TIMEOUT * 2));
+        }
+
+        for (int i = 0; i < 2; i++) {
+            mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
         }
 
         // new members are initialized with current config + the new server
         for (int i = 2; i < SERVER_COUNT; i++) {
-            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection + allServersNext.get(i));
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfg
+                    + allServersNext.get(i));
             mt[i].start();
-            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
         }
 
-        for (int i=0; i<SERVER_COUNT; i++) {
-            Assert.assertTrue("waiting for server "+ i + " being up",
-                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
-                        CONNECTION_TIMEOUT * 2));
-            ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);  
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                            CONNECTION_TIMEOUT * 2));
+            ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
         }
-        
+
         ReconfigTest.testNormalOperation(zk[0], zk[2]);
         ReconfigTest.testNormalOperation(zk[4], zk[1]);
 
         for (int i = 0; i < SERVER_COUNT; i++) {
             zk[i].close();
-        }
-        for (int i = 0; i < SERVER_COUNT; i++) {
             mt[i].shutdown();
         }
+    }
 
-     }
-    
-    
-    
     /**
-     * Reconfiguration recovery - test that if servers in old config have a .next file
-     * but no quorum of new config is up then no progress should be possible (no progress will happen
-     * to ensure safety as the new config might be actually up but partitioned from old config)
+     * Reconfiguration recovery - test that if servers in old config have a
+     * .next file but no quorum of new config is up then no progress should be
+     * possible (no progress will happen to ensure safety as the new config
+     * might be actually up but partitioned from old config)
      */
     @Test
     public void testNextConfigUnreachable() throws Exception {
         ClientBase.setupTestEnv();
-        
+
         // 2 servers in current config, 5 in next config
         final int SERVER_COUNT = 5;
         final int clientPorts[] = new int[SERVER_COUNT];
@@ -226,57 +251,61 @@ public class ReconfigRecoveryTest extend
         String server;
 
         String currentQuorumCfgSection = null, nextQuorumCfgSection;
-        
+
         ArrayList<String> allServers = new ArrayList<String>();
-        for(int i = 0; i < SERVER_COUNT; i++) {
-               clientPorts[i] = PortAssignment.unique();
-               server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
-                      ":participant;localhost:" + clientPorts[i];        
-               allServers.add(server);
-               sb.append(server +"\n");
-               if (i == 1) currentQuorumCfgSection = sb.toString();
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=localhost:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;localhost:"
+                    + clientPorts[i];
+            allServers.add(server);
+            sb.append(server + "\n");
+            if (i == 1)
+                currentQuorumCfgSection = sb.toString() + "version=100000000\n";
         }
-        sb.append("version=200000000\n"); // version of current config is 100000000
+        sb.append("version=200000000\n"); // version of current config is 100000000 
         nextQuorumCfgSection = sb.toString();
-        
-        // lets start servers 2, 3, 4 with the new config
+
         MainThread mt[] = new MainThread[SERVER_COUNT];
         ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
 
         // Both servers 0 and 1 will have the .next config file, which means
         // for them that a reconfiguration was in progress when they failed
-        // and the leader will complete it. 
         for (int i = 0; i < 2; i++) {
-            mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
+            // note that we should run the server, shut it down and only then
+            // simulate a reconfig in progress by writing the temp file, but here no
+            // other server is competing with them in FLE, so we can skip this step
+            mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
             mt[i].start();
-            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
         }
 
-        Thread.sleep(CONNECTION_TIMEOUT*2);
-        
-        // make sure servers 0, 1 don't come online
+        Thread.sleep(CONNECTION_TIMEOUT * 2);
+
+        // make sure servers 0, 1 don't come online - this should be the case
+        // since they can't complete the reconfig
         for (int i = 0; i < 2; i++) {
-           Assert.assertFalse("server " +  i + " is up but shouldn't be",
+            Assert.assertFalse("server " + i + " is up but shouldn't be",
                     ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
                             CONNECTION_TIMEOUT / 10));
         }
- 
+
         for (int i = 0; i < 2; i++) {
             zk[i].close();
-        }
-        for (int i = 0; i < 2; i++) {
             mt[i].shutdown();
         }
     }
-    
+
     /**
-     * Reconfiguration recovery - test that old config members will join the new config 
-     * if its already active, and not try to complete the reconfiguration
+     * Reconfiguration recovery - test that old config members will join the new
+     * config if its already active, and not try to complete the reconfiguration
      */
     @Test
     public void testNextConfigAlreadyActive() throws Exception {
         ClientBase.setupTestEnv();
-        
+
         // 2 servers in current config, 5 in next config
         final int SERVER_COUNT = 5;
         final int clientPorts[] = new int[SERVER_COUNT];
@@ -284,75 +313,274 @@ public class ReconfigRecoveryTest extend
         String server;
 
         String currentQuorumCfgSection = null, nextQuorumCfgSection;
-        
+
         ArrayList<String> allServers = new ArrayList<String>();
-        for(int i = 0; i < SERVER_COUNT; i++) {
-               clientPorts[i] = PortAssignment.unique();
-               server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() + 
-                      ":participant;localhost:" + clientPorts[i];        
-               allServers.add(server);
-               sb.append(server +"\n");
-               if (i == 1) currentQuorumCfgSection = sb.toString();
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=localhost:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;localhost:"
+                    + clientPorts[i];
+            allServers.add(server);
+            sb.append(server + "\n");
+            if (i == 1) currentQuorumCfgSection = sb.toString() + "version=100000000\n";
         }
         sb.append("version=200000000\n"); // version of current config is 100000000
         nextQuorumCfgSection = sb.toString();
-        
+
         // lets start servers 2, 3, 4 with the new config
         MainThread mt[] = new MainThread[SERVER_COUNT];
         ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
         for (int i = 2; i < SERVER_COUNT; i++) {
             mt[i] = new MainThread(i, clientPorts[i], nextQuorumCfgSection);
             mt[i].start();
-            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
         }
-        for (int i = 2; i < SERVER_COUNT; i++) {            
+        for (int i = 2; i < SERVER_COUNT; i++) {
             Assert.assertTrue("waiting for server " + i + " being up",
-                ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
-                        CONNECTION_TIMEOUT));
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                            CONNECTION_TIMEOUT));
         }
 
         ReconfigTest.testNormalOperation(zk[2], zk[3]);
 
         long epoch = mt[2].main.quorumPeer.getAcceptedEpoch();
-        
+
         // Both servers 0 and 1 will have the .next config file, which means
         // for them that a reconfiguration was in progress when they failed
-        // and the leader will complete it. 
+        // and the leader will complete it.
         for (int i = 0; i < 2; i++) {
-            mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
+            mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
             mt[i].start();
-            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
         }
 
-        
-        // servers 0 and 1 should connect to all servers, including the one in their
-        // .next file during startup, and will find the next config and join it
+        // servers 0 and 1 should connect to all servers, including the one in
+        // their .next file during startup, and will find the next config and join it
         for (int i = 0; i < 2; i++) {
-           Assert.assertTrue("waiting for server " +  i + " being up",
+            Assert.assertTrue("waiting for server " + i + " being up",
                     ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
-                            CONNECTION_TIMEOUT*2));
+                            CONNECTION_TIMEOUT * 2));
         }
-        
+
         // make sure they joined the new config without any change to it
         Assert.assertEquals(epoch, mt[0].main.quorumPeer.getAcceptedEpoch());
         Assert.assertEquals(epoch, mt[1].main.quorumPeer.getAcceptedEpoch());
         Assert.assertEquals(epoch, mt[2].main.quorumPeer.getAcceptedEpoch());
-        
+
         ReconfigTest.testServerHasConfig(zk[0], allServers, null);
         ReconfigTest.testServerHasConfig(zk[1], allServers, null);
-        
+
         ReconfigTest.testNormalOperation(zk[0], zk[2]);
         ReconfigTest.testNormalOperation(zk[4], zk[1]);
 
-       
         for (int i = 0; i < SERVER_COUNT; i++) {
             zk[i].close();
+            mt[i].shutdown();
         }
+    }
+
+    /**
+     * Tests conversion of observer to participant AFTER new config was already
+     * committed. Old config: servers 0 (participant), 1 (participant), 2
+     * (observer) New config: servers 2 (participant), 3 (participant) We start
+     * server 2 with old config and start server 3 with new config. All other
+     * servers are down. In order to terminate FLE, server 3 must 'convince'
+     * server 2 to adopt the new config and turn into a participant.
+     */
+    @Test
+    public void testObserverConvertedToParticipantDuringFLE() throws Exception {
+        ClientBase.setupTestEnv();
+
+        final int SERVER_COUNT = 4;
+        int[][] ports = generatePorts(SERVER_COUNT);
+        String currentQuorumCfgSection, nextQuorumCfgSection;
+
+        // generate old config string
+        HashSet<Integer> observers = new HashSet<Integer>();
+        observers.add(2);
+        StringBuilder sb = generateConfig(3, ports, observers);
+        sb.append("version=100000000");
+        currentQuorumCfgSection = sb.toString();
+
+        // generate new config string
+        ArrayList<String> allServersNext = new ArrayList<String>();
+        sb = new StringBuilder();
+        for (int i = 2; i < SERVER_COUNT; i++) {
+            String server = "server." + i + "=localhost:" + ports[i][0] + ":"
+                    + ports[i][1] + ":participant;localhost:" + ports[i][2];
+            allServersNext.add(server);
+            sb.append(server + "\n");
+        }
+        sb.append("version=200000000"); // version of current config is 100000000
+        nextQuorumCfgSection = sb.toString();
+
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+
+        // start server 2 with old config, where it is an observer
+        mt[2] = new MainThread(2, ports[2][2], currentQuorumCfgSection);
+        mt[2].start();
+        zk[2] = new ZooKeeper("127.0.0.1:" + ports[2][2],
+                ClientBase.CONNECTION_TIMEOUT, this);
+
+        // start server 3 with new config
+        mt[3] = new MainThread(3, ports[3][2], nextQuorumCfgSection);
+        mt[3].start();
+        zk[3] = new ZooKeeper("127.0.0.1:" + ports[3][2],
+                ClientBase.CONNECTION_TIMEOUT, this);
+
+        for (int i = 2; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
+                            CONNECTION_TIMEOUT * 2));
+            ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
+        }
+
+        Assert.assertEquals(nextQuorumCfgSection,
+                ReconfigTest.testServerHasConfig(zk[2], null, null));
+        Assert.assertEquals(nextQuorumCfgSection,
+                ReconfigTest.testServerHasConfig(zk[3], null, null));
+        ReconfigTest.testNormalOperation(zk[2], zk[2]);
+        ReconfigTest.testNormalOperation(zk[3], zk[2]);
+
+        for (int i = 2; i < SERVER_COUNT; i++) {
+            zk[i].close();
+            mt[i].shutdown();
+        }
+    }
+
+    /**
+     * Tests conversion of observer to participant during reconfig recovery, new
+     * config was not committed yet. Old config: servers 0 (participant), 1
+     * (participant), 2 (observer) New config: servers 2 (participant), 3
+     * (participant) We start server servers 0, 1, 2 with old config and a .next
+     * file indicating a reconfig in progress. We start server 3 with old config
+     * + itself in config file. In this scenario server 2 can't be converted to
+     * participant during reconfig since we don't gossip about proposed
+     * configurations, only about committed ones. This tests that new config can
+     * be completed, which requires server 2's ack for the newleader message,
+     * even though its an observer.
+     */
+    @Test
+    public void testCurrentObserverIsParticipantInNewConfig() throws Exception {
+        ClientBase.setupTestEnv();
+
+        final int SERVER_COUNT = 4;
+        int[][] ports = generatePorts(SERVER_COUNT);
+        String currentQuorumCfg, currentQuorumCfgSection, nextQuorumCfgSection;
+
+        // generate old config string
+        HashSet<Integer> observers = new HashSet<Integer>();
+        observers.add(2);
+
+        StringBuilder sb = generateConfig(3, ports, observers);
+        currentQuorumCfg = sb.toString();
+        sb.append("version=100000000");
+        currentQuorumCfgSection = sb.toString();
+
+        // Run servers 0..2 for a while
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        for (int i = 0; i <= 2; i++) {
+            mt[i] = new MainThread(i, ports[i][2], currentQuorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + ports[i][2],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+        }
+
+        ReconfigTest.testNormalOperation(zk[0], zk[2]);
+
+        for (int i = 0; i <= 2; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
+                            CONNECTION_TIMEOUT * 2));
+        }
+
+        // shut servers 0..2 down
+        for (int i = 0; i <= 2; i++) {
+            mt[i].shutdown();
+            zk[i].close();
+        }
+
+        // generate new config string
+        ArrayList<String> allServersNext = new ArrayList<String>();
+        sb = new StringBuilder();
+        for (int i = 2; i < SERVER_COUNT; i++) {
+            String server = "server." + i + "=localhost:" + ports[i][0] + ":"
+                    + ports[i][1] + ":participant;localhost:" + ports[i][2];
+            allServersNext.add(server);
+            sb.append(server + "\n");
+        }
+        sb.append("version=200000000"); // version of current config is 100000000
+        nextQuorumCfgSection = sb.toString();
+
+        // simulate reconfig in progress - servers 0..2 have a temp reconfig
+        // file when they boot
+        for (int i = 0; i <= 2; i++) {
+            mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + ports[i][2],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+        }
+        // new server 3 has still its invalid joiner config - everyone in old
+        // config + itself
+        mt[3] = new MainThread(3, ports[3][2], currentQuorumCfg
+                + allServersNext.get(1));
+        mt[3].start();
+        zk[3] = new ZooKeeper("127.0.0.1:" + ports[3][2],
+                ClientBase.CONNECTION_TIMEOUT, this);
+
+        for (int i = 2; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
+                            CONNECTION_TIMEOUT * 2));
+            ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
+        }
+
+        ReconfigTest.testNormalOperation(zk[0], zk[2]);
+        ReconfigTest.testNormalOperation(zk[3], zk[1]);
+        Assert.assertEquals(nextQuorumCfgSection,
+                ReconfigTest.testServerHasConfig(zk[2], null, null));
+        Assert.assertEquals(nextQuorumCfgSection,
+                ReconfigTest.testServerHasConfig(zk[3], null, null));
+
         for (int i = 0; i < SERVER_COUNT; i++) {
+            zk[i].close();
             mt[i].shutdown();
         }
     }
-    
- 
-    
-}
+
+    /*
+     * Generates 3 ports per server
+     */
+    private int[][] generatePorts(int numServers) {
+        int[][] ports = new int[numServers][];
+        for (int i = 0; i < numServers; i++) {
+            ports[i] = new int[3];
+            for (int j = 0; j < 3; j++) {
+                ports[i][j] = PortAssignment.unique();
+            }
+        }
+        return ports;
+    }
+
+    /*
+     * Creates a configuration string for servers 0..numServers-1 Ids in
+     * observerIds correspond to observers, other ids are for participants.
+     */
+    private StringBuilder generateConfig(int numServers, int[][] ports,
+            HashSet<Integer> observerIds) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < numServers; i++) {
+            String server = "server." + i + "=localhost:" + ports[i][0] + ":"
+                    + ports[i][1] + ":"
+                    + (observerIds.contains(i) ? "observer" : "participant")
+                    + ";localhost:" + ports[i][2];
+            sb.append(server + "\n");
+        }
+        return sb;
+    }
+}
\ No newline at end of file