You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2008/08/25 23:13:01 UTC
svn commit: r688885 - in /hadoop/zookeeper/trunk/src/java: ./
main/org/apache/zookeeper/server/quorum/
Author: mahadev
Date: Mon Aug 25 14:13:01 2008
New Revision: 688885
URL: http://svn.apache.org/viewvc?rev=688885&view=rev
Log:
ZOOKEEPER-2. Fix synchronization issues in QuorumPeer and FastLeader election. (Flavio Paiva Junqueira via mahadev)
Modified:
hadoop/zookeeper/trunk/src/java/Changes.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
Modified: hadoop/zookeeper/trunk/src/java/Changes.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/Changes.txt?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/Changes.txt (original)
+++ hadoop/zookeeper/trunk/src/java/Changes.txt Mon Aug 25 14:13:01 2008
@@ -18,3 +18,6 @@
ZOOKEEPER-123. Fix the wrong class is specified for the logger. (Jakob Homan
via mahadev)
+
+ ZOOKEEPER-2. Fix synchronization issues in QuorumPeer and FastLeader
+ election. (Flavio Paiva Junqueira via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Mon Aug 25 14:13:01 2008
@@ -264,11 +264,13 @@
break;
}
+ Vote current = self.getCurrentVote();
+
switch (type) {
case 0:
// Receive challenge request
ToSend c = new ToSend(ToSend.mType.challenge, tag,
- self.currentVote.id, self.currentVote.zxid,
+ current.id, current.zxid,
logicalclock, self.getPeerState(),
(InetSocketAddress) responsePacket
.getSocketAddress());
@@ -309,8 +311,8 @@
recvqueue.offer(n);
ToSend a = new ToSend(ToSend.mType.ack,
- tag, self.currentVote.id,
- self.currentVote.zxid,
+ tag, current.id,
+ current.zxid,
logicalclock, self.getPeerState(),
(InetSocketAddress) addr);
@@ -328,7 +330,7 @@
recvqueue.offer(n);
ToSend a = new ToSend(ToSend.mType.ack, tag,
- self.currentVote.id, self.currentVote.zxid,
+ current.id, current.zxid,
logicalclock, self.getPeerState(),
(InetSocketAddress) responsePacket
.getSocketAddress());
@@ -685,7 +687,7 @@
QuorumPeer self;
int port;
- long logicalclock; /* Election instance */
+ volatile long logicalclock; /* Election instance */
DatagramSocket mySocket;
long proposedLeader;
long proposedZxid;
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Mon Aug 25 14:13:01 2008
@@ -49,26 +49,36 @@
/* Sequence numbers for messages */
static int sequencer = 0;
- /*
+ /**
* Determine how much time a process has to wait
* once it believes that it has reached the end of
* leader election.
*/
static int finalizeWait = 100;
- /*
+ /**
* Challenge counter to avoid replay attacks
*/
static int challengeCounter = 0;
- /*
- * Connection manager
+ /**
+ * Connection manager. Fast leader election uses TCP for
+ * communication between peers, and QuorumCnxManager manages
+ * such connections.
*/
QuorumCnxManager manager;
+
+ /**
+ * Notifications are messages that let other peers know that
+ * a given peer has changed its vote, either because it has
+ * joined leader election or because it learned of another
+ * peer with higher zxid or same zxid and higher server id
+ */
+
static public class Notification {
/*
* Proposed leader
@@ -96,8 +106,10 @@
InetAddress addr;
}
- /*
- * Messages to send, both Notifications and Acks
+ /**
+ * Messages that a peer wants to send to other peers.
+ * These messages can be both Notifications and Acks
+ * of reception of notification.
*/
static public class ToSend {
static enum mType {crequest, challenge, notification, ack};
@@ -145,12 +157,24 @@
LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
+ /**
+ * Multi-threaded implementation of message handler. Messenger
+ * contains two inner classes: WorkerReceiver and WorkerSender. The
+ * functionality of each is obvious from the name. Each of these
+ * spawns a new thread.
+ */
+
private class Messenger {
long lastProposedLeader;
long lastProposedZxid;
long lastEpoch;
+ /**
+ * Receives messages from instance of QuorumCnxManager on
+ * method run(), and processes such messages.
+ */
+
class WorkerReceiver implements Runnable {
QuorumCnxManager manager;
@@ -175,7 +199,7 @@
}
response.buffer.clear();
-
+ // State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
case 0:
@@ -189,6 +213,7 @@
break;
}
+ // Instantiate Notification and set its attributes
Notification n = new Notification();
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
@@ -196,6 +221,12 @@
n.state = ackstate;
n.addr = response.addr;
+ /*
+ * Accept the values of this notification
+ * if we are at right epoch and the new notification
+ * contains a vote that succeeds our current vote
+ * in our order of votes.
+ */
if ((messenger.lastEpoch <= n.epoch)
&& ((n.zxid > messenger.lastProposedZxid)
|| ((n.zxid == messenger.lastProposedZxid)
@@ -205,10 +236,18 @@
messenger.lastEpoch = n.epoch;
}
- //InetAddress addr = (InetAddress) responsePacket.getSocketAddress();
+ /*
+ * If this server is looking, then send proposed leader
+ */
+
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
- if(recvqueue.size() == 0) LOG.warn("Message: " + n.addr);
+ if(recvqueue.size() == 0) LOG.debug("Message: " + n.addr);
+ /*
+ * Send a notification back if the peer that sent this
+ * message is also looking and its logical clock is
+ * lagging behind.
+ */
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.epoch < logicalclock)){
ToSend notmsg = new ToSend(ToSend.mType.notification,
@@ -219,16 +258,21 @@
response.addr);
sendqueue.offer(notmsg);
}
- } else {
- if((ackstate == QuorumPeer.ServerState.LOOKING) &&
- (self.getPeerState() != QuorumPeer.ServerState.LOOKING)){
- ToSend notmsg = new ToSend(ToSend.mType.notification,
- self.currentVote.id,
- self.currentVote.zxid,
- logicalclock,
- self.getPeerState(),
- response.addr);
- sendqueue.offer(notmsg);
+ } else {
+ /*
+ * If this server is not looking, but the one that sent the ack
+ * is looking, then send back what this server believes to be the leader.
+ */
+ Vote current = self.getCurrentVote();
+ if(ackstate == QuorumPeer.ServerState.LOOKING){
+
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
+ current.id,
+ current.zxid,
+ logicalclock,
+ self.getPeerState(),
+ response.addr);
+ sendqueue.offer(notmsg);
}
}
@@ -240,6 +284,12 @@
}
}
+
+ /**
+ * This worker simply dequeues a message to send
+ * and queues it on the manager's queue.
+ */
+
class WorkerSender implements Runnable {
QuorumCnxManager manager;
@@ -260,6 +310,11 @@
}
}
+ /**
+ * Called by run() once there is a new message to send.
+ *
+ * @param m message to send
+ */
private void process(ToSend m) {
byte requestBytes[] = new byte[28];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
@@ -279,10 +334,18 @@
}
}
+ /**
+ * Test if either the send queue or the receive queue is empty.
+ */
public boolean queueEmpty() {
return (sendqueue.isEmpty() || recvqueue.isEmpty());
}
+ /**
+ * Constructor of class Messenger.
+ *
+ * @param manager Connection manager
+ */
Messenger(QuorumCnxManager manager) {
lastProposedLeader = 0;
lastProposedZxid = 0;
@@ -303,17 +366,36 @@
QuorumPeer self;
int port;
- long logicalclock; /* Election instance */
+ volatile long logicalclock; /* Election instance */
Messenger messenger;
long proposedLeader;
long proposedZxid;
-
+
+ /**
+ * Constructor of FastLeaderElection. It takes two parameters, one
+ * is the QuorumPeer object that instantiated this object, and the other
+ * is the connection manager. Such an object should be created only once
+ * by each peer during an instance of the ZooKeeper service.
+ *
+ * @param self QuorumPeer that created this object
+ * @param manager Connection manager
+ */
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.manager = manager;
starter(self, manager);
}
+ /**
+ * This method is invoked by the constructor. Because it is a
+ * part of the starting procedure of the object that must be run by
+ * any constructor of this class, it is probably best to keep it as
+ * a separate method. As we have a single constructor currently,
+ * it is not strictly necessary to have it separate.
+ *
+ * @param self QuorumPeer that created this object
+ * @param manager Connection manager
+ */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
@@ -328,6 +410,7 @@
recvqueue.clear();
}
+
public static class ElectionResult {
public Vote vote;
@@ -338,6 +421,9 @@
public int winningCount;
}
+ /**
+ * Send notifications to all peers upon a change in our vote
+ */
private void sendNotifications() {
for (QuorumServer server : self.quorumPeers) {
InetAddress saddr = server.addr.getAddress();
@@ -353,6 +439,13 @@
}
}
+ /**
+ * Check if a pair (server id, zxid) succeeds our
+ * current vote.
+ *
+ * @param id Server identifier
+ * @param zxid Last zxid observed by the issuer of this vote
+ */
private boolean totalOrderPredicate(long id, long zxid) {
if ((zxid > proposedZxid)
|| ((zxid == proposedZxid) && (id > proposedLeader)))
@@ -362,6 +455,14 @@
}
+ /**
+ * Termination predicate. Given a set of votes, determines if we
+ * have sufficient votes to declare the end of the election round.
+ *
+ * @param votes Set of votes
+ * @param l Identifier of the vote received last
+ * @param zxid zxid of the vote received last
+ */
private boolean termPredicate(
HashMap<InetAddress, Vote> votes, long l,
long zxid) {
@@ -384,6 +485,11 @@
}
+ /**
+ * Starts a new round of leader election. Whenever our QuorumPeer
+ * changes its state to LOOKING, this method is invoked, and it
+ * sends notifications to all other peers.
+ */
public Vote lookForLeader() throws InterruptedException {
HashMap<InetAddress, Vote> recvset = new HashMap<InetAddress, Vote>();
@@ -394,7 +500,7 @@
proposedLeader = self.getId();
proposedZxid = self.getLastLoggedZxid();
- LOG.warn("Election tally: " + proposedZxid);
+ LOG.warn("New election: " + proposedZxid);
sendNotifications();
/*
@@ -449,7 +555,7 @@
} else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
//Otherwise, wait for a fixed amount of time
- LOG.warn("Passed predicate");
+ LOG.debug("Passed predicate");
Thread.sleep(finalizeWait);
// Verify if there is any change in the proposed leader
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Mon Aug 25 14:13:01 2008
@@ -114,15 +114,16 @@
void followLeader() throws InterruptedException {
InetSocketAddress addr = null;
// Find the leader by id
+ Vote current = self.getCurrentVote();
for (QuorumServer s : self.quorumPeers) {
- if (s.id == self.currentVote.id) {
+ if (s.id == current.id) {
addr = s.addr;
break;
}
}
if (addr == null) {
LOG.warn("Couldn't find the leader with id = "
- + self.currentVote.id);
+ + current.id);
}
LOG.info("Following " + addr);
sock = new Socket();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Mon Aug 25 14:13:01 2008
@@ -100,7 +100,7 @@
}
public Vote lookForLeader() throws InterruptedException {
- self.currentVote = new Vote(self.getId(), self.getLastLoggedZxid());
+ self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid()));
// We are going to look for a leader by casting a vote for ourself
byte requestBytes[] = new byte[4];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
@@ -161,16 +161,17 @@
}
ElectionResult result = countVotes(votes);
if (result.winner.id >= 0) {
- self.currentVote = result.vote;
+ self.setCurrentVote(result.vote);
if (result.winningCount > (self.quorumPeers.size() / 2)) {
- self.currentVote = result.winner;
+ self.setCurrentVote(result.winner);
s.close();
- self.setPeerState((self.currentVote.id == self.getId())
+ Vote current = self.getCurrentVote();
+ self.setPeerState((current.id == self.getId())
? ServerState.LEADING: ServerState.FOLLOWING);
if (self.getPeerState() == ServerState.FOLLOWING) {
Thread.sleep(100);
}
- return self.currentVote;
+ return current;
}
}
Thread.sleep(1000);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Mon Aug 25 14:13:01 2008
@@ -493,6 +493,8 @@
this.addr = channel.socket().getInetAddress();
this.channel = channel;
recvWorker = null;
+
+ LOG.debug("Address of remote peer: " + this.addr);
}
void setRecv(RecvWorker recvWorker) {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=688885&r1=688884&r2=688885&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Mon Aug 25 14:13:01 2008
@@ -111,7 +111,15 @@
/**
* This is who I think the leader currently is.
*/
- volatile Vote currentVote;
+ volatile private Vote currentVote;
+
+ public synchronized Vote getCurrentVote(){
+ return currentVote;
+ }
+
+ public synchronized void setCurrentVote(Vote v){
+ currentVote = v;
+ }
volatile boolean running = true;
@@ -167,10 +175,11 @@
responseBuffer.clear();
responseBuffer.getInt(); // Skip the xid
responseBuffer.putLong(myid);
- switch (state) {
+ Vote current = getCurrentVote();
+ switch (getPeerState()) {
case LOOKING:
- responseBuffer.putLong(currentVote.id);
- responseBuffer.putLong(currentVote.zxid);
+ responseBuffer.putLong(current.id);
+ responseBuffer.putLong(current.zxid);
break;
case LEADING:
responseBuffer.putLong(myid);
@@ -182,7 +191,7 @@
}
break;
case FOLLOWING:
- responseBuffer.putLong(currentVote.id);
+ responseBuffer.putLong(current.id);
try {
responseBuffer.putLong(follower.getZxid());
} catch (NullPointerException npe) {
@@ -205,11 +214,11 @@
private ServerState state = ServerState.LOOKING;
- public void setPeerState(ServerState newState){
+ public synchronized void setPeerState(ServerState newState){
state=newState;
}
- public ServerState getPeerState(){
+ public synchronized ServerState getPeerState(){
return state;
}
@@ -364,14 +373,14 @@
* Main loop
*/
while (running) {
- switch (state) {
+ switch (getPeerState()) {
case LOOKING:
try {
LOG.info("LOOKING");
- currentVote = makeLEStrategy().lookForLeader();
+ setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
- state = ServerState.LOOKING;
+ setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
@@ -384,7 +393,7 @@
} finally {
follower.shutdown();
setFollower(null);
- state = ServerState.LOOKING;
+ setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
@@ -400,7 +409,7 @@
leader.shutdown("Forcing shutdown");
setLeader(null);
}
- state = ServerState.LOOKING;
+ setPeerState(ServerState.LOOKING);
}
break;
}
@@ -504,7 +513,7 @@
}
public String getServerState() {
- switch (state) {
+ switch (getPeerState()) {
case LOOKING:
return QuorumStats.Provider.LOOKING_STATE;
case LEADING: