You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/04/29 18:12:45 UTC
svn commit: r1097865 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: fpj
Date: Fri Apr 29 16:12:44 2011
New Revision: 1097865
URL: http://svn.apache.org/viewvc?rev=1097865&view=rev
Log:
ZOOKEEPER-975. new peer goes in LEADING state even if ensemble is online. (vishal via fpj)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Apr 29 16:12:44 2011
@@ -201,6 +201,8 @@ BUGFIXES:
ZOOKEEPER-1028. In python bindings, zookeeper.set2() should return a stat dict but
instead returns None. (Chris Medaglia and Ivan Kelly via mahadev)
+ ZOOKEEPER-975. new peer goes in LEADING state even if ensemble is online. (vishal via fpj)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Apr 29 16:12:44 2011
@@ -188,7 +188,7 @@ public class FastLeaderElection implemen
while (!stop) {
// Sleeps on receive
try{
- response = manager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+ response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
/*
@@ -288,7 +288,9 @@ public class FastLeaderElection implemen
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id = " +
self.getId() + ", Recipient = " +
- response.sid);
+ response.sid + " zxid =" +
+ current.zxid + " leader=" +
+ current.id);
}
ToSend notmsg = new ToSend(
ToSend.mType.notification,
@@ -384,14 +386,14 @@ public class FastLeaderElection implemen
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
- "WorkerSender(" + Thread.currentThread().getName() + ")");
+ "WorkerSender[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
- "WorkerReceiver(" + Thread.currentThread().getName() + ")");
+ "WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
}
@@ -455,7 +457,13 @@ public class FastLeaderElection implemen
this.messenger = new Messenger(manager);
}
- private void leaveInstance() {
+ private void leaveInstance(Vote v) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("About to leave FLE instance: Leader= "
+ + v.id + ", Zxid = " +
+ v.zxid + ", My id = " + self.getId()
+ + ", My state = " + self.getPeerState());
+ }
recvqueue.clear();
}
@@ -487,7 +495,12 @@ public class FastLeaderElection implemen
logicalclock,
QuorumPeer.ServerState.LOOKING,
sid);
-
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), " +
+ proposedZxid + " (n.zxid), " + logicalclock +
+ " (n.round), " + sid + " (recipient), " + self.getId() +
+ " (myid)");
+ }
sendqueue.offer(notmsg);
}
}
@@ -578,6 +591,11 @@ public class FastLeaderElection implemen
}
synchronized void updateProposal(long leader, long zxid){
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Updating proposal: " + leader + " (newleader), " + zxid +
+ " (newzxid), " + proposedLeader + " (oldleader), " +
+ proposedZxid + " (oldzxid)");
+ }
proposedLeader = leader;
proposedZxid = zxid;
}
@@ -640,7 +658,9 @@ public class FastLeaderElection implemen
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
-
+ if (self.start_fle == 0) {
+ self.start_fle = System.currentTimeMillis();
+ }
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
@@ -689,7 +709,11 @@ public class FastLeaderElection implemen
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
- else{
+ else if(self.getVotingView().containsKey(n.sid)) {
+ /*
+ * Only proceed if the vote comes from a replica in the
+ * voting view.
+ */
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
@@ -697,11 +721,12 @@ public class FastLeaderElection implemen
logicalclock = n.epoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid,
- getInitId(), getInitLastLoggedZxid()))
+ getInitId(), getInitLastLoggedZxid())) {
updateProposal(n.leader, n.zxid);
- else
+ } else {
updateProposal(getInitId(),
getInitLastLoggedZxid());
+ }
sendNotifications();
} else if (n.epoch < logicalclock) {
if(LOG.isDebugEnabled()){
@@ -711,7 +736,6 @@ public class FastLeaderElection implemen
break;
} else if (totalOrderPredicate(n.leader, n.zxid,
proposedLeader, proposedZxid)) {
- LOG.info("Updating proposal");
updateProposal(n.leader, n.zxid);
sendNotifications();
}
@@ -723,85 +747,66 @@ public class FastLeaderElection implemen
", Proposed epoch = " + n.epoch);
}
- /*
- * Only proceed if the vote comes from a replica in the
- * voting view.
- */
- if(self.getVotingView().containsKey(n.sid)){
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
-
- //If have received from all nodes, then terminate
- if ((self.getVotingView().size() == recvset.size()) &&
- (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: learningState());
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
- } else if (termPredicate(recvset,
- new Vote(proposedLeader, proposedZxid,
- logicalclock))) {
-
- // Verify if there is any change in the proposed leader
- while((n = recvqueue.poll(finalizeWait,
- TimeUnit.MILLISECONDS)) != null){
- if(totalOrderPredicate(n.leader, n.zxid,
- proposedLeader, proposedZxid)){
- recvqueue.put(n);
- break;
- }
+ if (termPredicate(recvset,
+ new Vote(proposedLeader, proposedZxid,
+ logicalclock))) {
+
+ // Verify if there is any change in the proposed leader
+ while((n = recvqueue.poll(finalizeWait,
+ TimeUnit.MILLISECONDS)) != null){
+ if(totalOrderPredicate(n.leader, n.zxid,
+ proposedLeader, proposedZxid)){
+ recvqueue.put(n);
+ break;
}
+ }
- /*
- * This predicate is true once we don't read any new
- * relevant message from the reception queue
- */
- if (n == null) {
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: learningState());
- if(LOG.isDebugEnabled()){
- LOG.debug("About to leave FLE instance: Leader= "
- + proposedLeader + ", Zxid = " +
- proposedZxid + ", My id = " + self.getId()
- + ", My state = " + self.getPeerState());
- }
+ /*
+ * This predicate is true once we don't read any new
+ * relevant message from the reception queue
+ */
+ if (n == null) {
+ self.setPeerState((proposedLeader == self.getId()) ?
+ ServerState.LEADING: learningState());
- leaveInstance();
- return new Vote(proposedLeader,
- proposedZxid);
- }
+ Vote endVote = new Vote(proposedLeader,
+ proposedZxid);
+ leaveInstance(endVote);
+ return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
- default:
+ case FOLLOWING:
+ case LEADING:
/*
- * There is at most one leader for each epoch, so if a
- * peer claims to be the leader for an epoch, then that
- * peer must be the leader (no* arbitrary failures
- * assumed). Now, if there is no quorum supporting
- * this leader, then processes will naturally move
- * to a new epoch.
+ * Consider all notifications from the same epoch
+ * together.
*/
if(n.epoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
- if((n.state == ServerState.LEADING) ||
- (termPredicate(recvset, new Vote(n.leader,
+ if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.epoch, n.state))
- && checkLeader(outofelection, n.leader, n.epoch)) ){
+ && checkLeader(outofelection, n.leader, n.epoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
- leaveInstance();
- return new Vote(n.leader, n.zxid);
+ Vote endVote = new Vote(n.leader, n.zxid);
+ leaveInstance(endVote);
+ return endVote;
}
}
+ /**
+ * Before joining an established ensemble, verify that
+ * a majority are following the same leader.
+ */
outofelection.put(n.sid, new Vote(n.leader, n.zxid,
n.epoch, n.state));
-
if (termPredicate(outofelection, new Vote(n.leader,
n.zxid, n.epoch, n.state))
&& checkLeader(outofelection, n.leader, n.epoch)) {
@@ -810,15 +815,20 @@ public class FastLeaderElection implemen
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
- leaveInstance();
- return new Vote(n.leader, n.zxid);
+ Vote endVote = new Vote(n.leader, n.zxid);
+ leaveInstance(endVote);
+ return endVote;
}
-
+ break;
+ default:
+ LOG.warn("Notification state unrecoginized: " + n.state
+ + " (n.state), " + n.sid + " (n.sid)");
break;
}
+ } else {
+ LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
-
return null;
} finally {
try {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Fri Apr 29 16:12:44 2011
@@ -58,9 +58,14 @@ public class Follower extends Learner{
* @throws InterruptedException
*/
void followLeader() throws InterruptedException {
+ self.end_fle = System.currentTimeMillis();
+ LOG.info("FOLLOWING - LEADER ELECTION TOOK - " +
+ (self.end_fle - self.start_fle));
+ self.start_fle = 0;
+ self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
- try {
- InetSocketAddress addr = findLeader();
+ try {
+ InetSocketAddress addr = findLeader();
try {
connectToLeader(addr);
long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO);
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Apr 29 16:12:44 2011
@@ -270,6 +270,12 @@ public class Leader {
* @throws InterruptedException
*/
void lead() throws IOException, InterruptedException {
+ self.end_fle = System.currentTimeMillis();
+ LOG.info("LEADING - LEADER ELECTION TOOK - " +
+ (self.end_fle - self.start_fle));
+ self.start_fle = 0;
+ self.end_fle = 0;
+
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Fri Apr 29 16:12:44 2011
@@ -32,6 +32,7 @@ import java.util.Enumeration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Date;
@@ -64,8 +65,11 @@ public class QuorumCnxManager {
/*
* Maximum capacity of thread queues
*/
+ static final int RECV_CAPACITY = 100;
+ // Initialized to 1 to prevent sending
+ // stale notifications to peers
+ static final int SEND_CAPACITY = 1;
- static final int CAPACITY = 100;
static final int PACKETMAXSIZE = 1024 * 1024;
/*
* Maximum number of attempts to connect to a peer
@@ -101,6 +105,10 @@ public class QuorumCnxManager {
* Reception queue
*/
public final ArrayBlockingQueue<Message> recvQueue;
+ /*
+ * Object to synchronize access to recvQueue
+ */
+ private final Object recvQLock = new Object();
/*
* Shutdown flag
@@ -129,7 +137,7 @@ public class QuorumCnxManager {
}
public QuorumCnxManager(QuorumPeer self) {
- this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
+ this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
@@ -196,7 +204,7 @@ public class QuorumCnxManager {
senderWorkerMap.put(sid, sw);
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
- CAPACITY));
+ SEND_CAPACITY));
}
sw.start();
@@ -273,7 +281,7 @@ public class QuorumCnxManager {
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
- CAPACITY));
+ SEND_CAPACITY));
}
sw.start();
@@ -293,44 +301,31 @@ public class QuorumCnxManager {
* If sending message to myself, then simply enqueue it (loopback).
*/
if (self.getId() == sid) {
- try {
- b.position(0);
- recvQueue.put(new Message(b.duplicate(), sid));
- } catch (InterruptedException e) {
- LOG.warn("Exception when loopbacking", e);
- }
+ b.position(0);
+ addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
- try {
- /*
- * Start a new connection if doesn't have one already.
- */
- if (!queueSendMap.containsKey(sid)) {
- ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
- CAPACITY);
- queueSendMap.put(sid, bq);
- bq.put(b);
-
- } else {
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
- if(bq != null){
- if (bq.remainingCapacity() == 0) {
- bq.take();
- }
- bq.put(b);
- } else {
- LOG.error("No queue for server " + sid);
- }
- }
-
- connectOne(sid);
+ /*
+ * Start a new connection if doesn't have one already.
+ */
+ if (!queueSendMap.containsKey(sid)) {
+ ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
+ SEND_CAPACITY);
+ queueSendMap.put(sid, bq);
+ addToSendQueue(bq, b);
+
+ } else {
+ ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+ if(bq != null){
+ addToSendQueue(bq, b);
+ } else {
+ LOG.error("No queue for server " + sid);
+ }
+ }
+ connectOne(sid);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting to put message in queue.",
- e);
- }
}
}
@@ -634,9 +629,26 @@ public class QuorumCnxManager {
public void run() {
threadCnt.incrementAndGet();
try {
- ByteBuffer b = lastMessageSent.get(sid);
- if (b != null) {
- send(b);
+ /**
+ * If there is nothing in the queue to send, then we
+ * send the lastMessage to ensure that the last message
+ * was received by the peer. The message could be dropped
+ * in case self or the peer shutdown their connection
+ * (and exit the thread) prior to reading/processing
+ * the last message. Duplicate messages are handled correctly
+ * by the peer.
+ *
+ * If the send queue is non-empty, then we have a more recent
+ * message than that stored in lastMessage. To avoid sending
+ * a stale message, we should send the message in the send queue.
+ */
+ ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+ if (bq == null || isSendQueueEmpty(bq)) {
+ ByteBuffer b = lastMessageSent.get(sid);
+ if (b != null) {
+ LOG.debug("Attempting to send lastMessage to sid=" + sid);
+ send(b);
+ }
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
@@ -651,7 +663,7 @@ public class QuorumCnxManager {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
- b = bq.poll(1000, TimeUnit.MILLISECONDS);
+ b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
@@ -742,7 +754,7 @@ public class QuorumCnxManager {
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
- recvQueue.put(new Message(message.duplicate(), sid));
+ addToRecvQueue(new Message(message.duplicate(), sid));
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = " +
@@ -756,4 +768,116 @@ public class QuorumCnxManager {
}
}
}
+
+ /**
+ * Inserts an element in the specified queue. If the Queue is full, this
+ * method removes an element from the head of the Queue and then inserts
+ * the element at the tail. It can happen that an element is removed
+ * by another thread in {@link SendWorker#processMessage() processMessage}
+ * method before this method attempts to remove an element from the queue.
+ * This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
+ * exception, which is safe to ignore.
+ *
+ * Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
+ * not need to be synchronized since there is only one thread that inserts
+ * an element in the queue and another thread that reads from the queue.
+ *
+ * @param queue
+ * Reference to the Queue
+ * @param buffer
+ * Reference to the buffer to be inserted in the queue
+ */
+ private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
+ ByteBuffer buffer) {
+ if (queue.remainingCapacity() == 0) {
+ try {
+ queue.remove();
+ } catch (NoSuchElementException ne) {
+ // element could be removed by poll()
+ LOG.debug("Trying to remove from an empty " +
+ "Queue. Ignoring exception " + ne);
+ }
+ }
+ try {
+ queue.add(buffer);
+ } catch (IllegalStateException ie) {
+ // This should never happen
+ LOG.error("Unable to insert an element in the queue " + ie);
+ }
+ }
+
+ /**
+ * Returns true if queue is empty.
+ * @param queue
+ * Reference to the queue
+ * @return
+ * true if the specified queue is empty
+ */
+ private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
+ return queue.isEmpty();
+ }
+
+ /**
+ * Retrieves and removes buffer at the head of this queue,
+ * waiting up to the specified wait time if necessary for an element to
+ * become available.
+ *
+ * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
+ */
+ private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
+ long timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+
+ /**
+ * Inserts an element in the {@link #recvQueue}. If the Queue is full, this
+ * method removes an element from the head of the Queue and then inserts
+ * the element at the tail of the queue.
+ *
+ * This method is synchronized to achieve fairness between two threads that
+ * are trying to insert an element in the queue. Each thread checks if the
+ * queue is full, then removes the element at the head of the queue, and
+ * then inserts an element at the tail. This three-step process is done to
+ * prevent a thread from blocking while inserting an element in the queue.
+ * If we do not synchronize the call to this method, then a thread can grab
+ * a slot in the queue created by the second thread. This can cause the call
+ * to insert by the second thread to fail.
+ * Note that synchronizing this method does not block another thread
+ * from polling the queue since that synchronization is provided by the
+ * queue itself.
+ *
+ * @param msg
+ * Reference to the message to be inserted in the queue
+ */
+ public void addToRecvQueue(Message msg) {
+ synchronized(recvQLock) {
+ if (recvQueue.remainingCapacity() == 0) {
+ try {
+ recvQueue.remove();
+ } catch (NoSuchElementException ne) {
+ // element could be removed by poll()
+ LOG.debug("Trying to remove from an empty " +
+ "recvQueue. Ignoring exception " + ne);
+ }
+ }
+ try {
+ recvQueue.add(msg);
+ } catch (IllegalStateException ie) {
+ // This should never happen
+ LOG.error("Unable to insert element in the recvQueue " + ie);
+ }
+ }
+ }
+
+ /**
+ * Retrieves and removes a message at the head of this queue,
+ * waiting up to the specified wait time if necessary for an element to
+ * become available.
+ *
+ * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
+ */
+ public Message pollRecvQueue(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return recvQueue.poll(timeout, unit);
+ }
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Apr 29 16:12:44 2011
@@ -139,6 +139,11 @@ public class QuorumPeer extends Thread i
*/
static final long OBSERVER_ID = Long.MAX_VALUE;
+
+ /*
+ * Record leader election time
+ */
+ public long start_fle, end_fle;
/*
* Default value of peer is participant
@@ -573,7 +578,8 @@ public class QuorumPeer extends Thread i
@Override
public void run() {
- setName("QuorumPeer:" + cnxnFactory.getLocalAddress());
+ setName("QuorumPeer" + "[myid=" + getId() + "]" +
+ cnxnFactory.getLocalAddress());
LOG.debug("Starting quorum peer");
try {
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Fri Apr 29 16:12:44 2011
@@ -112,7 +112,7 @@ public class CnxManagerTest extends ZKTe
Message m = null;
int numRetries = 1;
while((m == null) && (numRetries++ <= THRESHOLD)){
- m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+ m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(m == null) cnxManager.connectAll();
}
@@ -123,7 +123,7 @@ public class CnxManagerTest extends ZKTe
cnxManager.testInitiateConnection(sid);
- m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+ m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(m == null){
failed = true;
return;
@@ -155,7 +155,7 @@ public class CnxManagerTest extends ZKTe
Message m = null;
int numRetries = 1;
while((m == null) && (numRetries++ <= THRESHOLD)){
- m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+ m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(m == null) cnxManager.connectAll();
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=1097865&r1=1097864&r2=1097865&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Fri Apr 29 16:12:44 2011
@@ -313,4 +313,94 @@ public class FLETest extends ZKTestCase
Assert.fail("Leader hasn't joined: " + leader);
}
}
+
+ /*
+ * Class to verify if the thread has become a follower
+ */
+ class VerifyState extends Thread {
+ volatile private boolean success = false;
+ QuorumPeer peer;
+ public VerifyState(QuorumPeer peer) {
+ this.peer = peer;
+ }
+ public void run() {
+ setName("VerifyState-" + peer.getId());
+ while (true) {
+ if(peer.getPeerState() == ServerState.FOLLOWING) {
+ LOG.info("I am following");
+ success = true;
+ break;
+ } else if (peer.getPeerState() == ServerState.LEADING) {
+ LOG.info("I am leading");
+ success = false;
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (Exception e) {
+ LOG.warn("Sleep failed ", e);
+ }
+ }
+ }
+ public boolean isSuccess() {
+ return success;
+ }
+ }
+
+ /*
+ * For ZOOKEEPER-975 verify that a peer joining an established cluster
+ * does not go into LEADING state.
+ */
+ @Test
+ public void testJoin() throws Exception {
+ int sid;
+ QuorumPeer peer;
+ int waitTime = 10 * 1000;
+ ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+ for(sid = 0; sid < 3; sid++) {
+ peers.put(Long.valueOf(sid),
+ new QuorumServer(sid,
+ new InetSocketAddress(PortAssignment.unique()),
+ new InetSocketAddress(PortAssignment.unique())));
+ tmpdir[sid] = ClientBase.createTmpDir();
+ port[sid] = PortAssignment.unique();
+ }
+ // start 2 peers and verify if they form the cluster
+ for (sid = 0; sid < 2; sid++) {
+ peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+ port[sid], 3, sid, 2000, 2, 2);
+ LOG.info("Starting peer " + peer.getId());
+ peer.start();
+ peerList.add(sid, peer);
+ }
+ peer = peerList.get(0);
+ VerifyState v1 = new VerifyState(peerList.get(0));
+ v1.start();
+ v1.join(waitTime);
+ Assert.assertFalse("Unable to form cluster in " +
+ waitTime + " ms",
+ !v1.isSuccess());
+ // Start 3rd peer and check if it goes in LEADING state
+ peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+ port[sid], 3, sid, 2000, 2, 2);
+ LOG.info("Starting peer " + peer.getId());
+ peer.start();
+ peerList.add(sid, peer);
+ v1 = new VerifyState(peer);
+ v1.start();
+ v1.join(waitTime);
+ if (v1.isAlive()) {
+ Assert.fail("Peer " + peer.getId() + " failed to join the cluster " +
+ "within " + waitTime + " ms");
+ } else if (!v1.isSuccess()) {
+ Assert.fail("Incorrect LEADING state for peer " + peer.getId());
+ }
+ // cleanup
+ for (int id = 0; id < 3; id++) {
+ peer = peerList.get(id);
+ if (peer != null) {
+ peer.shutdown();
+ }
+ }
+ }
}