You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2008/12/12 21:21:30 UTC
svn commit: r726110 - in /hadoop/zookeeper/trunk: CHANGES.txt
src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
Author: mahadev
Date: Fri Dec 12 12:21:30 2008
New Revision: 726110
URL: http://svn.apache.org/viewvc?rev=726110&view=rev
Log:
ZOOKEEPER-230. Improvements to FLE. (Flavio via mahadev)
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Dec 12 12:21:30 2008
@@ -73,6 +73,7 @@
ZOOKEEPER-247. fix formatting of C API in ACL section of programmer guide.
(patrick hunt via mahadev)
+ ZOOKEEPER-230. Improvements to FLE. (Flavio via mahadev)
Release 3.0.0 - 2008-10-21
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Dec 12 12:21:30 2008
@@ -46,22 +46,21 @@
public class FastLeaderElection implements Election {
private static final Logger LOG = Logger.getLogger(FastLeaderElection.class);
- /* Sequence numbers for messages */
- static int sequencer = 0;
-
/**
* Determine how much time a process has to wait
* once it believes that it has reached the end of
* leader election.
*/
- static int finalizeWait = 200;
+ final static int finalizeWait = 200;
- /**
- * Challenge counter to avoid replay attacks
+
+ /**
+ * Upper bound on the amount of time between two consecutive
+ * notification checks. This impacts the amount of time to get
+ * the system up again after long partitions. Currently 60 seconds.
*/
- static int challengeCounter = 0;
-
+ final static int maxNotificationInterval = 60000;
/**
* Connection manager. Fast leader election uses TCP for
@@ -509,6 +508,8 @@
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
+ int notTimeout = finalizeWait;
+
synchronized(this){
logicalclock++;
updateProposal(self.getId(), self.getLastLoggedZxid());
@@ -526,7 +527,7 @@
* Remove next notification from queue, times out after 2 times
* the termination time
*/
- Notification n = recvqueue.poll(2*finalizeWait, TimeUnit.MILLISECONDS);
+ Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
@@ -535,89 +536,104 @@
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
+ } else {
+ manager.connectAll();
}
+
+ /*
+ * Exponential backoff
+ */
+ int tmpTimeOut = notTimeout*2;
+ notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
+ LOG.info("Notification time out: " + notTimeout);
}
- else switch (n.state) {
- case LOOKING:
- // If notification > current, replace and send messages out
- LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " +
- n.epoch + ", " + self.getId() + ", " + self.getPeerState() +
- ", " + n.state + ", " + n.sid);
- if (n.epoch > logicalclock) {
- logicalclock = n.epoch;
- recvset.clear();
- updateProposal(self.getId(), self.getLastLoggedZxid());
- sendNotifications();
- } else if (n.epoch < logicalclock) {
- break;
- } else if (totalOrderPredicate(n.leader, n.zxid)) {
- updateProposal(n.leader, n.zxid);
- sendNotifications();
- }
+ else {
+ //notTimeout = finalizeWait;
+ switch (n.state) {
+ case LOOKING:
+ // If notification > current, replace and send messages out
+ LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " +
+ n.epoch + ", " + self.getId() + ", " + self.getPeerState() +
+ ", " + n.state + ", " + n.sid);
+ if (n.epoch > logicalclock) {
+ logicalclock = n.epoch;
+ recvset.clear();
+ updateProposal(self.getId(), self.getLastLoggedZxid());
+ sendNotifications();
+ } else if (n.epoch < logicalclock) {
+ break;
+ } else if (totalOrderPredicate(n.leader, n.zxid)) {
+ updateProposal(n.leader, n.zxid);
+ sendNotifications();
+ }
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
- //If have received from all nodes, then terminate
- if (self.quorumPeers.size() == recvset.size()) {
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
-
- } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
- //Otherwise, wait for a fixed amount of time
- LOG.debug("Passed predicate");
-
- // Verify if there is any change in the proposed leader
- while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
- if(totalOrderPredicate(n.leader, n.zxid)){
- recvqueue.put(n);
- break;
- }
- }
-
- if (n == null) {
+ //If have received from all nodes, then terminate
+ if (self.quorumPeers.size() == recvset.size()) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: ServerState.FOLLOWING);
- LOG.info("About to leave instance:" + proposedLeader + ", " + proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
leaveInstance();
- return new Vote(proposedLeader,
+ return new Vote(proposedLeader, proposedZxid);
+
+ } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
+ //Otherwise, wait for a fixed amount of time
+ LOG.debug("Passed predicate");
+
+ // Verify if there is any change in the proposed leader
+ while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
+ if(totalOrderPredicate(n.leader, n.zxid)){
+ recvqueue.put(n);
+ break;
+ }
+ }
+
+ if (n == null) {
+ self.setPeerState((proposedLeader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+ LOG.info("About to leave instance:" + proposedLeader + ", " +
+ proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
+ leaveInstance();
+ return new Vote(proposedLeader,
proposedZxid);
+ }
}
- }
- break;
- case LEADING:
- /*
- * There is at most one leader for each epoch, so if a peer claims to
- * be the leader for an epoch, then that peer must be the leader (no
- * arbitrary failures assumed). Now, if there is no quorum supporting
- * this leader, then processes will naturally move to a new epoch.
- */
- if(n.epoch == logicalclock){
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
+ break;
+ case LEADING:
+ /*
+ * There is at most one leader for each epoch, so if a peer claims to
+ * be the leader for an epoch, then that peer must be the leader (no
+ * arbitrary failures assumed). Now, if there is no quorum supporting
+ * this leader, then processes will naturally move to a new epoch.
+ */
+ if(n.epoch == logicalclock){
+ self.setPeerState((n.leader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- case FOLLOWING:
- LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + ", " + n.sid);
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
+ }
+ case FOLLOWING:
+ LOG.info("Notification: " + n.leader + ", " + n.zxid +
+ ", " + n.epoch + ", " + self.getId() + ", " +
+ self.getPeerState() + ", " + n.state + ", " + n.sid);
- outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
+ outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
- if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
- && checkLeader(outofelection, n.leader, n.epoch)) {
- synchronized(this){
- logicalclock = n.epoch;
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
+ if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
+ && checkLeader(outofelection, n.leader, n.epoch)) {
+ synchronized(this){
+ logicalclock = n.epoch;
+ self.setPeerState((n.leader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+ }
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
}
- leaveInstance();
- return new Vote(n.leader, n.zxid);
+ break;
+ default:
+ break;
}
- break;
- default:
- break;
}
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=726110&r1=726109&r2=726110&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Fri Dec 12 12:21:30 2008
@@ -25,10 +25,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.Enumeration;
import org.apache.log4j.Logger;
@@ -120,7 +120,6 @@
}
public QuorumCnxManager(QuorumPeer self) {
- this.port = port;
this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
@@ -153,11 +152,7 @@
*/
boolean initiateConnection(SocketChannel s, Long sid) {
- boolean challenged = true;
- boolean wins = false;
- long newChallenge;
-
- try {
+ try {
// Sending id and challenge
byte[] msgBytes = new byte[8];
ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
@@ -173,11 +168,11 @@
// If lost the challenge, then drop the new connection
if (sid > self.getId()) {
try {
- LOG.warn("Have smaller server identifier, so dropping the connection: (" +
- sid + ", " + self.getId());
+ LOG.info("Have smaller server identifier, so dropping the connection: (" +
+ sid + ", " + self.getId() + ")");
s.socket().close();
} catch (IOException e) {
- LOG.warn("Error when closing socket or trying to reopen connection: "
+ LOG.error("Error when closing socket or trying to reopen connection: "
+ e.toString());
}
@@ -220,9 +215,6 @@
*
*/
boolean receiveConnection(SocketChannel s) {
- boolean challenged = true;
- boolean wins = false;
- long newChallenge;
Long sid = null;
try {
@@ -236,7 +228,7 @@
// Read server id
sid = Long.valueOf(msgBuffer.getLong());
} catch (IOException e) {
- LOG.warn("Exception reading or writing challenge: "
+ LOG.info("Exception reading or writing challenge: "
+ e.toString());
return false;
}
@@ -246,7 +238,7 @@
try {
SendWorker sw = senderWorkerMap.get(sid);
- LOG.warn("Create new connection");
+ LOG.info("Create new connection to server: " + sid);
//sw.connect();
s.socket().close();
if(sw != null) sw.finish();
@@ -256,7 +248,7 @@
}
} catch (IOException e) {
- LOG.warn("Error when closing socket or trying to reopen connection: "
+ LOG.info("Error when closing socket or trying to reopen connection: "
+ e.toString());
}
//Otherwise start worker threads to receive data.
@@ -290,8 +282,8 @@
}
/**
- * Processes invoke this message to send a message. Currently, only leader
- * election uses it.
+     * Processes invoke this method to queue a message to send. Currently,
+ * only leader election uses it.
*/
void toSend(Long sid, ByteBuffer b) {
/*
@@ -322,39 +314,64 @@
queueSendMap.get(sid).take();
}
queueSendMap.get(sid).put(b);
- }
+ }
+
+ connectOne(sid);
- //synchronized (senderWorkerMap) {
- if ((senderWorkerMap.get(sid) == null)) {
- SocketChannel channel;
- try {
- channel = SocketChannel
- .open(self.quorumPeers.get(sid).electionAddr);
- channel.socket().setTcpNoDelay(true);
- initiateConnection(channel, sid);
- } catch (IOException e) {
- LOG.warn("Cannot open channel to "
- + sid + "( " + e.toString()
- + ")");
- }
- }
- //}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting to put message in queue."
+ e.toString());
}
}
+
+ /**
+ * Try to establish a connection to server with id sid.
+ *
+ * @param sid server id
+ */
+
+ void connectOne(long sid){
+ if ((senderWorkerMap.get(sid) == null)) {
+ SocketChannel channel;
+ try {
+ channel = SocketChannel
+ .open(self.quorumPeers.get(sid).electionAddr);
+ channel.socket().setTcpNoDelay(true);
+ initiateConnection(channel, sid);
+ } catch (IOException e) {
+ LOG.warn("Cannot open channel to "
+ + sid + "( " + e.toString()
+ + ")");
+ }
+ }
+ }
+
+
+ /**
+ * Try to establish a connection with each server if one
+ * doesn't exist.
+ */
+
+ void connectAll(){
+ long sid;
+ for(Enumeration<Long> en = queueSendMap.keys();
+ en.hasMoreElements();){
+ sid = en.nextElement();
+ connectOne(sid);
+ }
+ }
+
/**
* Check if all queues are empty, indicating that all messages have been delivered.
*/
boolean haveDelivered() {
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
- if (queue.size() == 0)
- return true;
+ if (queue.size() != 0)
+ return false;
}
- return false;
+ return true;
}
/**