You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2008/10/01 10:40:03 UTC
svn commit: r700714 - in /hadoop/zookeeper/trunk: ./
src/java/jmx/org/apache/zookeeper/server/quorum/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: breed
Date: Wed Oct 1 01:40:02 2008
New Revision: 700714
URL: http://svn.apache.org/viewvc?rev=700714&view=rev
Log:
ZOOKEEPER-127. Use of non-standard election ports in config breaks services
Added:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Oct 1 01:40:02 2008
@@ -99,3 +99,6 @@
ZOOKEEPER-38. headers (version+) in log/snap files (Andrew Kornev and Mahadev
Konar via breed)
+
+ ZOOKEEPER-127. Use of non-standard election ports in config breaks services (Mark
+ Harwood and Flavio Junqueira via breed)
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeer.java Wed Oct 1 01:40:02 2008
@@ -20,7 +20,7 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
@@ -139,7 +139,7 @@
"Starting quorum peer");
quorumBean=new QuorumBean(qp);
MBeanRegistry.getInstance().register(quorumBean, null);
- for(QuorumServer s: qp.quorumPeers){
+ for(QuorumServer s: qp.quorumPeers.values()){
ZKMBeanInfo p;
if(qp.getId()==s.id)
p=localPeerBean=new LocalPeerBean(qp);
@@ -209,21 +209,16 @@
setupObservers();
}
- public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers,
- File dataDir, File dataLogDir, int clientPort,
- int electionAlg, int electionPort, long myid, int tickTime, int initLimit,
- int syncLimit) throws IOException {
- super(quorumPeers, dataDir, dataLogDir, clientPort,
- electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+ public ManagedQuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit,
+ int syncLimit) throws IOException {
+ super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, myid, tickTime, initLimit,
+syncLimit);
setupObservers();
}
- public ManagedQuorumPeer(ArrayList<QuorumServer> quorumPeers,
- File dataDir, File dataLogDir, int electionType, int electionPort,
- long myid, int tickTime, int initLimit, int syncLimit,
- NIOServerCnxn.Factory cnxnFactory) throws IOException {
- super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
- myid, tickTime, initLimit, syncLimit, cnxnFactory);
+ public ManagedQuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit,
+ NIOServerCnxn.Factory cnxnFactory) throws IOException {
+ super(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, cnxnFactory);
setupObservers();
}
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ManagedQuorumPeerMain.java Wed Oct 1 01:40:02 2008
@@ -98,7 +98,6 @@
File(ServerConfig.getDataDir()));
peer.setTxnFactory(factory);
peer.setQuorumPeers(QuorumPeerConfig.getServers());
- peer.setElectionPort(QuorumPeerConfig.getElectionPort());
peer.setElectionType(QuorumPeerConfig.getElectionAlg());
peer.setMyid(QuorumPeerConfig.getServerId());
peer.setTickTime(QuorumPeerConfig.getTickTime());
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/quorum/ObservableQuorumPeer.java Wed Oct 1 01:40:02 2008
@@ -20,7 +20,8 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.net.InetSocketAddress;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -63,21 +64,14 @@
super();
}
- public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir,
- File dataLogDir, int clientPort, int electionAlg,
- int electionPort, long myid, int tickTime, int initLimit,
- int syncLimit) throws IOException {
- super(quorumPeers, dataDir, dataLogDir, clientPort,
- electionAlg, electionPort, myid, tickTime, initLimit, syncLimit);
+ public ObservableQuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File dataDir, File dataLogDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit,
+ int syncLimit) throws IOException {
+ super(quorumPeers, dataDir, dataLogDir, clientPort, electionAlg, myid, tickTime, initLimit, syncLimit);
}
- public ObservableQuorumPeer(ArrayList<QuorumServer> quorumPeers,
- File dataDir, File dataLogDir, int electionType,
- int electionPort, long myid, int tickTime,
- int initLimit, int syncLimit,
- NIOServerCnxn.Factory cnxnFactory) throws IOException {
- super(quorumPeers, dataDir, dataLogDir, electionType, electionPort,
- myid, tickTime, initLimit, syncLimit, cnxnFactory);
+ public ObservableQuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit,
+ NIOServerCnxn.Factory cnxnFactory) throws IOException {
+ super(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, cnxnFactory);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Wed Oct 1 01:40:02 2008
@@ -670,7 +670,7 @@
t.start();
}
- for (QuorumServer server : self.quorumPeers) {
+ for (QuorumServer server : self.quorumPeers.values()) {
InetSocketAddress saddr = new InetSocketAddress(server.addr
.getAddress(), port);
addrChallengeMap.put(saddr, new HashMap<Long, Long>());
@@ -690,19 +690,19 @@
long proposedLeader;
long proposedZxid;
- public AuthFastLeaderElection(QuorumPeer self, int electionPort,
+ public AuthFastLeaderElection(QuorumPeer self,
boolean auth) {
this.authEnabled = auth;
- starter(self, electionPort);
+ starter(self);
}
- public AuthFastLeaderElection(QuorumPeer self, int electionPort) {
- starter(self, electionPort);
+ public AuthFastLeaderElection(QuorumPeer self) {
+ starter(self);
}
- private void starter(QuorumPeer self, int electionPort) {
+ private void starter(QuorumPeer self) {
this.self = self;
- port = electionPort;
+ port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
proposedLeader = -1;
proposedZxid = -1;
@@ -726,14 +726,14 @@
}
private void sendNotifications() {
- for (QuorumServer server : self.quorumPeers) {
- InetSocketAddress saddr = new InetSocketAddress(server.addr
- .getAddress(), port);
+ for (QuorumServer server : self.quorumPeers.values()) {
+ //InetSocketAddress saddr = new InetSocketAddress(server.addr
+ // .getAddress(), port);
ToSend notmsg = new ToSend(ToSend.mType.notification,
AuthFastLeaderElection.sequencer++, proposedLeader,
proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING,
- saddr);
+ self.quorumPeers.get(server.id).electionAddr);
sendqueue.offer(notmsg);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Election.java Wed Oct 1 01:40:02 2008
@@ -21,6 +21,6 @@
import org.apache.zookeeper.server.quorum.Vote;
-interface Election {
+public interface Election {
public Vote lookForLeader() throws InterruptedException;
}
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Wed Oct 1 01:40:02 2008
@@ -54,7 +54,7 @@
* once it believes that it has reached the end of
* leader election.
*/
- static int finalizeWait = 100;
+ static int finalizeWait = 200;
/**
* Challenge counter to avoid replay attacks
@@ -103,7 +103,7 @@
/*
* Address of sender
*/
- InetAddress addr;
+ long sid;
}
/**
@@ -119,13 +119,13 @@
long zxid,
long epoch,
ServerState state,
- InetAddress addr) {
+ long sid) {
this.leader = leader;
this.zxid = zxid;
this.epoch = epoch;
this.state = state;
- this.addr = addr;
+ this.sid = sid;
}
/*
@@ -151,7 +151,7 @@
/*
* Address of recipient
*/
- InetAddress addr;
+ long sid;
}
LinkedBlockingQueue<ToSend> sendqueue;
@@ -165,10 +165,6 @@
*/
private class Messenger {
-
- long lastProposedLeader;
- long lastProposedZxid;
- long lastEpoch;
/**
* Receives messages from instance of QuorumCnxManager on
@@ -206,10 +202,10 @@
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
- ackstate = QuorumPeer.ServerState.LEADING;
+ ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
- ackstate = QuorumPeer.ServerState.FOLLOWING;
+ ackstate = QuorumPeer.ServerState.LEADING;
break;
}
@@ -219,22 +215,7 @@
n.zxid = response.buffer.getLong();
n.epoch = response.buffer.getLong();
n.state = ackstate;
- n.addr = response.addr;
-
- /*
- * Accept the values of this notification
- * if we are at right epoch and the new notification
- * contains a vote that succeeds our current vote
- * in our order of votes.
- */
- if ((messenger.lastEpoch <= n.epoch)
- && ((n.zxid > messenger.lastProposedZxid)
- || ((n.zxid == messenger.lastProposedZxid)
- && (n.leader > messenger.lastProposedLeader)))) {
- messenger.lastProposedZxid = n.zxid;
- messenger.lastProposedLeader = n.leader;
- messenger.lastEpoch = n.epoch;
- }
+ n.sid = response.sid;
/*
* If this server is looking, then send proposed leader
@@ -242,7 +223,7 @@
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
- if(recvqueue.size() == 0) LOG.debug("Message: " + n.addr);
+ if(recvqueue.size() == 0) LOG.debug("Message: " + n.sid);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
@@ -250,12 +231,13 @@
*/
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.epoch < logicalclock)){
+ Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
- proposedLeader,
- proposedZxid,
+ v.id,
+ v.zxid,
logicalclock,
self.getPeerState(),
- response.addr);
+ response.sid);
sendqueue.offer(notmsg);
}
} else {
@@ -266,12 +248,14 @@
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
- ToSend notmsg = new ToSend(ToSend.mType.notification,
+
+ ToSend notmsg = new ToSend(
+ ToSend.mType.notification,
current.id,
current.zxid,
logicalclock,
self.getPeerState(),
- response.addr);
+ response.sid);
sendqueue.offer(notmsg);
}
}
@@ -329,7 +313,7 @@
requestBuffer.putLong(m.zxid);
requestBuffer.putLong(m.epoch);
- manager.toSend(m.addr, requestBuffer);
+ manager.toSend(m.sid, requestBuffer);
}
}
@@ -347,9 +331,6 @@
* @param manager Connection manager
*/
Messenger(QuorumCnxManager manager) {
- lastProposedLeader = 0;
- lastProposedZxid = 0;
- lastEpoch = 0;
Thread t = new Thread(new WorkerSender(manager),
"WorkerSender Thread");
@@ -371,6 +352,13 @@
long proposedLeader;
long proposedZxid;
+
+ /**
+ * Returns the current value of the logical clock counter
+ */
+ public long getLogicalClock(){
+ return logicalclock;
+ }
/**
* Constructor of FastLeaderElection. It takes two parameters, one
@@ -410,20 +398,23 @@
recvqueue.clear();
}
+ public void shutdown(){
+ manager.halt();
+ }
/**
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
- for (QuorumServer server : self.quorumPeers) {
- InetAddress saddr = server.addr.getAddress();
+ for (QuorumServer server : self.quorumPeers.values()) {
+ long sid = server.id;
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock,
QuorumPeer.ServerState.LOOKING,
- saddr);
+ sid);
sendqueue.offer(notmsg);
}
@@ -454,7 +445,7 @@
* @param zxid zxid of the the vote received last
*/
private boolean termPredicate(
- HashMap<InetAddress, Vote> votes, long l,
+ HashMap<Long, Vote> votes, long l,
long zxid) {
int count = 0;
@@ -475,21 +466,30 @@
}
+ synchronized void updateProposal(long leader, long zxid){
+ proposedLeader = leader;
+ proposedZxid = zxid;
+ }
+
+ synchronized Vote getVote(){
+ return new Vote(proposedLeader, proposedZxid);
+ }
+
/**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
* sends notifications to al other peers.
*/
public Vote lookForLeader() throws InterruptedException {
- HashMap<InetAddress, Vote> recvset = new HashMap<InetAddress, Vote>();
-
- HashMap<InetAddress, Vote> outofelection = new HashMap<InetAddress, Vote>();
+ HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
- logicalclock++;
-
- proposedLeader = self.getId();
- proposedZxid = self.getLastLoggedZxid();
+ HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
+ synchronized(this){
+ logicalclock++;
+ updateProposal(self.getId(), self.getLastLoggedZxid());
+ }
+
LOG.warn("New election: " + proposedZxid);
sendNotifications();
@@ -515,48 +515,47 @@
}
else switch (n.state) {
case LOOKING:
- // If notification > current, replace and send messages out
- if (n.epoch > logicalclock) {
+ // If notification > current, replace and send messages out
+ LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " +
+ n.epoch + ", " + self.getId() + ", " + self.getPeerState() +
+ ", " + n.state + ", " + n.sid);
+ if (n.epoch > logicalclock) {
logicalclock = n.epoch;
recvset.clear();
- if(totalOrderPredicate(n.leader, n.zxid)){
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
- }
+ updateProposal(n.leader, n.zxid);
sendNotifications();
} else if (n.epoch < logicalclock) {
- break;
+ break;
} else if (totalOrderPredicate(n.leader, n.zxid)) {
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
-
+ updateProposal(n.leader, n.zxid);
sendNotifications();
}
-
- recvset.put(n.addr, new Vote(n.leader,
- n.zxid));
+
+ recvset.put(n.sid, new Vote(n.leader, n.zxid));
//If have received from all nodes, then terminate
if (self.quorumPeers.size() == recvset.size()) {
self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
+ ServerState.LEADING: ServerState.FOLLOWING);
leaveInstance();
return new Vote(proposedLeader, proposedZxid);
} else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
//Otherwise, wait for a fixed amount of time
LOG.debug("Passed predicate");
- Thread.sleep(finalizeWait);
// Verify if there is any change in the proposed leader
- while ((!recvqueue.isEmpty())
- && !totalOrderPredicate(recvqueue.peek().leader,
- recvqueue.peek().zxid)) {
- recvqueue.poll();
+ while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
+ if(totalOrderPredicate(n.leader, n.zxid)){
+ recvqueue.put(n);
+ break;
+ }
}
- if (recvqueue.isEmpty()) {
+
+ if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
+ ServerState.LEADING: ServerState.FOLLOWING);
+ LOG.info("About to leave instance:" + proposedLeader + ", " + proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
leaveInstance();
return new Vote(proposedLeader,
proposedZxid);
@@ -564,24 +563,18 @@
}
break;
case LEADING:
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
case FOLLOWING:
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
+ LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + self.getId() + ", " + self.getPeerState() + ", " + n.state + ", " + n.sid);
+
+ if(n.epoch >= logicalclock)
+ outofelection.put(n.sid, new Vote(n.leader, n.zxid));
if (termPredicate(outofelection, n.leader, n.zxid)) {
self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
+ ServerState.LEADING: ServerState.FOLLOWING);
leaveInstance();
return new Vote(n.leader, n.zxid);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Wed Oct 1 01:40:02 2008
@@ -114,7 +114,7 @@
InetSocketAddress addr = null;
// Find the leader by id
Vote current = self.getCurrentVote();
- for (QuorumServer s : self.quorumPeers) {
+ for (QuorumServer s : self.quorumPeers.values()) {
if (s.id == current.id) {
addr = s.addr;
break;
@@ -269,7 +269,7 @@
}
}
} catch (IOException e) {
- e.printStackTrace();
+ LOG.warn("Exception when following the leader", e);
try {
sock.close();
} catch (IOException e1) {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java Wed Oct 1 01:40:02 2008
@@ -137,8 +137,9 @@
requestBuffer.putInt(xid);
requestPacket.setLength(4);
HashSet<Long> heardFrom = new HashSet<Long>();
- for (QuorumServer server : self.quorumPeers) {
+ for (QuorumServer server : self.quorumPeers.values()) {
requestPacket.setSocketAddress(server.addr);
+ LOG.warn("Server address: " + server.addr);
try {
s.send(requestPacket);
responsePacket.setLength(responseBytes.length);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Oct 1 01:40:02 2008
@@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
@@ -51,7 +52,7 @@
*
*/
-class QuorumCnxManager {
+public class QuorumCnxManager {
private static final Logger LOG = Logger.getLogger(QuorumCnxManager.class);
/*
@@ -84,13 +85,13 @@
/*
* Local IP address
*/
- InetAddress localIP;
+ QuorumPeer self;
/*
* Mapping from Peer to Thread number
*/
- HashMap<InetAddress, SendWorker> senderWorkerMap;
- HashMap<InetAddress, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
+ ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
+ ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
/*
* Reception queue
@@ -109,26 +110,21 @@
Listener listener;
static class Message {
- Message(ByteBuffer buffer, InetAddress addr) {
+ Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
- this.addr = addr;
+ this.sid = sid;
}
ByteBuffer buffer;
- InetAddress addr;
+ long sid;
}
- QuorumCnxManager(int port) {
+ public QuorumCnxManager(QuorumPeer self) {
this.port = port;
this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
- this.queueSendMap = new HashMap<InetAddress, ArrayBlockingQueue<ByteBuffer>>();
- this.senderWorkerMap = new HashMap<InetAddress, SendWorker>();
-
- try {
- localIP = InetAddress.getLocalHost();
- } catch (UnknownHostException e) {
- LOG.warn("Couldn't get local address");
- }
+ this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
+ this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
+ this.self = self;
// Generates a challenge to guarantee one connection between pairs of
// servers
@@ -140,10 +136,15 @@
}
void genChallenge() {
- Random rand = new Random(System.currentTimeMillis()
- + localIP.hashCode());
- long newValue = rand.nextLong();
- challenge = newValue;
+ try{
+ Random rand = new Random(System.currentTimeMillis()
+ + InetAddress.getLocalHost().hashCode());
+ long newValue = rand.nextLong();
+ challenge = newValue;
+ } catch(UnknownHostException e){
+ LOG.error("Cannot resolve local address");
+ challenge = 0;
+ }
}
/**
@@ -151,54 +152,29 @@
* connection if it loses challenge. Otherwise, it keeps the connection.
*/
- boolean initiateConnection(SocketChannel s) {
+ boolean initiateConnection(SocketChannel s, Long sid) {
boolean challenged = true;
boolean wins = false;
long newChallenge;
-
- // Compare IP addresses based on their hash codes
- //int hashCodeRemote = s.socket().getInetAddress().hashCode();
- //if(hashCodeRemote >= localIP.hashCode()){
- // wins = false;
- //} else {
- // wins = true;
- //}
- //LOG.warn("Hash codes: " + hashCodeRemote + ", " + localIP.hashCode());
try {
- while (challenged && s.isConnected()) {
- // Sending challenge
- byte[] msgBytes = new byte[8];
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
- msgBuffer.putLong(challenge);
- msgBuffer.position(0);
- s.write(msgBuffer);
-
- // Reading challenge
- msgBuffer.position(0);
- s.read(msgBuffer);
-
- msgBuffer.position(0);
- newChallenge = msgBuffer.getLong();
- if (challenge > newChallenge) {
- wins = true;
- challenged = false;
- } else if (challenge == newChallenge) {
- genChallenge();
- } else {
- challenged = false;
- }
- }
+ // Sending id and challenge
+ byte[] msgBytes = new byte[8];
+ ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+ msgBuffer.putLong(self.getId());
+ msgBuffer.position(0);
+ s.write(msgBuffer);
} catch (IOException e) {
LOG.warn("Exception reading or writing challenge: "
+ e.toString());
return false;
}
-
+
// If lost the challenge, then drop the new connection
- if (!wins) {
+ if (sid > self.getId()) {
try {
- //LOG.warn("lost cause (initiate");
+ LOG.warn("Have smaller server identifier, so dropping the connection: (" +
+ sid + ", " + self.getId());
s.socket().close();
} catch (IOException e) {
LOG.warn("Error when closing socket or trying to reopen connection: "
@@ -206,33 +182,23 @@
}
// Otherwise proceed with the connection
- } else
- synchronized (senderWorkerMap) {
- /*
- * It may happen that a thread from a previous connection to the same
- * server is still active. In this case, we terminate the thread by
- * calling finish(). Note that senderWorkerMap is a map from IP
- * addresses to worker thread.
- */
- if (senderWorkerMap.get(s.socket().getInetAddress()) != null) {
- senderWorkerMap.get(s.socket().getInetAddress()).finish();
- }
-
- /*
- * Start new worker thread with a clean state.
- */
+ } else {
if (s != null) {
- SendWorker sw = new SendWorker(s);
- RecvWorker rw = new RecvWorker(s);
+ SendWorker sw = new SendWorker(s, sid);
+ RecvWorker rw = new RecvWorker(s, sid);
sw.setRecv(rw);
if (senderWorkerMap
- .containsKey(s.socket().getInetAddress())) {
- InetAddress addr = s.socket().getInetAddress();
- senderWorkerMap.get(addr).finish();
+ .containsKey(sid)) {
+ senderWorkerMap.get(sid).finish();
}
- senderWorkerMap.put(s.socket().getInetAddress(), sw);
+ if (!queueSendMap.containsKey(sid)) {
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
+ CAPACITY));
+ }
+
+ senderWorkerMap.put(sid, sw);
sw.start();
rw.start();
@@ -241,8 +207,8 @@
LOG.warn("Channel null");
return false;
}
- }
-
+
+ }
return false;
}
@@ -257,93 +223,60 @@
boolean challenged = true;
boolean wins = false;
long newChallenge;
-
-
- //Compare IP addresses based on their hash codes.
- //int hashCodeRemote = s.socket().getInetAddress().hashCode();
- //if(hashCodeRemote >= localIP.hashCode()){
- // wins = false;
- //} else {
- // wins = true;
- //}
-
- //LOG.warn("Hash codes: " + hashCodeRemote + ", " + localIP.hashCode());
-
+ Long sid = null;
try {
- while (challenged && s.isConnected()) {
- // Sending challenge
- byte[] msgBytes = new byte[8];
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
- long vsent;
- if (senderWorkerMap.get(s.socket().getInetAddress()) == null)
- vsent = Long.MIN_VALUE;
- else
- vsent = challenge;
- msgBuffer.putLong(vsent);
- msgBuffer.position(0);
- s.write(msgBuffer);
-
- // Reading challenge
- msgBuffer.position(0);
- s.read(msgBuffer);
-
- msgBuffer.position(0);
- newChallenge = msgBuffer.getLong();
- if (vsent > newChallenge) {
- wins = true;
- challenged = false;
- } else if (challenge == newChallenge) {
- genChallenge();
- } else {
- challenged = false;
- }
- }
+ // Sending challenge and sid
+ byte[] msgBytes = new byte[8];
+ ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+
+ s.read(msgBuffer);
+ msgBuffer.position(0);
+
+ // Read server id
+ sid = Long.valueOf(msgBuffer.getLong());
} catch (IOException e) {
LOG.warn("Exception reading or writing challenge: "
+ e.toString());
return false;
}
-
+
//If wins the challenge, then close the new connection.
- if (wins) {
+ if (sid < self.getId()) {
try {
- InetAddress addr = s.socket().getInetAddress();
- SendWorker sw = senderWorkerMap.get(addr);
+ SendWorker sw = senderWorkerMap.get(sid);
- //LOG.warn("Keep connection (received)");
+ LOG.warn("Create new connection");
//sw.connect();
s.socket().close();
- sw.finish();
- SocketChannel channel = SocketChannel.open(new InetSocketAddress(addr, port));
+ if(sw != null) sw.finish();
+ SocketChannel channel = SocketChannel.open(self.quorumPeers.get(sid).electionAddr);
if (channel.isConnected()) {
- initiateConnection(channel);
+ initiateConnection(channel, sid);
}
-
} catch (IOException e) {
LOG.warn("Error when closing socket or trying to reopen connection: "
+ e.toString());
}
//Otherwise start worker threads to receive data.
- } else
- synchronized (senderWorkerMap) {
- if (senderWorkerMap.get(s.socket().getInetAddress()) != null) {
- senderWorkerMap.get(s.socket().getInetAddress()).finish();
- }
-
+ } else {
+
if (s != null) {
- SendWorker sw = new SendWorker(s);
- RecvWorker rw = new RecvWorker(s);
+ SendWorker sw = new SendWorker(s, sid);
+ RecvWorker rw = new RecvWorker(s, sid);
sw.setRecv(rw);
- if (senderWorkerMap
- .containsKey(s.socket().getInetAddress())) {
- InetAddress addr = s.socket().getInetAddress();
- senderWorkerMap.get(addr).finish();
+ if (senderWorkerMap.containsKey(sid)) {
+ senderWorkerMap.get(sid).finish();
}
-
- senderWorkerMap.put(s.socket().getInetAddress(), sw);
+
+ senderWorkerMap.put(sid, sw);
+
+ if (!queueSendMap.containsKey(sid)) {
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
+ CAPACITY));
+ }
sw.start();
rw.start();
@@ -352,8 +285,7 @@
LOG.warn("Channel null");
return false;
}
- }
-
+ }
return false;
}
@@ -361,14 +293,14 @@
* Processes invoke this message to send a message. Currently, only leader
* election uses it.
*/
- void toSend(InetAddress addr, ByteBuffer b) {
+ void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
- if (addr.equals(localIP)) {
+ if (self.getId() == sid) {
try {
b.position(0);
- recvQueue.put(new Message(b.duplicate(), addr));
+ recvQueue.put(new Message(b.duplicate(), sid));
} catch (InterruptedException e) {
LOG.warn("Exception when loopbacking");
}
@@ -380,33 +312,33 @@
/*
* Start a new connection if doesn't have one already.
*/
- if (!queueSendMap.containsKey(addr)) {
- queueSendMap.put(addr, new ArrayBlockingQueue<ByteBuffer>(
+ if (!queueSendMap.containsKey(sid)) {
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
CAPACITY));
- queueSendMap.get(addr).put(b);
+ queueSendMap.get(sid).put(b);
} else {
- if (queueSendMap.get(addr).remainingCapacity() == 0) {
- queueSendMap.get(addr).take();
+ if (queueSendMap.get(sid).remainingCapacity() == 0) {
+ queueSendMap.get(sid).take();
}
- queueSendMap.get(addr).put(b);
+ queueSendMap.get(sid).put(b);
}
- synchronized (senderWorkerMap) {
- if (senderWorkerMap.get(addr) == null) {
+ //synchronized (senderWorkerMap) {
+ if ((senderWorkerMap.get(sid) == null)) {
SocketChannel channel;
try {
channel = SocketChannel
- .open(new InetSocketAddress(addr, port));
+ .open(self.quorumPeers.get(sid).electionAddr);
channel.socket().setTcpNoDelay(true);
- initiateConnection(channel);
+ initiateConnection(channel, sid);
} catch (IOException e) {
LOG.warn("Cannot open channel to "
- + addr.toString() + "( " + e.toString()
+ + sid + "( " + e.toString()
+ ")");
}
}
- }
+ //}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting to put message in queue."
+ e.toString());
@@ -428,9 +360,15 @@
/**
* Flag that it is time to wrap up all activities and interrupt the listener.
*/
- public void shutdown() {
+ public void halt() {
shutdown = true;
- listener.interrupt();
+ LOG.warn("Halting listener");
+ listener.halt();
+
+ // SendWorker.finish() removes the worker from senderWorkerMap, so
+ // iterate over a snapshot instead of the live values() view to avoid
+ // modifying the map while iterating over it.
+ for(SendWorker sw: senderWorkerMap.values().toArray(new SendWorker[0])){
+ LOG.warn("Halting sender: " + sw);
+ sw.finish();
+ }
}
/**
@@ -438,6 +376,7 @@
*/
class Listener extends Thread {
+ // Opened by run() and closed by halt(), which is called from another
+ // thread; volatile so halt() sees the channel that run() opened.
+ volatile ServerSocketChannel ss = null;
/**
* Sleeps on accept().
*/
@@ -446,35 +385,31 @@
- ServerSocketChannel ss = null;
+ // Assign the field, not a local: a local here shadowed the field,
+ // so halt() always saw null and could not unblock accept().
try {
ss = ServerSocketChannel.open();
+ int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
+ LOG.warn("My election bind port: " + port);
ss.socket().bind(new InetSocketAddress(port));
while (!shutdown) {
SocketChannel client = ss.accept();
client.socket().setTcpNoDelay(true);
- /*
- * This synchronized block guarantees that if
- * both parties try to connect to each other
- * simultaneously, then only one will succeed.
- * If we don't have this block, then there
- * are runs in which both parties act as if they
- * don't have any connection starting or started.
- * In receiveConnection(), a server sends the minimum
- * value for a challenge, if they believe they must
- * accept the connection because they don't have one.
- *
- * This synchronized block prevents that the same server
- * invokes receiveConnection() and initiateConnection()
- * simultaneously.
- */
- synchronized(senderWorkerMap){
- LOG.warn("Connection request");
- receiveConnection(client);
- }
+
+ //synchronized(senderWorkerMap){
+ LOG.warn("Connection request");
+ receiveConnection(client);
+ //}
}
} catch (IOException e) {
System.err.println("Listener.run: " + e.getMessage());
}
}
+
+ /**
+ * Closes the listening channel so that a thread blocked in accept()
+ * gets an IOException and run() returns.
+ */
+ void halt(){
+ try{
+ if(ss != null) ss.close();
+ } catch (IOException e){
+ LOG.warn("Exception when shutting down listener: " + e);
+ }
+ }
}
/**
@@ -485,17 +420,17 @@
class SendWorker extends Thread {
// Send msgs to peer
- InetAddress addr;
+ Long sid;
SocketChannel channel;
RecvWorker recvWorker;
boolean running = true;
- SendWorker(SocketChannel channel) {
- this.addr = channel.socket().getInetAddress();
+ SendWorker(SocketChannel channel, Long sid) {
+ // sid is the remote server's id; it keys this worker's entries in
+ // senderWorkerMap and queueSendMap.
+ this.sid = sid;
this.channel = channel;
recvWorker = null;
- LOG.debug("Address of remote peer: " + this.addr);
+ LOG.debug("Server id of remote peer: " + this.sid);
}
void setRecv(RecvWorker recvWorker) {
@@ -508,7 +443,7 @@
this.interrupt();
if (recvWorker != null)
recvWorker.finish();
- senderWorkerMap.remove(channel.socket().getInetAddress());
+ senderWorkerMap.remove(sid);
return running;
}
@@ -519,7 +454,7 @@
ByteBuffer b = null;
try {
- b = queueSendMap.get(addr).take();
+ b = queueSendMap.get(sid).take();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue ("
+ e.toString() + ")");
@@ -541,18 +476,18 @@
* If reconnection doesn't work, then put the
* message back to the beginning of the queue and leave.
*/
- LOG.warn("Exception when using channel: " + addr
+ LOG.warn("Exception when using channel: " + sid
+ ")" + e.toString());
running = false;
synchronized (senderWorkerMap) {
recvWorker.finish();
recvWorker = null;
- senderWorkerMap.remove(channel.socket().getInetAddress());
+ senderWorkerMap.remove(sid);
- if (queueSendMap.get(channel.socket().getInetAddress())
+ if (queueSendMap.get(sid)
.size() == 0)
- queueSendMap.get(channel.socket().getInetAddress())
+ queueSendMap.get(sid)
.offer(b);
}
}
@@ -566,12 +501,12 @@
* channel breaks, then removes itself from the pool of receivers.
*/
class RecvWorker extends Thread {
- InetAddress addr;
+ Long sid;
SocketChannel channel;
boolean running = true;
- RecvWorker(SocketChannel channel) {
- this.addr = channel.socket().getInetAddress();
+ RecvWorker(SocketChannel channel, Long sid) {
+ // sid is the remote server's id; messages this worker puts on
+ // recvQueue are tagged with it.
+ this.sid = sid;
this.channel = channel;
}
@@ -610,7 +545,7 @@
message.position(0);
synchronized (recvQueue) {
recvQueue
- .put(new Message(message.duplicate(), addr));
+ .put(new Message(message.duplicate(), sid));
}
msgLength.position(0);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Oct 1 01:40:02 2008
@@ -26,13 +26,16 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.HashMap;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.InputArchive;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionAlg;
-import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getElectionPort;
import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getInitLimit;
import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServerId;
import static org.apache.zookeeper.server.quorum.QuorumPeerConfig.getServers;
@@ -78,13 +81,22 @@
}
public static class QuorumServer {
- public QuorumServer(long id, InetSocketAddress addr) {
+ /**
+ * @param id the server's id
+ * @param addr the server's quorum address
+ * @param electionAddr address used for leader election connections
+ */
+ public QuorumServer(long id, InetSocketAddress addr,
+ InetSocketAddress electionAddr) {
this.id = id;
this.addr = addr;
+ this.electionAddr = electionAddr;
}
+
+ /**
+ * Creates an entry with no election address ({@code electionAddr} is
+ * null). QuorumCnxManager dereferences electionAddr to bind and to
+ * connect, so such an entry cannot be used with election algorithm 3.
+ */
+ public QuorumServer(long id, InetSocketAddress addr) {
+ this(id, addr, null);
+ }
+
public InetSocketAddress addr;
+ public InetSocketAddress electionAddr;
+
public long id;
}
@@ -94,7 +106,7 @@
/**
* The servers that make up the cluster
*/
- ArrayList<QuorumServer> quorumPeers;
+ HashMap<Long, QuorumServer> quorumPeers;
public int getQuorumSize(){
return quorumPeers.size();
}
@@ -239,23 +251,23 @@
Election electionAlg;
- int electionPort;
-
NIOServerCnxn.Factory cnxnFactory;
private FileTxnSnapLog logFactory = null;
public QuorumPeer() {
super("QuorumPeer");
+ // Register as the QuorumStats provider, as the main constructor does.
+ QuorumStats.getInstance().setStatsProvider(this);
}
- public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File dataDir, File dataLogDir,
- int electionAlg, int electionPort,long myid, int tickTime,
- int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
+ public QuorumPeer(HashMap<Long, QuorumServer> quorumPeers, File dataDir,
+ File dataLogDir, int electionType,
+ long myid, int tickTime, int initLimit, int syncLimit,
+ NIOServerCnxn.Factory cnxnFactory) throws IOException {
super("QuorumPeer");
this.cnxnFactory = cnxnFactory;
this.quorumPeers = quorumPeers;
- this.electionPort = electionPort;
+ this.electionType = electionType;
this.myid = myid;
this.tickTime = tickTime;
this.initLimit = initLimit;
@@ -264,22 +276,6 @@
QuorumStats.getInstance().setStatsProvider(this);
}
- public QuorumPeer(ArrayList<QuorumServer> quorumPeers, FileTxnSnapLog logFactory,
- int electionAlg, int electionPort,long myid, int tickTime,
- int initLimit, int syncLimit,NIOServerCnxn.Factory cnxnFactory) throws IOException {
-
- super("QuorumPeer");
- this.cnxnFactory = cnxnFactory;
- this.quorumPeers = quorumPeers;
- this.electionPort = electionPort;
- this.myid = myid;
- this.tickTime = tickTime;
- this.initLimit = initLimit;
- this.syncLimit = syncLimit;
- this.logFactory=logFactory;
- QuorumStats.getInstance().setStatsProvider(this);
- }
-
@Override
public synchronized void start() {
startLeaderElection();
@@ -292,9 +288,9 @@
responder.running = false;
responder.interrupt();
}
- public void startLeaderElection() {
+ synchronized public void startLeaderElection() {
currentVote = new Vote(myid, getLastLoggedZxid());
- for (QuorumServer p : quorumPeers) {
+ for (QuorumServer p : quorumPeers.values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
@@ -319,13 +315,11 @@
* This constructor is only used by the existing unit test code.
* It defaults to FileLogProvider persistence provider.
*/
- public QuorumPeer(ArrayList<QuorumServer> quorumPeers, File snapDir,
- File logDir, int clientPort, int electionAlg, int electionPort,
+ public QuorumPeer(HashMap<Long,QuorumServer> quorumPeers, File snapDir,
+ File logDir, int clientPort, int electionAlg,
long myid, int tickTime, int initLimit, int syncLimit) throws IOException {
- this(quorumPeers,
- new FileTxnSnapLog(snapDir,logDir),
- electionAlg,electionPort,myid,tickTime,initLimit,syncLimit,
- new NIOServerCnxn.Factory(clientPort));
+ // Delegates to the main constructor, passing electionAlg as its
+ // electionType and building the client factory from clientPort.
+ this(quorumPeers, snapDir, logDir, electionAlg,
+ myid,tickTime, initLimit,syncLimit,new NIOServerCnxn.Factory(clientPort));
}
public long getLastLoggedZxid(){
@@ -352,14 +346,14 @@
// will create a new instance for each run of the protocol
break;
case 1:
- le = new AuthFastLeaderElection(this, this.electionPort);
+ le = new AuthFastLeaderElection(this);
break;
case 2:
- le = new AuthFastLeaderElection(this, this.electionPort, true);
+ le = new AuthFastLeaderElection(this, true);
break;
case 3:
le = new FastLeaderElection(this,
- new QuorumCnxManager(this.electionPort));
+ new QuorumCnxManager(this));
+ // Without this break, case 3 falls through into "assert false" and
+ // fails whenever assertions are enabled.
+ break;
default:
assert false;
}
@@ -484,19 +478,6 @@
}
- public NIOServerCnxn.Factory getCnxnFactory() {
- return cnxnFactory;
- }
-
- public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
- this.cnxnFactory = cnxnFactory;
- }
-
- public void setQuorumPeers(ArrayList<QuorumServer> quorumPeers) {
- this.quorumPeers = quorumPeers;
- }
-
-
/**
* get the id of this quorum peer.
*/
@@ -540,6 +521,14 @@
}
/**
+ * Get the Election algorithm instance held by this peer (the
+ * electionAlg field); it is typed as Election, not LeaderElection.
+ */
+
+ public Election getElectionAlg(){
+ return electionAlg;
+ }
+
+ /**
* Get the number of ticks that can pass between sending a request and getting
* an acknowledgement
*/
@@ -556,13 +545,6 @@
}
/**
- * Gets the election port
- */
- public int getElectionPort() {
- return electionPort;
- }
-
- /**
* Gets the election type
*/
public int getElectionType() {
@@ -576,13 +558,18 @@
this.electionType = electionType;
}
- /**
- * Sets the election port
- */
- public void setElectionPort(int electionPort) {
- this.electionPort = electionPort;
+ /**
+ * Gets the NIOServerCnxn.Factory used for client connections.
+ */
+ public NIOServerCnxn.Factory getCnxnFactory() {
+ return cnxnFactory;
}
-
+
+ /**
+ * Sets the NIOServerCnxn.Factory used for client connections.
+ */
+ public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
+ this.cnxnFactory = cnxnFactory;
+ }
+
+ /**
+ * Sets the servers that make up the cluster, keyed by server id. The
+ * map is stored by reference, not copied.
+ */
+ public void setQuorumPeers(HashMap<Long,QuorumServer> quorumPeers) {
+ this.quorumPeers = quorumPeers;
+ }
+
public int getClientPort() {
+ // NOTE(review): always -1 in this class; confirm whether callers rely
+ // on a subclass override for the real client port.
return -1;
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Wed Oct 1 01:40:02 2008
@@ -23,9 +23,9 @@
import java.io.FileInputStream;
import java.io.FileReader;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Properties;
import java.util.Map.Entry;
+import java.util.HashMap;
import org.apache.log4j.Logger;
@@ -40,7 +40,7 @@
private int syncLimit;
private int electionAlg;
private int electionPort;
- private ArrayList<QuorumServer> servers = null;
+ private HashMap<Long,QuorumServer> servers = null;
private long serverId;
private QuorumPeerConfig(int port, String dataDir, String dataLogDir) {
@@ -68,7 +68,7 @@
} finally {
zooCfgStream.close();
}
- ArrayList<QuorumServer> servers = new ArrayList<QuorumServer>();
+ HashMap<Long,QuorumServer> servers = new HashMap<Long,QuorumServer>();
String dataDir = null;
String dataLogDir = null;
int clientPort = 0;
@@ -94,19 +94,24 @@
syncLimit = Integer.parseInt(value);
} else if (key.equals("electionAlg")) {
electionAlg = Integer.parseInt(value);
- } else if (key.equals("electionPort")) {
- electionPort = Integer.parseInt(value);
} else if (key.startsWith("server.")) {
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
String parts[] = value.split(":");
- if (parts.length != 2) {
+ if ((parts.length != 2) &&
+ (parts.length != 3)){
LOG.error(value
- + " does not have the form host:port");
+ + " does not have the form host:port or host:port:port");
+ // Stop here: with fewer parts parts[1] does not exist, and with
+ // more parts neither branch below adds the server, so it would
+ // be silently dropped from the ensemble.
+ throw new IllegalArgumentException(value
+ + " does not have the form host:port or host:port:port");
}
InetSocketAddress addr = new InetSocketAddress(parts[0],
- Integer.parseInt(parts[1]));
- servers.add(new QuorumServer(sid, addr));
+ Integer.parseInt(parts[1]));
+ if(parts.length == 2)
+ servers.put(Long.valueOf(sid), new QuorumServer(sid, addr));
+ else if(parts.length == 3){
+ InetSocketAddress electionAddr = new InetSocketAddress(parts[0],
+ Integer.parseInt(parts[2]));
+ servers.put(Long.valueOf(sid), new QuorumServer(sid, addr, electionAddr));
+ }
} else {
System.setProperty("zookeeper." + key, value);
}
@@ -145,7 +150,6 @@
conf.initLimit = initLimit;
conf.syncLimit = syncLimit;
conf.electionAlg = electionAlg;
- conf.electionPort = electionPort;
conf.servers = servers;
if (servers.size() > 1) {
File myIdFile = new File(dataDir, "myid");
@@ -198,13 +202,8 @@
assert instance instanceof QuorumPeerConfig;
return ((QuorumPeerConfig)instance).electionAlg;
}
-
- public static int getElectionPort() {
- assert instance instanceof QuorumPeerConfig;
- return ((QuorumPeerConfig)instance).electionPort;
- }
-
- public static ArrayList<QuorumServer> getServers() {
+
+ /**
+ * Returns the servers read from the config, keyed by server id. This is
+ * the config's own map, not a copy.
+ */
+ public static HashMap<Long,QuorumServer> getServers() {
assert instance instanceof QuorumPeerConfig;
return ((QuorumPeerConfig)instance).servers;
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Wed Oct 1 01:40:02 2008
@@ -77,7 +77,6 @@
new File(QuorumPeerConfig.getDataLogDir()),
new File(QuorumPeerConfig.getDataDir())));
peer.setQuorumPeers(QuorumPeerConfig.getServers());
- peer.setElectionPort(QuorumPeerConfig.getElectionPort());
peer.setElectionType(QuorumPeerConfig.getElectionAlg());
peer.setMyid(QuorumPeerConfig.getServerId());
peer.setTickTime(QuorumPeerConfig.getTickTime());
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java?rev=700714&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java Wed Oct 1 01:40:02 2008
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumStats;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+
+import junit.framework.TestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs FastLeaderElection (election algorithm 3) among several local
+ * peers, each with its own election address, and checks that they agree
+ * on a leader. The first peer to win an election with leaderDies set
+ * shuts its election down, so one peer may end up voting for itself.
+ */
+public class FLETest extends TestCase {
+ protected static final Logger LOG = Logger.getLogger(FLETest.class);
+
+ int count;
+ int baseport;
+ int baseLEport;
+ HashMap<Long,QuorumServer> peers;
+ ArrayList<LEThread> threads;
+ File tmpdir[];
+ int port[];
+
+ volatile Vote votes[];
+ volatile boolean leaderDies;
+ volatile long leader = -1;
+ volatile int round = 1;
+ Random rand = new Random();
+
+ @Override
+ public void setUp() throws Exception {
+ count = 7;
+ baseport= 33003;
+ baseLEport = 43003;
+
+ peers = new HashMap<Long,QuorumServer>(count);
+ threads = new ArrayList<LEThread>(count);
+ votes = new Vote[count];
+ tmpdir = new File[count];
+ port = new int[count];
+
+ QuorumStats.registerAsConcrete();
+ LOG.info("SetUp " + getName());
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ for(int i = 0; i < threads.size(); i++) {
+ ((FastLeaderElection) threads.get(i).peer.getElectionAlg()).shutdown();
+ }
+ LOG.info("FINISHED " + getName());
+ }
+
+ /**
+ * Runs lookForLeader() on one peer in a loop until the peer elects
+ * itself, agrees with the leader recorded by another thread, or gets
+ * a null vote.
+ */
+ class LEThread extends Thread {
+ FastLeaderElection le;
+ int i;
+ QuorumPeer peer;
+ int peerRound = 1;
+
+ LEThread(QuorumPeer peer, int i) {
+ this.i = i;
+ this.peer = peer;
+ LOG.info("Constructor: " + getName());
+ }
+ public void run() {
+ try {
+ Vote v = null;
+ while(true) {
+ peer.setPeerState(ServerState.LOOKING);
+ LOG.info("Going to call leader election again.");
+ v = peer.getElectionAlg().lookForLeader();
+ if(v == null){
+ LOG.info("Thread " + i + " got a null vote");
+ break;
+ }
+ peer.setCurrentVote(v);
+
+ LOG.info("Finished election: " + i + ", " + v.id);
+ votes[i] = v;
+ if (v.id == ((long) i)) {
+ LOG.debug("I'm the leader");
+ synchronized(FLETest.this) {
+ if (leaderDies) {
+ LOG.debug("Leader " + i + " dying");
+ leaderDies = false;
+ ((FastLeaderElection) peer.getElectionAlg()).shutdown();
+ leader = -1;
+ LOG.debug("Leader " + i + " dead");
+ } else {
+ leader = i;
+ }
+ round++;
+ FLETest.this.notifyAll();
+ }
+ break;
+ }
+ synchronized(FLETest.this) {
+ if (round == ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock()) {
+ int tmp_round = round;
+ FLETest.this.wait(1000);
+ if(tmp_round == round) round++;
+ }
+ LOG.info("The leader: " + leader + " and my vote " + votes[i].id);
+ if (leader == votes[i].id) {
+ break;
+ }
+ peerRound++;
+ }
+ Thread.sleep(rand.nextInt(1000));
+ peer.setCurrentVote(new Vote(peer.getId(), 0));
+ }
+ LOG.debug("Thread " + i + " votes " + v);
+ } catch (InterruptedException e) {
+ LOG.warn("Thread " + i + " interrupted", e);
+ // Restore the interrupt status rather than swallowing it.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Test
+ public void testLE() throws Exception {
+
+ FastLeaderElection le[] = new FastLeaderElection[count];
+ leaderDies = true;
+ boolean allowOneBadLeader = leaderDies;
+
+ LOG.info("TestLE: " + getName()+ ", " + count);
+ for(int i = 0; i < count; i++) {
+ peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress(baseport+100+i),
+ new InetSocketAddress(baseLEport+100+i)));
+ tmpdir[i] = File.createTempFile("letest", "test");
+ port[i] = baseport+i;
+ }
+
+ for(int i = 0; i < le.length; i++) {
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2);
+ peer.startLeaderElection();
+ //le[i] = new FastLeaderElection(peer, new QuorumCnxManager(peer));
+ LEThread thread = new LEThread(peer, i);
+ thread.start();
+ threads.add(thread);
+ }
+ LOG.info("Started threads " + getName());
+
+ for(int i = 0; i < threads.size(); i++) {
+ threads.get(i).join(20000);
+ if (threads.get(i).isAlive()) {
+ fail("Threads didn't join: " + i);
+ }
+ }
+ if (votes[0] == null) {
+ fail("Thread 0 had a null vote");
+ }
+ long id = votes[0].id;
+ for(int i = 1; i < votes.length; i++) {
+ if (votes[i] == null) {
+ fail("Thread " + i + " had a null vote");
+ }
+ LOG.info("Final leader info: " + i + ", " + votes[i].id + ", " + id);
+ if (votes[i].id != id) {
+ if (allowOneBadLeader && votes[i].id == i) {
+ allowOneBadLeader = false;
+ } else {
+ fail("Thread " + i + " got " + votes[i].id + " expected " + id);
+ }
+ }
+ }
+ }
+}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LETest.java Wed Oct 1 01:40:02 2008
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Random;
+import java.util.HashMap;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.LeaderElection;
@@ -86,14 +87,14 @@
public void testLE() throws Exception {
int count = 30;
int baseport= 33003;
- ArrayList<QuorumServer> peers = new ArrayList<QuorumServer>(count);
+ HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>(count);
ArrayList<LEThread> threads = new ArrayList<LEThread>(count);
File tmpdir[] = new File[count];
int port[] = new int[count];
votes = new Vote[count];
QuorumStats.registerAsConcrete();
for(int i = 0; i < count; i++) {
- peers.add(new QuorumServer(i, new InetSocketAddress(baseport+100+i)));
+ peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress("127.0.0.1", baseport+100+i)));
tmpdir[i] = File.createTempFile("letest", "test");
port[i] = baseport+i;
}
@@ -101,7 +102,7 @@
leaderDies = true;
boolean allowOneBadLeader = leaderDies;
for(int i = 0; i < le.length; i++) {
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 0, 0, i, 2, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 0, i, 2, 2, 2);
peer.startLeaderElection();
le[i] = new LeaderElection(peer);
LEThread thread = new LEThread(le[i], peer, i);
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=700714&r1=700713&r2=700714&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Oct 1 01:40:02 2008
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashMap;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -65,22 +66,23 @@
int tickTime = 2000;
int initLimit = 3;
int syncLimit = 3;
- ArrayList<QuorumServer> peers = new ArrayList<QuorumServer>();
- peers.add(new QuorumServer(1, new InetSocketAddress("127.0.0.1", 3181)));
- peers.add(new QuorumServer(2, new InetSocketAddress("127.0.0.1", 3182)));
- peers.add(new QuorumServer(3, new InetSocketAddress("127.0.0.1", 3183)));
- peers.add(new QuorumServer(4, new InetSocketAddress("127.0.0.1", 3184)));
- peers.add(new QuorumServer(5, new InetSocketAddress("127.0.0.1", 3185)));
+ HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+ peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", 3181)));
+ peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", 3182)));
+ peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", 3183)));
+ peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", 3184)));
+ peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", 3185)));
+
LOG.info("creating QuorumPeer 1");
- s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0, 1181, 1, tickTime, initLimit, syncLimit);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0, 1, tickTime, initLimit, syncLimit);
LOG.info("creating QuorumPeer 2");
- s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 1182, 2, tickTime, initLimit, syncLimit);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 2, tickTime, initLimit, syncLimit);
LOG.info("creating QuorumPeer 3");
- s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 1183, 3, tickTime, initLimit, syncLimit);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 3, tickTime, initLimit, syncLimit);
LOG.info("creating QuorumPeer 4");
- s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 1184, 4, tickTime, initLimit, syncLimit);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 4, tickTime, initLimit, syncLimit);
LOG.info("creating QuorumPeer 5");
- s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 1185, 5, tickTime, initLimit, syncLimit);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 5, tickTime, initLimit, syncLimit);
LOG.info("start QuorumPeer 1");
s1.start();
LOG.info("start QuorumPeer 2");