You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2008/06/24 21:04:59 UTC
svn commit: r671303 [4/6] - in /hadoop/zookeeper/trunk/src/java:
lib/cobertura/lib/ lib/svnant/ main/org/apache/zookeeper/
main/org/apache/zookeeper/server/ main/org/apache/zookeeper/server/auth/
main/org/apache/zookeeper/server/quorum/ main/org/apache...
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Tue Jun 24 12:04:58 2008
@@ -1,908 +1,908 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.Random;
-
-import org.apache.log4j.Logger;
-
-import org.apache.zookeeper.server.quorum.Election;
-import org.apache.zookeeper.server.quorum.Vote;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-
-public class AuthFastLeaderElection implements Election {
- private static final Logger LOG = Logger.getLogger(AuthFastLeaderElection.class);
-
- /* Sequence numbers for messages */
- static int sequencer = 0;
- static int maxTag = 0;
-
- /*
- * Determine how much time a process has to wait once it believes that it
- * has reached the end of leader election.
- */
- static int finalizeWait = 100;
-
- /*
- * Challenge counter to avoid replay attacks
- */
-
- static int challengeCounter = 0;
-
- /*
- * Flag to determine whether to authenticate or not
- */
-
- private boolean authEnabled = false;
-
- static public class Notification {
- /*
- * Proposed leader
- */
- long leader;
-
- /*
- * zxid of the proposed leader
- */
- long zxid;
-
- /*
- * Epoch
- */
- long epoch;
-
- /*
- * current state of sender
- */
- QuorumPeer.ServerState state;
-
- /*
- * Address of the sender
- */
- InetSocketAddress addr;
- }
-
- /*
- * Messages to send, both Notifications and Acks
- */
- static public class ToSend {
- static enum mType {
- crequest, challenge, notification, ack
- };
-
- ToSend(mType type, long tag, long leader, long zxid, long epoch,
- ServerState state, InetSocketAddress addr) {
-
- switch (type) {
- case crequest:
- this.type = 0;
- this.tag = tag;
- this.leader = leader;
- this.zxid = zxid;
- this.epoch = epoch;
- this.state = state;
- this.addr = addr;
-
- break;
- case challenge:
- this.type = 1;
- this.tag = tag;
- this.leader = leader;
- this.zxid = zxid;
- this.epoch = epoch;
- this.state = state;
- this.addr = addr;
-
- break;
- case notification:
- this.type = 2;
- this.leader = leader;
- this.zxid = zxid;
- this.epoch = epoch;
- this.state = QuorumPeer.ServerState.LOOKING;
- this.tag = tag;
- this.addr = addr;
-
- break;
- case ack:
- this.type = 3;
- this.tag = tag;
- this.leader = leader;
- this.zxid = zxid;
- this.epoch = epoch;
- this.state = state;
- this.addr = addr;
-
- break;
- default:
- break;
- }
- }
-
- /*
- * Message type: 0 notification, 1 acknowledgement
- */
- int type;
-
- /*
- * Proposed leader in the case of notification
- */
- long leader;
-
- /*
- * id contains the tag for acks, and zxid for notifications
- */
- long zxid;
-
- /*
- * Epoch
- */
- long epoch;
-
- /*
- * Current state;
- */
- QuorumPeer.ServerState state;
-
- /*
- * Message tag
- */
- long tag;
-
- InetSocketAddress addr;
- }
-
- LinkedBlockingQueue<ToSend> sendqueue;
-
- LinkedBlockingQueue<Notification> recvqueue;
-
- private class Messenger {
-
- DatagramSocket mySocket;
- long lastProposedLeader;
- long lastProposedZxid;
- long lastEpoch;
- LinkedBlockingQueue<Long> acksqueue;
- HashMap<Long, Long> challengeMap;
- HashMap<Long, Long> challengeMutex;
- HashMap<Long, Long> ackMutex;
- HashMap<InetSocketAddress, HashMap<Long, Long>> addrChallengeMap;
-
- class WorkerReceiver implements Runnable {
-
- DatagramSocket mySocket;
- Messenger myMsg;
-
- WorkerReceiver(DatagramSocket s, Messenger msg) {
- mySocket = s;
- myMsg = msg;
- }
-
- boolean saveChallenge(long tag, long challenge) {
-
- Long l = challengeMutex.get(tag);
-
- synchronized (challengeMap) {
- challengeMap.put(tag, challenge);
- challengeMutex.remove(tag);
- }
-
- if (l != null) {
- synchronized(l){
- l.notify();
- }
- }
-
- return true;
- }
-
- public void run() {
- byte responseBytes[] = new byte[48];
- ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
- DatagramPacket responsePacket = new DatagramPacket(
- responseBytes, responseBytes.length);
- while (true) {
- // Sleeps on receive
- try {
- responseBuffer.clear();
- mySocket.receive(responsePacket);
- } catch (IOException e) {
- LOG.warn("Exception receiving: " + e.toString());
- }
- // Receive new message
- if (responsePacket.getLength() != responseBytes.length) {
- LOG.error("Got a short response: "
- + responsePacket.getLength() + " "
- + responsePacket.toString());
- continue;
- }
- responseBuffer.clear();
- int type = responseBuffer.getInt();
- if ((type > 3) || (type < 0)) {
- LOG.error("Got bad Msg type: " + type);
- continue;
- }
- long tag = responseBuffer.getLong();
-
- QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
- switch (responseBuffer.getInt()) {
- case 0:
- ackstate = QuorumPeer.ServerState.LOOKING;
- break;
- case 1:
- ackstate = QuorumPeer.ServerState.LEADING;
- break;
- case 2:
- ackstate = QuorumPeer.ServerState.FOLLOWING;
- break;
- }
-
- switch (type) {
- case 0:
- // Receive challenge request
- ToSend c = new ToSend(ToSend.mType.challenge, tag,
- self.currentVote.id, self.currentVote.zxid,
- logicalclock, self.getPeerState(),
- (InetSocketAddress) responsePacket
- .getSocketAddress());
- sendqueue.offer(c);
- break;
- case 1:
- // Receive challenge and store somewhere else
- long challenge = responseBuffer.getLong();
- saveChallenge(tag, challenge);
-
- break;
- case 2:
- Notification n = new Notification();
- n.leader = responseBuffer.getLong();
- n.zxid = responseBuffer.getLong();
- n.epoch = responseBuffer.getLong();
- n.state = ackstate;
- n.addr = (InetSocketAddress) responsePacket
- .getSocketAddress();
-
- if ((myMsg.lastEpoch <= n.epoch)
- && ((n.zxid > myMsg.lastProposedZxid)
- || ((n.zxid == myMsg.lastProposedZxid)
- && (n.leader > myMsg.lastProposedLeader)))) {
- myMsg.lastProposedZxid = n.zxid;
- myMsg.lastProposedLeader = n.leader;
- myMsg.lastEpoch = n.epoch;
- }
-
- long recChallenge;
- InetSocketAddress addr = (InetSocketAddress) responsePacket
- .getSocketAddress();
- if (authEnabled) {
- if (addrChallengeMap.get(addr).get(tag) != null) {
- recChallenge = responseBuffer.getLong();
-
- if (addrChallengeMap.get(addr).get(tag) == recChallenge) {
- recvqueue.offer(n);
-
- ToSend a = new ToSend(ToSend.mType.ack,
- tag, self.currentVote.id,
- self.currentVote.zxid,
- logicalclock, self.getPeerState(),
- (InetSocketAddress) addr);
-
- sendqueue.offer(a);
- } else {
- LOG.warn("Incorrect challenge: "
- + recChallenge + ", "
- + addrChallengeMap.toString());
- }
- } else {
- LOG.warn("No challenge for host: " + addr
- + " " + tag);
- }
- } else {
- recvqueue.offer(n);
-
- ToSend a = new ToSend(ToSend.mType.ack, tag,
- self.currentVote.id, self.currentVote.zxid,
- logicalclock, self.getPeerState(),
- (InetSocketAddress) responsePacket
- .getSocketAddress());
-
- sendqueue.offer(a);
- }
- break;
-
- // Upon reception of an ack message, remove it from the
- // queue
- case 3:
- Long l = ackMutex.get(tag);
- if (l != null) {
- synchronized(l){
- l.notify();
- }
- }
- acksqueue.offer(tag);
-
- if (authEnabled) {
- addrChallengeMap.get(
- (InetSocketAddress) responsePacket
- .getSocketAddress()).remove(tag);
- }
-
- if (ackstate != QuorumPeer.ServerState.LOOKING) {
- Notification outofsync = new Notification();
- outofsync.leader = responseBuffer.getLong();
- outofsync.zxid = responseBuffer.getLong();
- outofsync.epoch = responseBuffer.getLong();
- outofsync.state = ackstate;
- outofsync.addr = (InetSocketAddress) responsePacket
- .getSocketAddress();
-
- recvqueue.offer(outofsync);
- }
-
- break;
- // Default case
- default:
- LOG.warn("Received message of incorrect type");
- break;
- }
- }
- }
- }
-
- class WorkerSender implements Runnable {
-
- Random rand;
- boolean processing;
- int maxAttempts;
- int ackWait = finalizeWait;
-
- /*
- * Receives a socket and max number of attempts as input
- */
-
- WorkerSender(int attempts) {
- maxAttempts = attempts;
- rand = new Random(java.lang.Thread.currentThread().getId()
- + System.currentTimeMillis());
- }
-
- long genChallenge() {
- byte buf[] = new byte[8];
-
- buf[0] = (byte) ((challengeCounter & 0xff000000) >>> 24);
- buf[1] = (byte) ((challengeCounter & 0x00ff0000) >>> 16);
- buf[2] = (byte) ((challengeCounter & 0x0000ff00) >>> 8);
- buf[3] = (byte) ((challengeCounter & 0x000000ff));
-
- challengeCounter++;
- int secret = rand.nextInt(java.lang.Integer.MAX_VALUE);
-
- buf[4] = (byte) ((secret & 0xff000000) >>> 24);
- buf[5] = (byte) ((secret & 0x00ff0000) >>> 16);
- buf[6] = (byte) ((secret & 0x0000ff00) >>> 8);
- buf[7] = (byte) ((secret & 0x000000ff));
-
- return (((long)(buf[0] & 0xFF)) << 56)
- + (((long)(buf[1] & 0xFF)) << 48)
- + (((long)(buf[2] & 0xFF)) << 40)
- + (((long)(buf[3] & 0xFF)) << 32)
- + (((long)(buf[4] & 0xFF)) << 24)
- + (((long)(buf[5] & 0xFF)) << 16)
- + (((long)(buf[6] & 0xFF)) << 8)
- + ((long)(buf[7] & 0xFF));
- }
-
- public void run() {
- while (true) {
- try {
- ToSend m = sendqueue.take();
- process(m);
- } catch (InterruptedException e) {
- break;
- }
-
- }
- }
-
- private void process(ToSend m) {
- int attempts = 0;
- byte zeroes[];
- byte requestBytes[] = new byte[48];
- DatagramPacket requestPacket = new DatagramPacket(requestBytes,
- requestBytes.length);
- ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
- switch (m.type) {
- case 0:
- /*
- * Building challenge request packet to send
- */
- requestBuffer.clear();
- requestBuffer.putInt(ToSend.mType.crequest.ordinal());
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- zeroes = new byte[32];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- requestPacket.setSocketAddress(m.addr);
-
- try {
- if (challengeMap.get(m.tag) == null) {
- mySocket.send(requestPacket);
- }
- } catch (IOException e) {
- LOG.warn("Exception while sending challenge: "
- + e.toString());
- }
-
- break;
- case 1:
- /*
- * Building challenge packet to send
- */
-
- long newChallenge;
- if (addrChallengeMap.get(m.addr).containsKey(m.tag)) {
- newChallenge = addrChallengeMap.get(m.addr).get(m.tag);
- } else {
- newChallenge = genChallenge();
- }
-
- addrChallengeMap.get(m.addr).put(m.tag, newChallenge);
-
- requestBuffer.clear();
- requestBuffer.putInt(ToSend.mType.challenge.ordinal());
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(newChallenge);
- zeroes = new byte[24];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- requestPacket.setSocketAddress(m.addr);
-
- try {
- mySocket.send(requestPacket);
- } catch (IOException e) {
- LOG.warn("Exception while sending challenge: "
- + e.toString());
- }
-
- break;
- case 2:
-
- /*
- * Building notification packet to send
- */
-
- requestBuffer.clear();
- requestBuffer.putInt(m.type);
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(m.leader);
- requestBuffer.putLong(m.zxid);
- requestBuffer.putLong(m.epoch);
- zeroes = new byte[8];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- requestPacket.setSocketAddress(m.addr);
-
- boolean myChallenge = false;
- boolean myAck = false;
-
- while (attempts < maxAttempts) {
- try {
- /*
- * Try to obtain a challenge only if does not have
- * one yet
- */
-
- if (!myChallenge && authEnabled) {
- ToSend crequest = new ToSend(
- ToSend.mType.crequest, m.tag, m.leader,
- m.zxid, m.epoch,
- QuorumPeer.ServerState.LOOKING, m.addr);
- sendqueue.offer(crequest);
-
- try {
- double timeout = ackWait
- * java.lang.Math.pow(2, attempts);
-
- Long l = Long.valueOf(m.tag);
- synchronized (l) {
- challengeMutex.put(m.tag, l);
- l.wait((long) timeout);
- myChallenge = challengeMap
- .containsKey(m.tag);
- }
- } catch (InterruptedException e) {
- LOG.warn("Challenge request exception: "
- + e.toString());
- }
- }
-
- /*
- * If don't have challenge yet, skip sending
- * notification
- */
-
- if (authEnabled && !myChallenge) {
- attempts++;
- continue;
- }
-
- if (authEnabled) {
- requestBuffer.position(40);
- requestBuffer.putLong(challengeMap.get(m.tag));
- }
- mySocket.send(requestPacket);
- try {
- Long l = Long.valueOf(m.tag);
- double timeout = ackWait
- * java.lang.Math.pow(10, attempts);
- synchronized (l) {
- ackMutex.put(m.tag, l);
- l.wait((int) timeout);
- }
- } catch (InterruptedException e) {
- LOG.warn("Ack exception: "
- + e.toString());
- }
- synchronized (acksqueue) {
- for (int i = 0; i < acksqueue.size(); ++i) {
- Long newack = acksqueue.poll();
-
- /*
- * Under highly concurrent load, a thread
- * may get into this loop but by the time it
- * tries to read from the queue, the queue
- * is empty. There are two alternatives:
- * synchronize this block, or test if newack
- * is null.
- *
- */
-
- if (newack == m.tag) {
- myAck = true;
- } else
- acksqueue.offer(newack);
- }
- }
- } catch (IOException e) {
- LOG.warn("Sending exception: "
- + e.toString());
- /*
- * Do nothing, just try again
- */
- }
- if (myAck) {
- /*
- * Received ack successfully, so return
- */
- if (challengeMap.get(m.tag) != null)
- challengeMap.remove(m.tag);
- return;
- } else
- attempts++;
- }
- /*
- * Return message to queue for another attempt later if
- * epoch hasn't changed.
- */
- if (m.epoch == logicalclock) {
- challengeMap.remove(m.tag);
- sendqueue.offer(m);
- }
- break;
- case 3:
-
- requestBuffer.clear();
- requestBuffer.putInt(m.type);
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(m.leader);
- requestBuffer.putLong(m.zxid);
- requestBuffer.putLong(m.epoch);
-
- requestPacket.setLength(48);
- requestPacket.setSocketAddress(m.addr);
-
- try {
- mySocket.send(requestPacket);
- } catch (IOException e) {
- LOG.warn("Exception while sending ack: "
- + e.toString());
- }
- break;
- }
- }
- }
-
- public boolean queueEmpty() {
- return (sendqueue.isEmpty() || acksqueue.isEmpty() || recvqueue
- .isEmpty());
- }
-
- Messenger(int threads, DatagramSocket s) {
- mySocket = s;
- acksqueue = new LinkedBlockingQueue<Long>();
- challengeMap = new HashMap<Long, Long>();
- challengeMutex = new HashMap<Long, Long>();
- ackMutex = new HashMap<Long, Long>();
- addrChallengeMap = new HashMap<InetSocketAddress, HashMap<Long, Long>>();
- lastProposedLeader = 0;
- lastProposedZxid = 0;
- lastEpoch = 0;
-
- for (int i = 0; i < threads; ++i) {
- Thread t = new Thread(new WorkerSender(3),
- "WorkerSender Thread: " + (i + 1));
- t.setDaemon(true);
- t.start();
- }
-
- for (QuorumServer server : self.quorumPeers) {
- InetSocketAddress saddr = new InetSocketAddress(server.addr
- .getAddress(), port);
- addrChallengeMap.put(saddr, new HashMap<Long, Long>());
- }
-
- Thread t = new Thread(new WorkerReceiver(s, this),
- "WorkerReceiver Thread");
- t.start();
- }
-
- }
-
- QuorumPeer self;
- int port;
- long logicalclock; /* Election instance */
- DatagramSocket mySocket;
- long proposedLeader;
- long proposedZxid;
-
- public AuthFastLeaderElection(QuorumPeer self, int electionPort,
- boolean auth) {
- this.authEnabled = auth;
- starter(self, electionPort);
- }
-
- public AuthFastLeaderElection(QuorumPeer self, int electionPort) {
- starter(self, electionPort);
- }
-
- private void starter(QuorumPeer self, int electionPort) {
- this.self = self;
- port = electionPort;
- proposedLeader = -1;
- proposedZxid = -1;
-
- try {
- mySocket = new DatagramSocket(port);
- // mySocket.setSoTimeout(20000);
- } catch (SocketException e1) {
- e1.printStackTrace();
- throw new RuntimeException();
- }
- sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.quorumPeers.size());
- recvqueue = new LinkedBlockingQueue<Notification>(2 * self.quorumPeers
- .size());
- new Messenger(self.quorumPeers.size() * 2, mySocket);
- }
-
- private void leaveInstance() {
- logicalclock++;
- // sendqueue.clear();
- // recvqueue.clear();
- }
-
- public static class ElectionResult {
- public Vote vote;
-
- public int count;
-
- public Vote winner;
-
- public int winningCount;
- }
-
- private void sendNotifications() {
- for (QuorumServer server : self.quorumPeers) {
- InetSocketAddress saddr = new InetSocketAddress(server.addr
- .getAddress(), port);
-
- ToSend notmsg = new ToSend(ToSend.mType.notification,
- AuthFastLeaderElection.sequencer++, proposedLeader,
- proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING,
- saddr);
-
- sendqueue.offer(notmsg);
- }
- }
-
- private boolean totalOrderPredicate(long id, long zxid) {
- if ((zxid > proposedZxid)
- || ((zxid == proposedZxid) && (id > proposedLeader)))
- return true;
- else
- return false;
-
- }
-
- private boolean termPredicate(HashMap<InetSocketAddress, Vote> votes,
- long l, long zxid) {
-
- int count = 0;
- Collection<Vote> votesCast = votes.values();
- /*
- * First make the views consistent. Sometimes peers will have different
- * zxids for a server depending on timing.
- */
- for (Vote v : votesCast) {
- if ((v.id == l) && (v.zxid == zxid))
- count++;
- }
-
- if (count > (self.quorumPeers.size() / 2))
- return true;
- else
- return false;
-
- }
-
- public Vote lookForLeader() throws InterruptedException {
- HashMap<InetSocketAddress, Vote> recvset =
- new HashMap<InetSocketAddress, Vote>();
-
- HashMap<InetSocketAddress, Vote> outofelection =
- new HashMap<InetSocketAddress, Vote>();
-
- logicalclock++;
-
- proposedLeader = self.getId();
- proposedZxid = self.getLastLoggedZxid();
-
- LOG.warn("Election tally");
- sendNotifications();
-
- /*
- * Loop in which we exchange notifications until we find a leader
- */
-
- while (self.getPeerState() == ServerState.LOOKING) {
- /*
- * Remove next notification from queue, times out after 2 times the
- * termination time
- */
- Notification n = recvqueue.poll(2 * finalizeWait,
- TimeUnit.MILLISECONDS);
-
- /*
- * Sends more notifications if haven't received enough. Otherwise
- * processes new notification.
- */
- if (n == null) {
- if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
- sendNotifications();
- } else
- switch (n.state) {
- case LOOKING:
- if (n.epoch > logicalclock) {
- logicalclock = n.epoch;
- recvset.clear();
- if (totalOrderPredicate(n.leader, n.zxid)) {
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
- }
- sendNotifications();
- } else if (n.epoch < logicalclock) {
- break;
- } else if (totalOrderPredicate(n.leader, n.zxid)) {
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
-
- sendNotifications();
- }
-
- recvset.put(n.addr, new Vote(n.leader, n.zxid));
-
- // If have received from all nodes, then terminate
- if (self.quorumPeers.size() == recvset.size()) {
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- // if (self.state == ServerState.FOLLOWING) {
- // Thread.sleep(100);
- // }
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
-
- } else if (termPredicate(recvset, proposedLeader,
- proposedZxid)) {
- // Otherwise, wait for a fixed amount of time
- LOG.warn("Passed predicate");
- Thread.sleep(finalizeWait);
-
- // Notification probe = recvqueue.peek();
-
- // Verify if there is any change in the proposed leader
- while ((!recvqueue.isEmpty())
- && !totalOrderPredicate(
- recvqueue.peek().leader, recvqueue
- .peek().zxid)) {
- recvqueue.poll();
- }
- if (recvqueue.isEmpty()) {
- // LOG.warn("Proposed leader: " +
- // proposedLeader);
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- // if (self.state == ServerState.FOLLOWING) {
- // Thread.sleep(100);
- // }
-
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
- }
- }
- break;
- case LEADING:
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
- case FOLLOWING:
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
- default:
- break;
- }
- }
-
- return null;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+
+public class AuthFastLeaderElection implements Election {
+ private static final Logger LOG = Logger.getLogger(AuthFastLeaderElection.class);
+
+ /* Sequence numbers for messages */
+ static int sequencer = 0;
+ static int maxTag = 0;
+
+ /*
+ * Determine how much time a process has to wait once it believes that it
+ * has reached the end of leader election.
+ */
+ static int finalizeWait = 100;
+
+ /*
+ * Challenge counter to avoid replay attacks
+ */
+
+ static int challengeCounter = 0;
+
+ /*
+ * Flag to determine whether to authenticate or not
+ */
+
+ private boolean authEnabled = false;
+
+ static public class Notification {
+ /*
+ * Proposed leader
+ */
+ long leader;
+
+ /*
+ * zxid of the proposed leader
+ */
+ long zxid;
+
+ /*
+ * Epoch
+ */
+ long epoch;
+
+ /*
+ * current state of sender
+ */
+ QuorumPeer.ServerState state;
+
+ /*
+ * Address of the sender
+ */
+ InetSocketAddress addr;
+ }
+
+ /*
+ * Messages to send, both Notifications and Acks
+ */
+ static public class ToSend {
+ static enum mType {
+ crequest, challenge, notification, ack
+ };
+
+ ToSend(mType type, long tag, long leader, long zxid, long epoch,
+ ServerState state, InetSocketAddress addr) {
+
+ switch (type) {
+ case crequest:
+ this.type = 0;
+ this.tag = tag;
+ this.leader = leader;
+ this.zxid = zxid;
+ this.epoch = epoch;
+ this.state = state;
+ this.addr = addr;
+
+ break;
+ case challenge:
+ this.type = 1;
+ this.tag = tag;
+ this.leader = leader;
+ this.zxid = zxid;
+ this.epoch = epoch;
+ this.state = state;
+ this.addr = addr;
+
+ break;
+ case notification:
+ this.type = 2;
+ this.leader = leader;
+ this.zxid = zxid;
+ this.epoch = epoch;
+ this.state = QuorumPeer.ServerState.LOOKING;
+ this.tag = tag;
+ this.addr = addr;
+
+ break;
+ case ack:
+ this.type = 3;
+ this.tag = tag;
+ this.leader = leader;
+ this.zxid = zxid;
+ this.epoch = epoch;
+ this.state = state;
+ this.addr = addr;
+
+ break;
+ default:
+ break;
+ }
+ }
+
+ /*
+ * Message type: 0 notification, 1 acknowledgement
+ */
+ int type;
+
+ /*
+ * Proposed leader in the case of notification
+ */
+ long leader;
+
+ /*
+ * id contains the tag for acks, and zxid for notifications
+ */
+ long zxid;
+
+ /*
+ * Epoch
+ */
+ long epoch;
+
+ /*
+ * Current state;
+ */
+ QuorumPeer.ServerState state;
+
+ /*
+ * Message tag
+ */
+ long tag;
+
+ InetSocketAddress addr;
+ }
+
+ LinkedBlockingQueue<ToSend> sendqueue;
+
+ LinkedBlockingQueue<Notification> recvqueue;
+
+ private class Messenger {
+
+ DatagramSocket mySocket;
+ long lastProposedLeader;
+ long lastProposedZxid;
+ long lastEpoch;
+ LinkedBlockingQueue<Long> acksqueue;
+ HashMap<Long, Long> challengeMap;
+ HashMap<Long, Long> challengeMutex;
+ HashMap<Long, Long> ackMutex;
+ HashMap<InetSocketAddress, HashMap<Long, Long>> addrChallengeMap;
+
+ class WorkerReceiver implements Runnable {
+
+ DatagramSocket mySocket;
+ Messenger myMsg;
+
+ WorkerReceiver(DatagramSocket s, Messenger msg) {
+ mySocket = s;
+ myMsg = msg;
+ }
+
+ boolean saveChallenge(long tag, long challenge) {
+
+ Long l = challengeMutex.get(tag);
+
+ synchronized (challengeMap) {
+ challengeMap.put(tag, challenge);
+ challengeMutex.remove(tag);
+ }
+
+ if (l != null) {
+ synchronized(l){
+ l.notify();
+ }
+ }
+
+ return true;
+ }
+
+ public void run() {
+ byte responseBytes[] = new byte[48];
+ ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
+ DatagramPacket responsePacket = new DatagramPacket(
+ responseBytes, responseBytes.length);
+ while (true) {
+ // Sleeps on receive
+ try {
+ responseBuffer.clear();
+ mySocket.receive(responsePacket);
+ } catch (IOException e) {
+ LOG.warn("Exception receiving: " + e.toString());
+ }
+ // Receive new message
+ if (responsePacket.getLength() != responseBytes.length) {
+ LOG.error("Got a short response: "
+ + responsePacket.getLength() + " "
+ + responsePacket.toString());
+ continue;
+ }
+ responseBuffer.clear();
+ int type = responseBuffer.getInt();
+ if ((type > 3) || (type < 0)) {
+ LOG.error("Got bad Msg type: " + type);
+ continue;
+ }
+ long tag = responseBuffer.getLong();
+
+ QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
+ switch (responseBuffer.getInt()) {
+ case 0:
+ ackstate = QuorumPeer.ServerState.LOOKING;
+ break;
+ case 1:
+ ackstate = QuorumPeer.ServerState.LEADING;
+ break;
+ case 2:
+ ackstate = QuorumPeer.ServerState.FOLLOWING;
+ break;
+ }
+
+ switch (type) {
+ case 0:
+ // Receive challenge request
+ ToSend c = new ToSend(ToSend.mType.challenge, tag,
+ self.currentVote.id, self.currentVote.zxid,
+ logicalclock, self.getPeerState(),
+ (InetSocketAddress) responsePacket
+ .getSocketAddress());
+ sendqueue.offer(c);
+ break;
+ case 1:
+ // Receive challenge and store somewhere else
+ long challenge = responseBuffer.getLong();
+ saveChallenge(tag, challenge);
+
+ break;
+ case 2:
+ Notification n = new Notification();
+ n.leader = responseBuffer.getLong();
+ n.zxid = responseBuffer.getLong();
+ n.epoch = responseBuffer.getLong();
+ n.state = ackstate;
+ n.addr = (InetSocketAddress) responsePacket
+ .getSocketAddress();
+
+ if ((myMsg.lastEpoch <= n.epoch)
+ && ((n.zxid > myMsg.lastProposedZxid)
+ || ((n.zxid == myMsg.lastProposedZxid)
+ && (n.leader > myMsg.lastProposedLeader)))) {
+ myMsg.lastProposedZxid = n.zxid;
+ myMsg.lastProposedLeader = n.leader;
+ myMsg.lastEpoch = n.epoch;
+ }
+
+ long recChallenge;
+ InetSocketAddress addr = (InetSocketAddress) responsePacket
+ .getSocketAddress();
+ if (authEnabled) {
+ if (addrChallengeMap.get(addr).get(tag) != null) {
+ recChallenge = responseBuffer.getLong();
+
+ if (addrChallengeMap.get(addr).get(tag) == recChallenge) {
+ recvqueue.offer(n);
+
+ ToSend a = new ToSend(ToSend.mType.ack,
+ tag, self.currentVote.id,
+ self.currentVote.zxid,
+ logicalclock, self.getPeerState(),
+ (InetSocketAddress) addr);
+
+ sendqueue.offer(a);
+ } else {
+ LOG.warn("Incorrect challenge: "
+ + recChallenge + ", "
+ + addrChallengeMap.toString());
+ }
+ } else {
+ LOG.warn("No challenge for host: " + addr
+ + " " + tag);
+ }
+ } else {
+ recvqueue.offer(n);
+
+ ToSend a = new ToSend(ToSend.mType.ack, tag,
+ self.currentVote.id, self.currentVote.zxid,
+ logicalclock, self.getPeerState(),
+ (InetSocketAddress) responsePacket
+ .getSocketAddress());
+
+ sendqueue.offer(a);
+ }
+ break;
+
+ // Upon reception of an ack message, remove it from the
+ // queue
+ case 3:
+ Long l = ackMutex.get(tag);
+ if (l != null) {
+ synchronized(l){
+ l.notify();
+ }
+ }
+ acksqueue.offer(tag);
+
+ if (authEnabled) {
+ addrChallengeMap.get(
+ (InetSocketAddress) responsePacket
+ .getSocketAddress()).remove(tag);
+ }
+
+ if (ackstate != QuorumPeer.ServerState.LOOKING) {
+ Notification outofsync = new Notification();
+ outofsync.leader = responseBuffer.getLong();
+ outofsync.zxid = responseBuffer.getLong();
+ outofsync.epoch = responseBuffer.getLong();
+ outofsync.state = ackstate;
+ outofsync.addr = (InetSocketAddress) responsePacket
+ .getSocketAddress();
+
+ recvqueue.offer(outofsync);
+ }
+
+ break;
+ // Default case
+ default:
+ LOG.warn("Received message of incorrect type");
+ break;
+ }
+ }
+ }
+ }
+
+ class WorkerSender implements Runnable {
+
+ Random rand;
+ boolean processing;
+ int maxAttempts;
+ int ackWait = finalizeWait;
+
+ /*
+ * Receives a socket and max number of attempts as input
+ */
+
+ WorkerSender(int attempts) {
+ maxAttempts = attempts;
+ rand = new Random(java.lang.Thread.currentThread().getId()
+ + System.currentTimeMillis());
+ }
+
+ long genChallenge() {
+ byte buf[] = new byte[8];
+
+ buf[0] = (byte) ((challengeCounter & 0xff000000) >>> 24);
+ buf[1] = (byte) ((challengeCounter & 0x00ff0000) >>> 16);
+ buf[2] = (byte) ((challengeCounter & 0x0000ff00) >>> 8);
+ buf[3] = (byte) ((challengeCounter & 0x000000ff));
+
+ challengeCounter++;
+ int secret = rand.nextInt(java.lang.Integer.MAX_VALUE);
+
+ buf[4] = (byte) ((secret & 0xff000000) >>> 24);
+ buf[5] = (byte) ((secret & 0x00ff0000) >>> 16);
+ buf[6] = (byte) ((secret & 0x0000ff00) >>> 8);
+ buf[7] = (byte) ((secret & 0x000000ff));
+
+ return (((long)(buf[0] & 0xFF)) << 56)
+ + (((long)(buf[1] & 0xFF)) << 48)
+ + (((long)(buf[2] & 0xFF)) << 40)
+ + (((long)(buf[3] & 0xFF)) << 32)
+ + (((long)(buf[4] & 0xFF)) << 24)
+ + (((long)(buf[5] & 0xFF)) << 16)
+ + (((long)(buf[6] & 0xFF)) << 8)
+ + ((long)(buf[7] & 0xFF));
+ }
+
+ public void run() {
+ while (true) {
+ try {
+ ToSend m = sendqueue.take();
+ process(m);
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ }
+ }
+
+ private void process(ToSend m) {
+ int attempts = 0;
+ byte zeroes[];
+ byte requestBytes[] = new byte[48];
+ DatagramPacket requestPacket = new DatagramPacket(requestBytes,
+ requestBytes.length);
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+ switch (m.type) {
+ case 0:
+ /*
+ * Building challenge request packet to send
+ */
+ requestBuffer.clear();
+ requestBuffer.putInt(ToSend.mType.crequest.ordinal());
+ requestBuffer.putLong(m.tag);
+ requestBuffer.putInt(m.state.ordinal());
+ zeroes = new byte[32];
+ requestBuffer.put(zeroes);
+
+ requestPacket.setLength(48);
+ requestPacket.setSocketAddress(m.addr);
+
+ try {
+ if (challengeMap.get(m.tag) == null) {
+ mySocket.send(requestPacket);
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception while sending challenge: "
+ + e.toString());
+ }
+
+ break;
+ case 1:
+ /*
+ * Building challenge packet to send
+ */
+
+ long newChallenge;
+ if (addrChallengeMap.get(m.addr).containsKey(m.tag)) {
+ newChallenge = addrChallengeMap.get(m.addr).get(m.tag);
+ } else {
+ newChallenge = genChallenge();
+ }
+
+ addrChallengeMap.get(m.addr).put(m.tag, newChallenge);
+
+ requestBuffer.clear();
+ requestBuffer.putInt(ToSend.mType.challenge.ordinal());
+ requestBuffer.putLong(m.tag);
+ requestBuffer.putInt(m.state.ordinal());
+ requestBuffer.putLong(newChallenge);
+ zeroes = new byte[24];
+ requestBuffer.put(zeroes);
+
+ requestPacket.setLength(48);
+ requestPacket.setSocketAddress(m.addr);
+
+ try {
+ mySocket.send(requestPacket);
+ } catch (IOException e) {
+ LOG.warn("Exception while sending challenge: "
+ + e.toString());
+ }
+
+ break;
+ case 2:
+
+ /*
+ * Building notification packet to send
+ */
+
+ requestBuffer.clear();
+ requestBuffer.putInt(m.type);
+ requestBuffer.putLong(m.tag);
+ requestBuffer.putInt(m.state.ordinal());
+ requestBuffer.putLong(m.leader);
+ requestBuffer.putLong(m.zxid);
+ requestBuffer.putLong(m.epoch);
+ zeroes = new byte[8];
+ requestBuffer.put(zeroes);
+
+ requestPacket.setLength(48);
+ requestPacket.setSocketAddress(m.addr);
+
+ boolean myChallenge = false;
+ boolean myAck = false;
+
+ while (attempts < maxAttempts) {
+ try {
+ /*
+ * Try to obtain a challenge only if does not have
+ * one yet
+ */
+
+ if (!myChallenge && authEnabled) {
+ ToSend crequest = new ToSend(
+ ToSend.mType.crequest, m.tag, m.leader,
+ m.zxid, m.epoch,
+ QuorumPeer.ServerState.LOOKING, m.addr);
+ sendqueue.offer(crequest);
+
+ try {
+ double timeout = ackWait
+ * java.lang.Math.pow(2, attempts);
+
+ Long l = Long.valueOf(m.tag);
+ synchronized (l) {
+ challengeMutex.put(m.tag, l);
+ l.wait((long) timeout);
+ myChallenge = challengeMap
+ .containsKey(m.tag);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Challenge request exception: "
+ + e.toString());
+ }
+ }
+
+ /*
+ * If don't have challenge yet, skip sending
+ * notification
+ */
+
+ if (authEnabled && !myChallenge) {
+ attempts++;
+ continue;
+ }
+
+ if (authEnabled) {
+ requestBuffer.position(40);
+ requestBuffer.putLong(challengeMap.get(m.tag));
+ }
+ mySocket.send(requestPacket);
+ try {
+ Long l = Long.valueOf(m.tag);
+ double timeout = ackWait
+ * java.lang.Math.pow(10, attempts);
+ synchronized (l) {
+ ackMutex.put(m.tag, l);
+ l.wait((int) timeout);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Ack exception: "
+ + e.toString());
+ }
+ synchronized (acksqueue) {
+ for (int i = 0; i < acksqueue.size(); ++i) {
+ Long newack = acksqueue.poll();
+
+ /*
+ * Under highly concurrent load, a thread
+ * may get into this loop but by the time it
+ * tries to read from the queue, the queue
+ * is empty. There are two alternatives:
+ * synchronize this block, or test if newack
+ * is null.
+ *
+ */
+
+ if (newack == m.tag) {
+ myAck = true;
+ } else
+ acksqueue.offer(newack);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Sending exception: "
+ + e.toString());
+ /*
+ * Do nothing, just try again
+ */
+ }
+ if (myAck) {
+ /*
+ * Received ack successfully, so return
+ */
+ if (challengeMap.get(m.tag) != null)
+ challengeMap.remove(m.tag);
+ return;
+ } else
+ attempts++;
+ }
+ /*
+ * Return message to queue for another attempt later if
+ * epoch hasn't changed.
+ */
+ if (m.epoch == logicalclock) {
+ challengeMap.remove(m.tag);
+ sendqueue.offer(m);
+ }
+ break;
+ case 3:
+
+ requestBuffer.clear();
+ requestBuffer.putInt(m.type);
+ requestBuffer.putLong(m.tag);
+ requestBuffer.putInt(m.state.ordinal());
+ requestBuffer.putLong(m.leader);
+ requestBuffer.putLong(m.zxid);
+ requestBuffer.putLong(m.epoch);
+
+ requestPacket.setLength(48);
+ requestPacket.setSocketAddress(m.addr);
+
+ try {
+ mySocket.send(requestPacket);
+ } catch (IOException e) {
+ LOG.warn("Exception while sending ack: "
+ + e.toString());
+ }
+ break;
+ }
+ }
+ }
+
+ public boolean queueEmpty() {
+ return (sendqueue.isEmpty() || acksqueue.isEmpty() || recvqueue
+ .isEmpty());
+ }
+
+ Messenger(int threads, DatagramSocket s) {
+ mySocket = s;
+ acksqueue = new LinkedBlockingQueue<Long>();
+ challengeMap = new HashMap<Long, Long>();
+ challengeMutex = new HashMap<Long, Long>();
+ ackMutex = new HashMap<Long, Long>();
+ addrChallengeMap = new HashMap<InetSocketAddress, HashMap<Long, Long>>();
+ lastProposedLeader = 0;
+ lastProposedZxid = 0;
+ lastEpoch = 0;
+
+ for (int i = 0; i < threads; ++i) {
+ Thread t = new Thread(new WorkerSender(3),
+ "WorkerSender Thread: " + (i + 1));
+ t.setDaemon(true);
+ t.start();
+ }
+
+ for (QuorumServer server : self.quorumPeers) {
+ InetSocketAddress saddr = new InetSocketAddress(server.addr
+ .getAddress(), port);
+ addrChallengeMap.put(saddr, new HashMap<Long, Long>());
+ }
+
+ Thread t = new Thread(new WorkerReceiver(s, this),
+ "WorkerReceiver Thread");
+ t.start();
+ }
+
+ }
+
+ QuorumPeer self;
+ int port;
+ long logicalclock; /* Election instance */
+ DatagramSocket mySocket;
+ long proposedLeader;
+ long proposedZxid;
+
+ public AuthFastLeaderElection(QuorumPeer self, int electionPort,
+ boolean auth) {
+ this.authEnabled = auth;
+ starter(self, electionPort);
+ }
+
+ public AuthFastLeaderElection(QuorumPeer self, int electionPort) {
+ starter(self, electionPort);
+ }
+
+ private void starter(QuorumPeer self, int electionPort) {
+ this.self = self;
+ port = electionPort;
+ proposedLeader = -1;
+ proposedZxid = -1;
+
+ try {
+ mySocket = new DatagramSocket(port);
+ // mySocket.setSoTimeout(20000);
+ } catch (SocketException e1) {
+ e1.printStackTrace();
+ throw new RuntimeException();
+ }
+ sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.quorumPeers.size());
+ recvqueue = new LinkedBlockingQueue<Notification>(2 * self.quorumPeers
+ .size());
+ new Messenger(self.quorumPeers.size() * 2, mySocket);
+ }
+
+ private void leaveInstance() {
+ logicalclock++;
+ // sendqueue.clear();
+ // recvqueue.clear();
+ }
+
+ public static class ElectionResult {
+ public Vote vote;
+
+ public int count;
+
+ public Vote winner;
+
+ public int winningCount;
+ }
+
+ private void sendNotifications() {
+ for (QuorumServer server : self.quorumPeers) {
+ InetSocketAddress saddr = new InetSocketAddress(server.addr
+ .getAddress(), port);
+
+ ToSend notmsg = new ToSend(ToSend.mType.notification,
+ AuthFastLeaderElection.sequencer++, proposedLeader,
+ proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING,
+ saddr);
+
+ sendqueue.offer(notmsg);
+ }
+ }
+
+ private boolean totalOrderPredicate(long id, long zxid) {
+ if ((zxid > proposedZxid)
+ || ((zxid == proposedZxid) && (id > proposedLeader)))
+ return true;
+ else
+ return false;
+
+ }
+
+ private boolean termPredicate(HashMap<InetSocketAddress, Vote> votes,
+ long l, long zxid) {
+
+ int count = 0;
+ Collection<Vote> votesCast = votes.values();
+ /*
+ * First make the views consistent. Sometimes peers will have different
+ * zxids for a server depending on timing.
+ */
+ for (Vote v : votesCast) {
+ if ((v.id == l) && (v.zxid == zxid))
+ count++;
+ }
+
+ if (count > (self.quorumPeers.size() / 2))
+ return true;
+ else
+ return false;
+
+ }
+
+ public Vote lookForLeader() throws InterruptedException {
+ HashMap<InetSocketAddress, Vote> recvset =
+ new HashMap<InetSocketAddress, Vote>();
+
+ HashMap<InetSocketAddress, Vote> outofelection =
+ new HashMap<InetSocketAddress, Vote>();
+
+ logicalclock++;
+
+ proposedLeader = self.getId();
+ proposedZxid = self.getLastLoggedZxid();
+
+ LOG.warn("Election tally");
+ sendNotifications();
+
+ /*
+ * Loop in which we exchange notifications until we find a leader
+ */
+
+ while (self.getPeerState() == ServerState.LOOKING) {
+ /*
+ * Remove next notification from queue, times out after 2 times the
+ * termination time
+ */
+ Notification n = recvqueue.poll(2 * finalizeWait,
+ TimeUnit.MILLISECONDS);
+
+ /*
+ * Sends more notifications if haven't received enough. Otherwise
+ * processes new notification.
+ */
+ if (n == null) {
+ if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
+ sendNotifications();
+ } else
+ switch (n.state) {
+ case LOOKING:
+ if (n.epoch > logicalclock) {
+ logicalclock = n.epoch;
+ recvset.clear();
+ if (totalOrderPredicate(n.leader, n.zxid)) {
+ proposedLeader = n.leader;
+ proposedZxid = n.zxid;
+ }
+ sendNotifications();
+ } else if (n.epoch < logicalclock) {
+ break;
+ } else if (totalOrderPredicate(n.leader, n.zxid)) {
+ proposedLeader = n.leader;
+ proposedZxid = n.zxid;
+
+ sendNotifications();
+ }
+
+ recvset.put(n.addr, new Vote(n.leader, n.zxid));
+
+ // If have received from all nodes, then terminate
+ if (self.quorumPeers.size() == recvset.size()) {
+ self.setPeerState((proposedLeader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+ // if (self.state == ServerState.FOLLOWING) {
+ // Thread.sleep(100);
+ // }
+ leaveInstance();
+ return new Vote(proposedLeader, proposedZxid);
+
+ } else if (termPredicate(recvset, proposedLeader,
+ proposedZxid)) {
+ // Otherwise, wait for a fixed amount of time
+ LOG.warn("Passed predicate");
+ Thread.sleep(finalizeWait);
+
+ // Notification probe = recvqueue.peek();
+
+ // Verify if there is any change in the proposed leader
+ while ((!recvqueue.isEmpty())
+ && !totalOrderPredicate(
+ recvqueue.peek().leader, recvqueue
+ .peek().zxid)) {
+ recvqueue.poll();
+ }
+ if (recvqueue.isEmpty()) {
+ // LOG.warn("Proposed leader: " +
+ // proposedLeader);
+ self.setPeerState((proposedLeader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+ // if (self.state == ServerState.FOLLOWING) {
+ // Thread.sleep(100);
+ // }
+
+ leaveInstance();
+ return new Vote(proposedLeader, proposedZxid);
+ }
+ }
+ break;
+ case LEADING:
+ outofelection.put(n.addr, new Vote(n.leader, n.zxid));
+
+ if (termPredicate(outofelection, n.leader, n.zxid)) {
+
+ self.setPeerState((n.leader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
+ }
+ break;
+ case FOLLOWING:
+ outofelection.put(n.addr, new Vote(n.leader, n.zxid));
+
+ if (termPredicate(outofelection, n.leader, n.zxid)) {
+
+ self.setPeerState((n.leader == self.getId()) ?
+ ServerState.LEADING: ServerState.FOLLOWING);
+
+ leaveInstance();
+ return new Vote(n.leader, n.zxid);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ return null;
+ }
+}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Tue Jun 24 12:04:58 2008
@@ -1,177 +1,177 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-
-import org.apache.log4j.Logger;
-
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ZooTrace;
-
-/**
- * This RequestProcessor matches the incoming committed requests with the
- * locally submitted requests. The trick is that locally submitted requests that
- * change the state of the system will come back as incoming committed requests,
- * so we need to match them up.
- */
-public class CommitProcessor extends Thread implements RequestProcessor {
- private static final Logger LOG = Logger.getLogger(CommitProcessor.class);
-
- /**
- * Requests that we are holding until the commit comes in.
- */
- LinkedList<Request> queuedRequests = new LinkedList<Request>();
-
- /**
- * Requests that have been committed.
- */
- LinkedList<Request> committedRequests = new LinkedList<Request>();
-
- /*
- * Pending sync requests
- */
- LinkedList<Request> pendingSyncs = new LinkedList<Request>();
-
- RequestProcessor nextProcessor;
-
- public CommitProcessor(RequestProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- start();
- }
-
- boolean finished = false;
-
- public void run() {
- try {
- Request nextPending = null;
- ArrayList<Request> toProcess = new ArrayList<Request>();
- while (!finished) {
- int len = toProcess.size();
- for (int i = 0; i < len; i++) {
- nextProcessor.processRequest(toProcess.get(i));
- }
- toProcess.clear();
- synchronized (this) {
- if ((queuedRequests.size() == 0 || nextPending != null)
- && committedRequests.size() == 0) {
- wait();
- continue;
- }
- // First check and see if the commit came in for the pending
- // request
- if ((queuedRequests.size() == 0 || nextPending != null)
- && committedRequests.size() > 0) {
- Request r = committedRequests.remove();
- /*
- * We match with nextPending so that we can move to the
- * next request when it is committed. We also want to
- * use nextPending because it has the cnxn member set
- * properly.
- */
- if (nextPending != null
- && nextPending.sessionId == r.sessionId
- && nextPending.cxid == r.cxid) {
- // we want to send our version of the request.
- // the pointer to the connection in the request
- nextPending.hdr = r.hdr;
- nextPending.txn = r.txn;
- nextPending.zxid = r.zxid;
- toProcess.add(nextPending);
- nextPending = null;
- } else {
- // this request came from someone else so just
- // send the commit packet
- toProcess.add(r);
- }
- }
- }
-
- // We haven't matched the pending requests, so go back to
- // waiting
- if (nextPending != null) {
- continue;
- }
-
- synchronized (this) {
- // Process the next requests in the queuedRequests
- while (nextPending == null && queuedRequests.size() > 0) {
- Request request = queuedRequests.remove();
- switch (request.type) {
- case OpCode.create:
- case OpCode.delete:
- case OpCode.setData:
- case OpCode.setACL:
- case OpCode.createSession:
- case OpCode.closeSession:
- nextPending = request;
- break;
- case OpCode.sync:
- nextPending = request;
- pendingSyncs.add(request);
- break;
- default:
- toProcess.add(request);
- }
- }
- }
- }
- } catch (Exception e) {
- LOG.error("FIXMSG",e);
- }
- ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
- "CommitProcessor exited loop!");
- }
-
- synchronized public void commit(Request request) {
- if (!finished) {
- if (request == null) {
- LOG.warn("Committed a null!",
- new Exception("committing a null! "));
- return;
- }
- committedRequests.add(request);
- notifyAll();
- }
- }
-
- synchronized public void processRequest(Request request) {
- // request.addRQRec(">commit");
- // LOG.info("Zoo processReq>>> cxid = " + request.cxid + " type =
- // " + request.type + " id = " + request.sessionId + " cnxn " +
- // request.cnxn);
- if (!finished) {
- queuedRequests.add(request);
- notifyAll();
- }
- }
-
- public void shutdown() {
- finished = true;
- queuedRequests.clear();
- synchronized (this) {
- notifyAll();
- }
- nextProcessor.shutdown();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ZooTrace;
+
+/**
+ * This RequestProcessor matches the incoming committed requests with the
+ * locally submitted requests. The trick is that locally submitted requests that
+ * change the state of the system will come back as incoming committed requests,
+ * so we need to match them up.
+ */
+public class CommitProcessor extends Thread implements RequestProcessor {
+ private static final Logger LOG = Logger.getLogger(CommitProcessor.class);
+
+ /**
+ * Requests that we are holding until the commit comes in.
+ */
+ LinkedList<Request> queuedRequests = new LinkedList<Request>();
+
+ /**
+ * Requests that have been committed.
+ */
+ LinkedList<Request> committedRequests = new LinkedList<Request>();
+
+ /*
+ * Pending sync requests
+ */
+ LinkedList<Request> pendingSyncs = new LinkedList<Request>();
+
+ RequestProcessor nextProcessor;
+
+ public CommitProcessor(RequestProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ start();
+ }
+
+ boolean finished = false;
+
+ public void run() {
+ try {
+ Request nextPending = null;
+ ArrayList<Request> toProcess = new ArrayList<Request>();
+ while (!finished) {
+ int len = toProcess.size();
+ for (int i = 0; i < len; i++) {
+ nextProcessor.processRequest(toProcess.get(i));
+ }
+ toProcess.clear();
+ synchronized (this) {
+ if ((queuedRequests.size() == 0 || nextPending != null)
+ && committedRequests.size() == 0) {
+ wait();
+ continue;
+ }
+ // First check and see if the commit came in for the pending
+ // request
+ if ((queuedRequests.size() == 0 || nextPending != null)
+ && committedRequests.size() > 0) {
+ Request r = committedRequests.remove();
+ /*
+ * We match with nextPending so that we can move to the
+ * next request when it is committed. We also want to
+ * use nextPending because it has the cnxn member set
+ * properly.
+ */
+ if (nextPending != null
+ && nextPending.sessionId == r.sessionId
+ && nextPending.cxid == r.cxid) {
+ // we want to send our version of the request.
+ // the pointer to the connection in the request
+ nextPending.hdr = r.hdr;
+ nextPending.txn = r.txn;
+ nextPending.zxid = r.zxid;
+ toProcess.add(nextPending);
+ nextPending = null;
+ } else {
+ // this request came from someone else so just
+ // send the commit packet
+ toProcess.add(r);
+ }
+ }
+ }
+
+ // We haven't matched the pending requests, so go back to
+ // waiting
+ if (nextPending != null) {
+ continue;
+ }
+
+ synchronized (this) {
+ // Process the next requests in the queuedRequests
+ while (nextPending == null && queuedRequests.size() > 0) {
+ Request request = queuedRequests.remove();
+ switch (request.type) {
+ case OpCode.create:
+ case OpCode.delete:
+ case OpCode.setData:
+ case OpCode.setACL:
+ case OpCode.createSession:
+ case OpCode.closeSession:
+ nextPending = request;
+ break;
+ case OpCode.sync:
+ nextPending = request;
+ pendingSyncs.add(request);
+ break;
+ default:
+ toProcess.add(request);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("FIXMSG",e);
+ }
+ ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
+ "CommitProcessor exited loop!");
+ }
+
+ synchronized public void commit(Request request) {
+ if (!finished) {
+ if (request == null) {
+ LOG.warn("Committed a null!",
+ new Exception("committing a null! "));
+ return;
+ }
+ committedRequests.add(request);
+ notifyAll();
+ }
+ }
+
+ synchronized public void processRequest(Request request) {
+ // request.addRQRec(">commit");
+ // LOG.info("Zoo processReq>>> cxid = " + request.cxid + " type =
+ // " + request.type + " id = " + request.sessionId + " cnxn " +
+ // request.cnxn);
+ if (!finished) {
+ queuedRequests.add(request);
+ notifyAll();
+ }
+ }
+
+ public void shutdown() {
+ finished = true;
+ queuedRequests.clear();
+ synchronized (this) {
+ notifyAll();
+ }
+ nextProcessor.shutdown();
+ }
+
+}