You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2013/11/16 11:06:20 UTC
svn commit: r1542489 - in /zookeeper/branches/branch-3.4: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: fpj
Date: Sat Nov 16 10:06:19 2013
New Revision: 1542489
URL: http://svn.apache.org/r1542489
Log:
ZOOKEEPER-1808. Add version to FLE notifications for 3.4 branch (fpj)
Added:
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
Removed:
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FLEBackwardElectionRoundTest.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/FLETestUtils.java
Modified:
zookeeper/branches/branch-3.4/CHANGES.txt
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Sat Nov 16 10:06:19 2013
@@ -155,6 +155,8 @@ BUGFIXES:
ZOOKEEPER-1798. Fix race condition in testNormalObserverRun
(thawan, fpj via thawan)
+ ZOOKEEPER-1808. Add version to FLE notifications for 3.4 branch (fpj)
+
ZOOKEEPER-1812. ZooInspector reconnection always fails if first
connection fails (Benjamin Jaton via phunt)
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Sat Nov 16 10:06:19 2013
@@ -93,6 +93,13 @@ public class FastLeaderElection implemen
static public class Notification {
/*
+ * Format version, introduced in 3.4.6
+ */
+
+ public final static int CURRENTVERSION = 0x1;
+ int version;
+
+ /*
* Proposed leader
*/
long leader;
@@ -121,6 +128,39 @@ public class FastLeaderElection implemen
* epoch of the proposed leader
*/
long peerEpoch;
+
+ @Override
+ public String toString() {
+ return new String(Long.toHexString(version) + " (message format version), "
+ + leader + " (n.leader), 0x"
+ + Long.toHexString(zxid) + " (n.zxid), 0x"
+ + Long.toHexString(electionEpoch) + " (n.round), " + state
+ + " (n.state), " + sid + " (n.sid), 0x"
+ + Long.toHexString(peerEpoch) + " (n.peerEpoch) ");
+ }
+ }
+
+ static ByteBuffer buildMsg(int state,
+ long leader,
+ long zxid,
+ long electionEpoch,
+ long epoch) {
+ byte requestBytes[] = new byte[40];
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+ /*
+ * Building notification packet to send
+ */
+
+ requestBuffer.clear();
+ requestBuffer.putInt(state);
+ requestBuffer.putLong(leader);
+ requestBuffer.putLong(zxid);
+ requestBuffer.putLong(electionEpoch);
+ requestBuffer.putLong(epoch);
+ requestBuffer.putInt(Notification.CURRENTVERSION);
+
+ return requestBuffer;
}
/**
@@ -188,7 +228,7 @@ public class FastLeaderElection implemen
* spawns a new thread.
*/
- private class Messenger {
+ protected class Messenger {
/**
* Receives messages from instance of QuorumCnxManager on
@@ -250,6 +290,9 @@ public class FastLeaderElection implemen
boolean backCompatibility = (response.buffer.capacity() == 28);
response.buffer.clear();
+ // Instantiate Notification and set its attributes
+ Notification n = new Notification();
+
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
@@ -265,10 +308,10 @@ public class FastLeaderElection implemen
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
+ default:
+ continue;
}
-
- // Instantiate Notification and set its attributes
- Notification n = new Notification();
+
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
@@ -284,6 +327,12 @@ public class FastLeaderElection implemen
}
/*
+ * Version added in 3.4.6
+ */
+
+ n.version = (response.buffer.remaining() >= 4) ? response.buffer.getInt() : 0x0;
+
+ /*
* Print notification info
*/
if(LOG.isInfoEnabled()){
@@ -383,23 +432,13 @@ public class FastLeaderElection implemen
*
* @param m message to send
*/
- private void process(ToSend m) {
- byte requestBytes[] = new byte[36];
- ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
- /*
- * Building notification packet to send
- */
-
- requestBuffer.clear();
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(m.leader);
- requestBuffer.putLong(m.zxid);
- requestBuffer.putLong(m.electionEpoch);
- requestBuffer.putLong(m.peerEpoch);
-
+ void process(ToSend m) {
+ ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
+ m.leader,
+ m.zxid,
+ m.electionEpoch,
+ m.peerEpoch);
manager.toSend(m.sid, requestBuffer);
-
}
}
@@ -547,11 +586,7 @@ public class FastLeaderElection implemen
private void printNotification(Notification n){
- LOG.info("Notification: " + n.leader + " (n.leader), 0x"
- + Long.toHexString(n.zxid) + " (n.zxid), 0x"
- + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
- + " (n.state), " + n.sid + " (n.sid), 0x"
- + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
+ LOG.info("Notification: " + n.toString()
+ self.getPeerState() + " (my state)");
}
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Sat Nov 16 10:06:19 2013
@@ -72,7 +72,7 @@ public class QuorumCnxManager {
// stale notifications to peers
static final int SEND_CAPACITY = 1;
- static final int PACKETMAXSIZE = 1024 * 1024;
+ static final int PACKETMAXSIZE = 1024 * 512;
/*
* Maximum number of attempts to connect to a peer
*/
@@ -129,6 +129,7 @@ public class QuorumCnxManager {
private AtomicInteger threadCnt = new AtomicInteger(0);
static public class Message {
+
Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
this.sid = sid;
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Vote.java Sat Nov 16 10:06:19 2013
@@ -23,7 +23,9 @@ import org.apache.zookeeper.server.quoru
public class Vote {
- public Vote(long id, long zxid) {
+ public Vote(long id,
+ long zxid) {
+ this.version = 0x0;
this.id = id;
this.zxid = zxid;
this.electionEpoch = -1;
@@ -31,7 +33,10 @@ public class Vote {
this.state = ServerState.LOOKING;
}
- public Vote(long id, long zxid, long peerEpoch) {
+ public Vote(long id,
+ long zxid,
+ long peerEpoch) {
+ this.version = 0x0;
this.id = id;
this.zxid = zxid;
this.electionEpoch = -1;
@@ -39,7 +44,11 @@ public class Vote {
this.state = ServerState.LOOKING;
}
- public Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
+ public Vote(long id,
+ long zxid,
+ long electionEpoch,
+ long peerEpoch) {
+ this.version = 0x0;
this.id = id;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
@@ -47,7 +56,13 @@ public class Vote {
this.state = ServerState.LOOKING;
}
- public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
+ public Vote(int version,
+ long id,
+ long zxid,
+ long electionEpoch,
+ long peerEpoch,
+ ServerState state) {
+ this.version = version;
this.id = id;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
@@ -55,6 +70,21 @@ public class Vote {
this.peerEpoch = peerEpoch;
}
+ public Vote(long id,
+ long zxid,
+ long electionEpoch,
+ long peerEpoch,
+ ServerState state) {
+ this.id = id;
+ this.zxid = zxid;
+ this.electionEpoch = electionEpoch;
+ this.state = state;
+ this.peerEpoch = peerEpoch;
+ this.version = 0x0;
+ }
+
+ final private int version;
+
final private long id;
final private long zxid;
@@ -63,6 +93,10 @@ public class Vote {
final private long peerEpoch;
+ public int getVersion() {
+ return version;
+ }
+
public long getId() {
return id;
}
@@ -91,10 +125,13 @@ public class Vote {
return false;
}
Vote other = (Vote) o;
- return (id == other.id && zxid == other.zxid && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch);
+ return (id == other.id
+ && zxid == other.zxid
+ && electionEpoch == other.electionEpoch
+ && peerEpoch == other.peerEpoch);
}
-
+
@Override
public int hashCode() {
return (int) (id & zxid);
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java Sat Nov 16 10:06:19 2013
@@ -74,20 +74,7 @@ public class CnxManagerTest extends ZKTe
}
ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
- byte requestBytes[] = new byte[28];
- ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
- /*
- * Building notification packet to send
- */
-
- requestBuffer.clear();
- requestBuffer.putInt(state);
- requestBuffer.putLong(leader);
- requestBuffer.putLong(zxid);
- requestBuffer.putLong(epoch);
-
- return requestBuffer;
+ return FastLeaderElection.buildMsg(state, leader, zxid, 0, epoch);
}
class CnxManagerThread extends Thread {
Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java?rev=1542489&view=auto
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java (added)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java Sat Nov 16 10:06:19 2013
@@ -0,0 +1,155 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class FLEBackwardElectionRoundTest extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
+
+ int count;
+ HashMap<Long,QuorumServer> peers;
+ File tmpdir[];
+ int port[];
+
+ QuorumCnxManager cnxManagers[];
+
+ @Before
+ public void setUp() throws Exception {
+ count = 3;
+
+ peers = new HashMap<Long,QuorumServer>(count);
+ tmpdir = new File[count];
+ port = new int[count];
+ cnxManagers = new QuorumCnxManager[count - 1];
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ for(int i = 0; i < (count - 1); i++){
+ if(cnxManagers[i] != null){
+ cnxManagers[i].halt();
+ }
+ }
+ }
+
+ /**
+ * This test is checking the following case. A server S is
+ * currently LOOKING and it receives notifications from
+ * a quorum indicating they are following S. The election
+ * round E of S is higher than the election round E' in the
+ * notification messages, so S becomes the leader and sets
+ * its epoch back to E'. In the meanwhile, one or more
+ * followers turn to LOOKING and elect S in election round E.
+ * Having leader and followers with different election rounds
+ * might prevent other servers from electing a leader because
+ * they can't get a consistent set of notifications from a
+ * quorum.
+ *
+ * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1514}
+ *
+ *
+ * @throws Exception
+ */
+
+ @Test
+ public void testBackwardElectionRound() throws Exception {
+ LOG.info("TestLE: " + getTestName()+ ", " + count);
+ for(int i = 0; i < count; i++) {
+ int clientport = PortAssignment.unique();
+ peers.put(Long.valueOf(i),
+ new QuorumServer(i,
+ new InetSocketAddress(clientport),
+ new InetSocketAddress(PortAssignment.unique())));
+ tmpdir[i] = ClientBase.createTmpDir();
+ port[i] = clientport;
+ }
+
+ /*
+ * Start server 0
+ */
+
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+ peer.startLeaderElection();
+ FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
+ thread.start();
+
+
+ /*
+ * Start mock server 1
+ */
+ QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+ cnxManagers[0] = new QuorumCnxManager(mockPeer);
+ QuorumCnxManager.Listener listener = cnxManagers[0].listener;
+ listener.start();
+
+ ByteBuffer msg = FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1);
+ cnxManagers[0].toSend(0l, msg);
+
+ /*
+ * Start mock server 2
+ */
+ mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
+ cnxManagers[1] = new QuorumCnxManager(mockPeer);
+ listener = cnxManagers[1].listener;
+ listener.start();
+
+ cnxManagers[1].toSend(0l, msg);
+
+ /*
+ * Run another instance of leader election.
+ */
+ thread.join(5000);
+ thread = new FLETestUtils.LEThread(peer, 0);
+ thread.start();
+
+ /*
+ * Send the same messages, this time should not make 0 the leader.
+ */
+ cnxManagers[0].toSend(0l, msg);
+ cnxManagers[1].toSend(0l, msg);
+
+
+ thread.join(5000);
+
+ if (!thread.isAlive()) {
+ Assert.fail("Should not have joined");
+ }
+
+ }
+}
\ No newline at end of file
Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java?rev=1542489&view=auto
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java (added)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLECompatibilityTest.java Sat Nov 16 10:06:19 2013
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection.Notification;
+import org.apache.zookeeper.server.quorum.FastLeaderElection.ToSend;
+import org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.FLETest;
+import org.apache.zookeeper.test.QuorumBase;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FLECompatibilityTest extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory.getLogger(FLECompatibilityTest.class);
+
+ int count;
+ HashMap<Long,QuorumServer> peers;
+ File tmpdir[];
+ int port[];
+
+ @Before
+ public void setUp() throws Exception {
+ count = 3;
+ peers = new HashMap<Long,QuorumServer>(count);
+ tmpdir = new File[count];
+ port = new int[count];
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ class MockFLEMessengerBackward {
+ QuorumCnxManager manager;
+ QuorumPeer self;
+ long logicalclock = 1L;
+ LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<ToSend>();
+ LinkedBlockingQueue<ToSend> internalqueue = new LinkedBlockingQueue<ToSend>();
+ LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<Notification>();
+ WorkerReceiver wr;
+
+ MockFLEMessengerBackward(QuorumPeer self, QuorumCnxManager manager){
+ this.manager = manager;
+ this.self = self;
+
+ this.wr = new WorkerReceiver(manager);
+
+ Thread t = new Thread(this.wr,
+ "WorkerReceiver[myid=" + self.getId() + "]");
+ t.setDaemon(true);
+ t.start();
+ }
+
+ void halt() {
+ wr.stop = true;
+ }
+
+ /*
+ * This class has been copied from before adding versions to notifications.
+ *
+ * {@see https://issues.apache.org/jira/browse/ZOOKEEPER-1808}
+ */
+ class WorkerReceiver implements Runnable {
+ volatile boolean stop;
+ QuorumCnxManager manager;
+ final long proposedLeader = 2;
+ final long proposedZxid = 0x1;
+ final long proposedEpoch = 1;
+
+ WorkerReceiver(QuorumCnxManager manager) {
+ this.stop = false;
+ this.manager = manager;
+ }
+
+ /*
+ * The vote we return here is fixed for test purposes.
+ */
+ Vote getVote(){
+ return new Vote(proposedLeader, proposedZxid, proposedEpoch);
+ }
+
+ public void run() {
+
+ Message response;
+ while (!stop) {
+ // Sleeps on receive
+ try{
+ response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
+ if(response == null) continue;
+
+ /*
+ * If it is from an observer, respond right away.
+ * Note that the following predicate assumes that
+ * if a server is not a follower, then it must be
+ * an observer. If we ever have any other type of
+ * learner in the future, we'll have to change the
+ * way we check for observers.
+ */
+ if(!self.getVotingView().containsKey(response.sid)){
+ Vote current = self.getCurrentVote();
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
+ current.getId(),
+ current.getZxid(),
+ logicalclock,
+ self.getPeerState(),
+ response.sid,
+ current.getPeerEpoch());
+
+ internalqueue.offer(notmsg);
+ } else {
+ // Receive new message
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Receive new notification message. My id = "
+ + self.getId());
+ }
+
+ /*
+ * We check for 28 bytes for backward compatibility
+ */
+ if (response.buffer.capacity() < 28) {
+ LOG.error("Got a short response: "
+ + response.buffer.capacity());
+ continue;
+ }
+ boolean backCompatibility = (response.buffer.capacity() == 28);
+ response.buffer.clear();
+
+ // State of peer that sent this message
+ QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
+ switch (response.buffer.getInt()) {
+ case 0:
+ ackstate = QuorumPeer.ServerState.LOOKING;
+ break;
+ case 1:
+ ackstate = QuorumPeer.ServerState.FOLLOWING;
+ break;
+ case 2:
+ ackstate = QuorumPeer.ServerState.LEADING;
+ break;
+ case 3:
+ ackstate = QuorumPeer.ServerState.OBSERVING;
+ break;
+ }
+
+ // Instantiate Notification and set its attributes
+ Notification n = new Notification();
+ n.leader = response.buffer.getLong();
+ n.zxid = response.buffer.getLong();
+ n.electionEpoch = response.buffer.getLong();
+ n.state = ackstate;
+ n.sid = response.sid;
+ if(!backCompatibility){
+ n.peerEpoch = response.buffer.getLong();
+ } else {
+ if(LOG.isInfoEnabled()){
+ LOG.info("Backward compatibility mode, server id=" + n.sid);
+ }
+ n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
+ }
+
+ /*
+ * If this server is looking, then send proposed leader
+ */
+
+ if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
+ recvqueue.offer(n);
+
+ /*
+ * Send a notification back if the peer that sent this
+ * message is also looking and its logical clock is
+ * lagging behind.
+ */
+ if((ackstate == QuorumPeer.ServerState.LOOKING)
+ && (n.electionEpoch < logicalclock)){
+ Vote v = getVote();
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
+ v.getId(),
+ v.getZxid(),
+ logicalclock,
+ self.getPeerState(),
+ response.sid,
+ v.getPeerEpoch());
+ internalqueue.offer(notmsg);
+ }
+ } else {
+ /*
+ * If this server is not looking, but the one that sent the ack
+ * is looking, then send back what it believes to be the leader.
+ */
+ Vote current = self.getCurrentVote();
+ if(ackstate == QuorumPeer.ServerState.LOOKING){
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Sending new notification. My id = " +
+ self.getId() + " recipient=" +
+ response.sid + " zxid=0x" +
+ Long.toHexString(current.getZxid()) +
+ " leader=" + current.getId());
+ }
+ ToSend notmsg = new ToSend(
+ ToSend.mType.notification,
+ current.getId(),
+ current.getZxid(),
+ current.getElectionEpoch(),
+ self.getPeerState(),
+ response.sid,
+ current.getPeerEpoch());
+ internalqueue.offer(notmsg);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted Exception while waiting for new message" +
+ e.toString());
+ }
+ }
+ LOG.info("WorkerReceiver is down");
+ }
+ }
+ }
+
+ class MockFLEMessengerForward extends FastLeaderElection {
+
+ MockFLEMessengerForward(QuorumPeer self, QuorumCnxManager manager){
+ super( self, manager );
+ }
+
+ void halt() {
+ super.shutdown();
+ }
+ }
+
+ void populate()
+ throws Exception {
+ for (int i = 0; i < count; i++) {
+ peers.put(Long.valueOf(i),
+ new QuorumServer(i,
+ new InetSocketAddress(PortAssignment.unique()),
+ new InetSocketAddress(PortAssignment.unique())));
+ tmpdir[i] = ClientBase.createTmpDir();
+ port[i] = PortAssignment.unique();
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testBackwardCompatibility()
+ throws Exception {
+ populate();
+
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+ peer.setPeerState(ServerState.LOOKING);
+ QuorumCnxManager mng = new QuorumCnxManager(peer);
+
+ /*
+ * Check that it generates an internal notification correctly
+ */
+ MockFLEMessengerBackward fle = new MockFLEMessengerBackward(peer, mng);
+ ByteBuffer buffer = FastLeaderElection.buildMsg(ServerState.LOOKING.ordinal(), 2, 0x1, 1, 1);
+ fle.manager.recvQueue.add(new Message(buffer, 2));
+ Notification n = fle.recvqueue.take();
+ Assert.assertTrue("Wrong state", n.state == ServerState.LOOKING);
+ Assert.assertTrue("Wrong leader", n.leader == 2);
+ Assert.assertTrue("Wrong zxid", n.zxid == 0x1);
+ Assert.assertTrue("Wrong epoch", n.electionEpoch == 1);
+ Assert.assertTrue("Wrong epoch", n.peerEpoch == 1);
+
+ /*
+ * Check that it sends a notification back to the sender
+ */
+ peer.setPeerState(ServerState.FOLLOWING);
+ peer.setCurrentVote( new Vote(2, 0x1, 1, 1, ServerState.LOOKING) );
+ buffer = FastLeaderElection.buildMsg(ServerState.LOOKING.ordinal(), 1, 0x1, 1, 1);
+ fle.manager.recvQueue.add(new Message(buffer, 1));
+ ToSend m = fle.internalqueue.take();
+ Assert.assertTrue("Wrong state", m.state == ServerState.FOLLOWING);
+ Assert.assertTrue("Wrong sid", m.sid == 1);
+ Assert.assertTrue("Wrong leader", m.leader == 2);
+ Assert.assertTrue("Wrong epoch", m.electionEpoch == 1);
+ Assert.assertTrue("Wrong epoch", m.peerEpoch == 1);
+ }
+
+ @Test(timeout=20000)
+ public void testForwardCompatibility()
+ throws Exception {
+ populate();
+
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+ peer.setPeerState(ServerState.LOOKING);
+ QuorumCnxManager mng = new QuorumCnxManager(peer);
+
+ /*
+ * Check that it generates an internal notification correctly
+ */
+ MockFLEMessengerForward fle = new MockFLEMessengerForward(peer, mng);
+ ByteBuffer notBuffer = FastLeaderElection.buildMsg(ServerState.LOOKING.ordinal(), 2, 0x1, 1, 1);
+ ByteBuffer buffer = ByteBuffer.allocate( notBuffer.capacity() + 8 );
+ notBuffer.flip();
+ buffer.put(notBuffer);
+ buffer.putLong( Long.MAX_VALUE );
+ buffer.flip();
+
+ fle.manager.recvQueue.add(new Message(buffer, 2));
+ Notification n = fle.recvqueue.take();
+ Assert.assertTrue("Wrong state", n.state == ServerState.LOOKING);
+ Assert.assertTrue("Wrong leader", n.leader == 2);
+ Assert.assertTrue("Wrong zxid", n.zxid == 0x1);
+ Assert.assertTrue("Wrong epoch", n.electionEpoch == 1);
+ Assert.assertTrue("Wrong epoch", n.peerEpoch == 1);
+ Assert.assertTrue("Wrong version", n.version == FastLeaderElection.Notification.CURRENTVERSION);
+ }
+}
Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java?rev=1542489&view=auto
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java (added)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java Sat Nov 16 10:06:19 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLELostMessageTest extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
+
+
+ int count;
+ HashMap<Long,QuorumServer> peers;
+ File tmpdir[];
+ int port[];
+
+ QuorumCnxManager cnxManager;
+
+ @Before
+ public void setUp() throws Exception {
+ count = 3;
+
+ peers = new HashMap<Long,QuorumServer>(count);
+ tmpdir = new File[count];
+ port = new int[count];
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ cnxManager.halt();
+ }
+
+ @Test
+ public void testLostMessage() throws Exception {
+ FastLeaderElection le[] = new FastLeaderElection[count];
+
+ LOG.info("TestLE: " + getTestName()+ ", " + count);
+ for(int i = 0; i < count; i++) {
+ int clientport = PortAssignment.unique();
+ peers.put(Long.valueOf(i),
+ new QuorumServer(i,
+ new InetSocketAddress(clientport),
+ new InetSocketAddress(PortAssignment.unique())));
+ tmpdir[i] = ClientBase.createTmpDir();
+ port[i] = clientport;
+ }
+
+ /*
+ * Start server 0
+ */
+
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+ peer.startLeaderElection();
+ FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
+ thread.start();
+
+ /*
+ * Start mock server 1
+ */
+ mockServer();
+ thread.join(5000);
+ if (thread.isAlive()) {
+ Assert.fail("Threads didn't join");
+ }
+ }
+
+ void mockServer() throws InterruptedException, IOException {
+ /*
+ * Create an instance of the connection manager
+ */
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+ cnxManager = new QuorumCnxManager(peer);
+ QuorumCnxManager.Listener listener = cnxManager.listener;
+ listener.start();
+
+
+ cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0));
+ cnxManager.recvQueue.take();
+ cnxManager.toSend(1L, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 0));
+ }
+}
Added: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java?rev=1542489&view=auto
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java (added)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java Sat Nov 16 10:06:19 2013
@@ -0,0 +1,86 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.nio.ByteBuffer;
+
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Assert;
+
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+
+public class FLETestUtils {
+ protected static final Logger LOG = LoggerFactory.getLogger(FLETestUtils.class);
+
+
+ /*
+ * Thread to run an instance of leader election for
+ * a given quorum peer.
+ */
+ static class LEThread extends Thread {
+ private int i;
+ private QuorumPeer peer;
+
+ LEThread(QuorumPeer peer, int i) {
+ this.i = i;
+ this.peer = peer;
+ LOG.info("Constructor: " + getName());
+
+ }
+
+ public void run(){
+ try{
+ Vote v = null;
+ peer.setPeerState(ServerState.LOOKING);
+ LOG.info("Going to call leader election: " + i);
+ v = peer.getElectionAlg().lookForLeader();
+
+ if (v == null){
+ Assert.fail("Thread " + i + " got a null vote");
+ }
+
+ /*
+ * A real zookeeper would take care of setting the current vote. Here
+ * we do it manually.
+ */
+ peer.setCurrentVote(v);
+
+ LOG.info("Finished election: " + i + ", " + v.getId());
+
+ Assert.assertTrue("State is not leading.", peer.getPeerState() == ServerState.LEADING);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ LOG.info("Joining");
+ }
+ }
+
+ /*
+ * Creates a leader election notification message.
+ */
+
+ static ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
+ return FastLeaderElection.buildMsg(state, leader, zxid, 1, epoch);
+ }
+
+}
\ No newline at end of file
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java?rev=1542489&r1=1542488&r2=1542489&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java Sat Nov 16 10:06:19 2013
@@ -38,6 +38,7 @@ import org.apache.zookeeper.PortAssignme
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.FLELostMessageTest;
import org.apache.zookeeper.server.quorum.LeaderElection;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.Vote;