You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2013/11/16 11:06:20 UTC

svn commit: r1542489 - in /zookeeper/branches/branch-3.4: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: fpj
Date: Sat Nov 16 10:06:19 2013
New Revision: 1542489

URL: http://svn.apache.org/r1542489
Log:
ZOOKEEPER-1808. Add version to FLE notifications for 3.4 branch (fpj)


Added:
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
Removed:
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FLEBackwardElectionRoundTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FLETestUtils.java
Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Sat Nov 16 10:06:19 2013
@@ -155,6 +155,8 @@ BUGFIXES:
   ZOOKEEPER-1798. Fix race condition in testNormalObserverRun
   (thawan, fpj via thawan)
 
+  ZOOKEEPER-1808. Add version to FLE notifications for 3.4 branch (fpj)
+  
   ZOOKEEPER-1812. ZooInspector reconnection always fails if first
   connection fails (Benjamin Jaton via phunt)
 

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Sat Nov 16 10:06:19 2013
@@ -93,6 +93,13 @@ public class FastLeaderElection implemen
 
     static public class Notification {
         /*
+         * Format version, introduced in 3.4.6
+         */
+        
+        public final static int CURRENTVERSION = 0x1; 
+        int version;
+                
+        /*
          * Proposed leader
          */
         long leader;
@@ -121,6 +128,39 @@ public class FastLeaderElection implemen
          * epoch of the proposed leader
          */
         long peerEpoch;
+        
+        @Override
+        public String toString() {
+            return new String(Long.toHexString(version) + " (message format version), " 
+                    + leader + " (n.leader), 0x"
+                    + Long.toHexString(zxid) + " (n.zxid), 0x"
+                    + Long.toHexString(electionEpoch) + " (n.round), " + state
+                    + " (n.state), " + sid + " (n.sid), 0x"
+                    + Long.toHexString(peerEpoch) + " (n.peerEpoch) ");
+        }
+    }
+    
+    static ByteBuffer buildMsg(int state,
+            long leader,
+            long zxid,
+            long electionEpoch,
+            long epoch) {
+        byte requestBytes[] = new byte[40];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+        /*
+         * Building notification packet to send 
+         */
+
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(electionEpoch);
+        requestBuffer.putLong(epoch);
+        requestBuffer.putInt(Notification.CURRENTVERSION);
+        
+        return requestBuffer;
     }
 
     /**
@@ -188,7 +228,7 @@ public class FastLeaderElection implemen
      * spawns a new thread.
      */
 
-    private class Messenger {
+    protected class Messenger {
 
         /**
          * Receives messages from instance of QuorumCnxManager on
@@ -250,6 +290,9 @@ public class FastLeaderElection implemen
                             boolean backCompatibility = (response.buffer.capacity() == 28);
                             response.buffer.clear();
 
+                            // Instantiate Notification and set its attributes
+                            Notification n = new Notification();
+                            
                             // State of peer that sent this message
                             QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                             switch (response.buffer.getInt()) {
@@ -265,10 +308,10 @@ public class FastLeaderElection implemen
                             case 3:
                                 ackstate = QuorumPeer.ServerState.OBSERVING;
                                 break;
+                            default:
+                                continue;
                             }
-
-                            // Instantiate Notification and set its attributes
-                            Notification n = new Notification();
+                            
                             n.leader = response.buffer.getLong();
                             n.zxid = response.buffer.getLong();
                             n.electionEpoch = response.buffer.getLong();
@@ -284,6 +327,12 @@ public class FastLeaderElection implemen
                             }
 
                             /*
+                             * Version added in 3.4.6
+                             */
+
+                            n.version = (response.buffer.remaining() >= 4) ? response.buffer.getInt() : 0x0;
+
+                            /*
                              * Print notification info
                              */
                             if(LOG.isInfoEnabled()){
@@ -383,23 +432,13 @@ public class FastLeaderElection implemen
              *
              * @param m     message to send
              */
-            private void process(ToSend m) {
-                byte requestBytes[] = new byte[36];
-                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
-                /*
-                 * Building notification packet to send
-                 */
-
-                requestBuffer.clear();
-                requestBuffer.putInt(m.state.ordinal());
-                requestBuffer.putLong(m.leader);
-                requestBuffer.putLong(m.zxid);
-                requestBuffer.putLong(m.electionEpoch);
-                requestBuffer.putLong(m.peerEpoch);
-
+            void process(ToSend m) {
+                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), 
+                                                        m.leader,
+                                                        m.zxid, 
+                                                        m.electionEpoch, 
+                                                        m.peerEpoch);
                 manager.toSend(m.sid, requestBuffer);
-
             }
         }
 
@@ -547,11 +586,7 @@ public class FastLeaderElection implemen
 
 
     private void printNotification(Notification n){
-        LOG.info("Notification: " + n.leader + " (n.leader), 0x"
-                + Long.toHexString(n.zxid) + " (n.zxid), 0x"
-                + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
-                + " (n.state), " + n.sid + " (n.sid), 0x"
-                + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
+        LOG.info("Notification: " + n.toString()
                 + self.getPeerState() + " (my state)");
     }
 

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Sat Nov 16 10:06:19 2013
@@ -72,7 +72,7 @@ public class QuorumCnxManager {
     // stale notifications to peers
     static final int SEND_CAPACITY = 1;
 
-    static final int PACKETMAXSIZE = 1024 * 1024; 
+    static final int PACKETMAXSIZE = 1024 * 512; 
     /*
      * Maximum number of attempts to connect to a peer
      */
@@ -129,6 +129,7 @@ public class QuorumCnxManager {
     private AtomicInteger threadCnt = new AtomicInteger(0);
 
     static public class Message {
+        
         Message(ByteBuffer buffer, long sid) {
             this.buffer = buffer;
             this.sid = sid;

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Sat Nov 16 10:06:19 2013
@@ -23,7 +23,9 @@ import org.apache.zookeeper.server.quoru
 
 public class Vote {
     
-    public Vote(long id, long zxid) {
+    public Vote(long id, 
+                    long zxid) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = -1;
@@ -31,7 +33,10 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
     
-    public Vote(long id, long zxid, long peerEpoch) {
+    public Vote(long id, 
+                    long zxid, 
+                    long peerEpoch) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = -1;
@@ -39,7 +44,11 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
 
-    public Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
+    public Vote(long id, 
+                    long zxid, 
+                    long electionEpoch, 
+                    long peerEpoch) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = electionEpoch;
@@ -47,7 +56,13 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
     
-    public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
+    public Vote(int version,
+                    long id, 
+                    long zxid, 
+                    long electionEpoch, 
+                    long peerEpoch, 
+                    ServerState state) {
+        this.version = version;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = electionEpoch;
@@ -55,6 +70,21 @@ public class Vote {
         this.peerEpoch = peerEpoch;
     }
     
+    public Vote(long id, 
+                    long zxid, 
+                    long electionEpoch, 
+                    long peerEpoch, 
+                    ServerState state) {
+        this.id = id;
+        this.zxid = zxid;
+        this.electionEpoch = electionEpoch;
+        this.state = state;
+        this.peerEpoch = peerEpoch;
+        this.version = 0x0;
+    }
+    
+    final private int version;
+    
     final private long id;
     
     final private long zxid;
@@ -63,6 +93,10 @@ public class Vote {
     
     final private long peerEpoch;
     
+    public int getVersion() {
+        return version;
+    }
+    
     public long getId() {
         return id;
     }
@@ -91,10 +125,13 @@ public class Vote {
             return false;
         }
         Vote other = (Vote) o;
-        return (id == other.id && zxid == other.zxid && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch);
+        return (id == other.id
+                    && zxid == other.zxid
+                    && electionEpoch == other.electionEpoch
+                    && peerEpoch == other.peerEpoch);
 
     }
-
+    
     @Override
     public int hashCode() {
         return (int) (id & zxid);

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java Sat Nov 16 10:06:19 2013
@@ -74,20 +74,7 @@ public class CnxManagerTest extends ZKTe
     }
 
     ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
-        byte requestBytes[] = new byte[28];
-        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
-        /*
-         * Building notification packet to send
-         */
-
-        requestBuffer.clear();
-        requestBuffer.putInt(state);
-        requestBuffer.putLong(leader);
-        requestBuffer.putLong(zxid);
-        requestBuffer.putLong(epoch);
-
-        return requestBuffer;
+        return FastLeaderElection.buildMsg(state, leader, zxid, 0, epoch);
     }
 
     class CnxManagerThread extends Thread {

Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java?rev=1542489&view=auto
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java (added)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java Sat Nov 16 10:06:19 2013
@@ -0,0 +1,155 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class FLEBackwardElectionRoundTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
+    
+    int count;
+    HashMap<Long,QuorumServer> peers;
+    File tmpdir[];
+    int port[];
+
+    QuorumCnxManager cnxManagers[];
+
+    @Before
+    public void setUp() throws Exception {
+        count = 3;
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+        cnxManagers = new QuorumCnxManager[count - 1];
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        for(int i = 0; i < (count - 1); i++){
+            if(cnxManagers[i] != null){
+                cnxManagers[i].halt();
+            }
+        }
+    }
+    
+    /**
+     * This test is checking the following case. A server S is
+     * currently LOOKING and it receives notifications from 
+     * a quorum indicating they are following S. The election
+     * round E of S is higher than the election round E' in the 
+     * notification messages, so S becomes the leader and sets
+     * its epoch back to E'. In the meanwhile, one or more
+     * followers turn to LOOKING and elect S in election round E.
+     * Having leader and followers with different election rounds
+     * might prevent other servers from electing a leader because
+     * they can't get a consistent set of notifications from a 
+     * quorum. 
+     * 
+     * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1514}
+     *    
+     * 
+     * @throws Exception
+     */
+    
+    @Test
+    public void testBackwardElectionRound() throws Exception {
+        LOG.info("TestLE: " + getTestName()+ ", " + count);
+        for(int i = 0; i < count; i++) {
+            int clientport = PortAssignment.unique();
+            peers.put(Long.valueOf(i),
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                            new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = clientport;
+        }
+
+        /*
+         * Start server 0
+         */
+
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+        peer.startLeaderElection();
+        FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
+        thread.start();  
+        
+        
+        /*
+         * Start mock server 1
+         */
+        QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+        cnxManagers[0] = new QuorumCnxManager(mockPeer);
+        QuorumCnxManager.Listener listener = cnxManagers[0].listener;
+        listener.start();
+
+        ByteBuffer msg = FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1);
+        cnxManagers[0].toSend(0l, msg);
+        
+        /*
+         * Start mock server 2
+         */
+        mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
+        cnxManagers[1] = new QuorumCnxManager(mockPeer);
+        listener = cnxManagers[1].listener;
+        listener.start();
+
+        cnxManagers[1].toSend(0l, msg);
+        
+        /*
+         * Run another instance of leader election.
+         */
+        thread.join(5000);
+        thread = new FLETestUtils.LEThread(peer, 0);
+        thread.start();
+        
+        /*
+         * Send the same messages, this time should not make 0 the leader.
+         */
+        cnxManagers[0].toSend(0l, msg);
+        cnxManagers[1].toSend(0l, msg);
+        
+        
+        thread.join(5000);
+        
+        if (!thread.isAlive()) {
+            Assert.fail("Should not have joined");
+        }
+        
+    }
+}
\ No newline at end of file

Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java?rev=1542489&view=auto
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java (added)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java Sat Nov 16 10:06:19 2013
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection.Notification;
+import org.apache.zookeeper.server.quorum.FastLeaderElection.ToSend;
+import org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.FLETest;
+import org.apache.zookeeper.test.QuorumBase;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FLECompatibilityTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLECompatibilityTest.class);
+
+    int count;
+    HashMap<Long,QuorumServer> peers;
+    File tmpdir[];
+    int port[];
+    
+    @Before
+    public void setUp() throws Exception {
+        count = 3;
+        peers = new HashMap<Long,QuorumServer>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        
+    }
+    
+    class MockFLEMessengerBackward {   
+        QuorumCnxManager manager;
+        QuorumPeer self;
+        long logicalclock = 1L;
+        LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<ToSend>();
+        LinkedBlockingQueue<ToSend> internalqueue = new LinkedBlockingQueue<ToSend>();
+        LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<Notification>();
+        WorkerReceiver wr;
+        
+        MockFLEMessengerBackward(QuorumPeer self, QuorumCnxManager manager){
+            this.manager = manager;
+            this.self = self;
+            
+            this.wr = new WorkerReceiver(manager);
+
+            Thread t = new Thread(this.wr,
+                    "WorkerReceiver[myid=" + self.getId() + "]");
+            t.setDaemon(true);
+            t.start();
+        }
+        
+        void halt() {
+            wr.stop = true;
+        }
+        
+        /*
+         * This class has been copied from before adding versions to notifications.
+         * 
+         * {@see https://issues.apache.org/jira/browse/ZOOKEEPER-1808}
+         */
+        class WorkerReceiver implements Runnable {
+            volatile boolean stop;
+            QuorumCnxManager manager;
+            final long proposedLeader = 2;
+            final long proposedZxid = 0x1;
+            final long proposedEpoch = 1;
+
+            WorkerReceiver(QuorumCnxManager manager) {
+                this.stop = false;
+                this.manager = manager;
+            }
+
+            /*
+             * The vote we return here is fixed for test purposes.
+             */
+            Vote getVote(){
+                return new Vote(proposedLeader, proposedZxid, proposedEpoch);
+            }
+            
+            public void run() {
+
+                Message response;
+                while (!stop) {
+                    // Sleeps on receive
+                    try{
+                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
+                        if(response == null) continue;
+
+                        /*
+                         * If it is from an observer, respond right away.
+                         * Note that the following predicate assumes that
+                         * if a server is not a follower, then it must be
+                         * an observer. If we ever have any other type of
+                         * learner in the future, we'll have to change the
+                         * way we check for observers.
+                         */
+                        if(!self.getVotingView().containsKey(response.sid)){
+                            Vote current = self.getCurrentVote();
+                            ToSend notmsg = new ToSend(ToSend.mType.notification,
+                                    current.getId(),
+                                    current.getZxid(),
+                                    logicalclock,
+                                    self.getPeerState(),
+                                    response.sid,
+                                    current.getPeerEpoch());
+
+                            internalqueue.offer(notmsg);
+                        } else {
+                            // Receive new message
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Receive new notification message. My id = "
+                                        + self.getId());
+                            }
+
+                            /*
+                             * We check for 28 bytes for backward compatibility
+                             */
+                            if (response.buffer.capacity() < 28) {
+                                LOG.error("Got a short response: "
+                                        + response.buffer.capacity());
+                                continue;
+                            }
+                            boolean backCompatibility = (response.buffer.capacity() == 28);
+                            response.buffer.clear();
+
+                            // State of peer that sent this message
+                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
+                            switch (response.buffer.getInt()) {
+                            case 0:
+                                ackstate = QuorumPeer.ServerState.LOOKING;
+                                break;
+                            case 1:
+                                ackstate = QuorumPeer.ServerState.FOLLOWING;
+                                break;
+                            case 2:
+                                ackstate = QuorumPeer.ServerState.LEADING;
+                                break;
+                            case 3:
+                                ackstate = QuorumPeer.ServerState.OBSERVING;
+                                break;
+                            }
+
+                            // Instantiate Notification and set its attributes
+                            Notification n = new Notification();
+                            n.leader = response.buffer.getLong();
+                            n.zxid = response.buffer.getLong();
+                            n.electionEpoch = response.buffer.getLong();
+                            n.state = ackstate;
+                            n.sid = response.sid;
+                            if(!backCompatibility){
+                                n.peerEpoch = response.buffer.getLong();
+                            } else {
+                                if(LOG.isInfoEnabled()){
+                                    LOG.info("Backward compatibility mode, server id=" + n.sid);
+                                }
+                                n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
+                            }
+
+                            /*
+                             * If this server is looking, then send proposed leader
+                             */
+
+                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
+                                recvqueue.offer(n);
+
+                                /*
+                                 * Send a notification back if the peer that sent this
+                                 * message is also looking and its logical clock is
+                                 * lagging behind.
+                                 */
+                                if((ackstate == QuorumPeer.ServerState.LOOKING)
+                                        && (n.electionEpoch < logicalclock)){
+                                    Vote v = getVote();
+                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
+                                            v.getId(),
+                                            v.getZxid(),
+                                            logicalclock,
+                                            self.getPeerState(),
+                                            response.sid,
+                                            v.getPeerEpoch());
+                                    internalqueue.offer(notmsg);
+                                }
+                            } else {
+                                /*
+                                 * If this server is not looking, but the one that sent the ack
+                                 * is looking, then send back what it believes to be the leader.
+                                 */
+                                Vote current = self.getCurrentVote();
+                                if(ackstate == QuorumPeer.ServerState.LOOKING){
+                                    if(LOG.isDebugEnabled()){
+                                        LOG.debug("Sending new notification. My id =  " +
+                                                self.getId() + " recipient=" +
+                                                response.sid + " zxid=0x" +
+                                                Long.toHexString(current.getZxid()) +
+                                                " leader=" + current.getId());
+                                    }
+                                    ToSend notmsg = new ToSend(
+                                            ToSend.mType.notification,
+                                            current.getId(),
+                                            current.getZxid(),
+                                            current.getElectionEpoch(),
+                                            self.getPeerState(),
+                                            response.sid,
+                                            current.getPeerEpoch());
+                                    internalqueue.offer(notmsg);
+                                }
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        System.out.println("Interrupted Exception while waiting for new message" +
+                                e.toString());
+                    }
+                }
+                LOG.info("WorkerReceiver is down");
+            }
+        }
+    }
+    
+    class MockFLEMessengerForward extends FastLeaderElection {
+        
+        MockFLEMessengerForward(QuorumPeer self, QuorumCnxManager manager){
+            super( self, manager );
+        }
+        
+        void halt() {
+            super.shutdown();
+        }
+    }
+    
+    void populate()
+    throws Exception {
+        for (int i = 0; i < count; i++) {
+            peers.put(Long.valueOf(i),
+                    new QuorumServer(i,
+                            new InetSocketAddress(PortAssignment.unique()),
+                    new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = PortAssignment.unique();
+        }
+    }
+    
+    @Test(timeout=20000)
+    public void testBackwardCompatibility() 
+    throws Exception {
+        populate();
+        
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+        peer.setPeerState(ServerState.LOOKING);
+        QuorumCnxManager mng = new QuorumCnxManager(peer);
+        
+        /*
+         * Check that it generates an internal notification correctly
+         */
+        MockFLEMessengerBackward fle = new MockFLEMessengerBackward(peer, mng);
+        ByteBuffer buffer = FastLeaderElection.buildMsg(ServerState.LOOKING.ordinal(), 2, 0x1, 1, 1);
+        fle.manager.recvQueue.add(new Message(buffer, 2));
+        Notification n = fle.recvqueue.take();
+        Assert.assertTrue("Wrong state", n.state == ServerState.LOOKING);
+        Assert.assertTrue("Wrong leader", n.leader == 2);
+        Assert.assertTrue("Wrong zxid", n.zxid == 0x1);
+        Assert.assertTrue("Wrong epoch", n.electionEpoch == 1);
+        Assert.assertTrue("Wrong epoch", n.peerEpoch == 1);
+        
+        /*
+         * Check that it sends a notification back to the sender
+         */
+        peer.setPeerState(ServerState.FOLLOWING);
+        peer.setCurrentVote( new Vote(2, 0x1, 1, 1, ServerState.LOOKING) );
+        buffer = FastLeaderElection.buildMsg(ServerState.LOOKING.ordinal(), 1, 0x1, 1, 1);
+        fle.manager.recvQueue.add(new Message(buffer, 1));
+        ToSend m = fle.internalqueue.take();
+        Assert.assertTrue("Wrong state", m.state == ServerState.FOLLOWING);
+        Assert.assertTrue("Wrong sid", m.sid == 1);
+        Assert.assertTrue("Wrong leader", m.leader == 2);
+        Assert.assertTrue("Wrong epoch", m.electionEpoch == 1);
+        Assert.assertTrue("Wrong epoch", m.peerEpoch == 1);
+    }
+    
+    @Test(timeout=20000)
+    public void testForwardCompatibility() 
+    throws Exception {
+        populate();
+
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+        peer.setPeerState(ServerState.LOOKING);
+        QuorumCnxManager mng = new QuorumCnxManager(peer);
+        
+        /*
+         * Check that it generates an internal notification correctly
+         */
+        MockFLEMessengerForward fle = new MockFLEMessengerForward(peer, mng);
+        ByteBuffer notBuffer = FastLeaderElection.buildMsg(ServerState.LOOKING.ordinal(), 2, 0x1, 1, 1);
+        ByteBuffer buffer = ByteBuffer.allocate( notBuffer.capacity() + 8 );
+        notBuffer.flip();
+        buffer.put(notBuffer);
+        buffer.putLong( Long.MAX_VALUE );
+        buffer.flip();
+        
+        fle.manager.recvQueue.add(new Message(buffer, 2));
+        Notification n = fle.recvqueue.take();
+        Assert.assertTrue("Wrong state", n.state == ServerState.LOOKING);
+        Assert.assertTrue("Wrong leader", n.leader == 2);
+        Assert.assertTrue("Wrong zxid", n.zxid == 0x1);
+        Assert.assertTrue("Wrong epoch", n.electionEpoch == 1);
+        Assert.assertTrue("Wrong epoch", n.peerEpoch == 1);
+        Assert.assertTrue("Wrong version", n.version == FastLeaderElection.Notification.CURRENTVERSION);
+    }
+}

Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java?rev=1542489&view=auto
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java (added)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java Sat Nov 16 10:06:19 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLELostMessageTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
+
+
+    int count;
+    HashMap<Long,QuorumServer> peers;
+    File tmpdir[];
+    int port[];
+
+    QuorumCnxManager cnxManager;
+
+    @Before
+    public void setUp() throws Exception {
+        count = 3;
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cnxManager.halt();
+    }
+
+    @Test
+    public void testLostMessage() throws Exception {
+        FastLeaderElection le[] = new FastLeaderElection[count];
+
+        LOG.info("TestLE: " + getTestName()+ ", " + count);
+        for(int i = 0; i < count; i++) {
+            int clientport = PortAssignment.unique();
+            peers.put(Long.valueOf(i),
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                            new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = clientport;
+        }
+
+        /*
+         * Start server 0
+         */
+
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+        peer.startLeaderElection();
+        FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
+        thread.start();
+
+        /*
+         * Start mock server 1
+         */
+        mockServer();
+        thread.join(5000);
+        if (thread.isAlive()) {
+            Assert.fail("Threads didn't join");
+        }
+    }
+
+    void mockServer() throws InterruptedException, IOException {
+        /*
+         * Create an instance of the connection manager
+         */
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+        cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        listener.start();
+
+
+        cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0));
+        cnxManager.recvQueue.take();
+        cnxManager.toSend(1L, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 0));
+    }
+}

Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java?rev=1542489&view=auto
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java (added)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java Sat Nov 16 10:06:19 2013
@@ -0,0 +1,86 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.nio.ByteBuffer;
+
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Assert;
+
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+
+public class FLETestUtils {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLETestUtils.class);
+    
+    
+    /*
+     * Thread to run an instance of leader election for 
+     * a given quorum peer.
+     */
+    static class LEThread extends Thread {
+        private int i;
+        private QuorumPeer peer;
+
+        LEThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: " + getName());
+
+        }
+
+        public void run(){
+            try{
+                Vote v = null;
+                peer.setPeerState(ServerState.LOOKING);
+                LOG.info("Going to call leader election: " + i);
+                v = peer.getElectionAlg().lookForLeader();
+
+                if (v == null){
+                    Assert.fail("Thread " + i + " got a null vote");
+                }
+
+                /*
+                 * A real zookeeper would take care of setting the current vote. Here
+                 * we do it manually.
+                 */
+                peer.setCurrentVote(v);
+
+                LOG.info("Finished election: " + i + ", " + v.getId());
+
+                Assert.assertTrue("State is not leading.", peer.getPeerState() == ServerState.LEADING);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            LOG.info("Joining");
+        }
+    }
+    
+    /*
+     * Creates a leader election notification message.
+     */
+    
+    static ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
+        return FastLeaderElection.buildMsg(state, leader, zxid, 1, epoch);
+    }
+
+}
\ No newline at end of file

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java Sat Nov 16 10:06:19 2013
@@ -38,6 +38,7 @@ import org.apache.zookeeper.PortAssignme
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.FLELostMessageTest;
 import org.apache.zookeeper.server.quorum.LeaderElection;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.Vote;