You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2020/01/09 10:27:19 UTC
[zookeeper] branch master updated: ZOOKEEPER-2083: Remove
deprecated class AuthFastLeaderElection
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 590e3cb ZOOKEEPER-2083: Remove deprecated class AuthFastLeaderElection
590e3cb is described below
commit 590e3cbcf1967d4de6ebc08a938584e3b4caf7c4
Author: ravowlga123 <ra...@gmail.com>
AuthorDate: Thu Jan 9 11:27:14 2020 +0100
ZOOKEEPER-2083: Remove deprecated class AuthFastLeaderElection
As per [ZOOKEEPER-2083](https://jira.apache.org/jira/browse/ZOOKEEPER-2083) we need to remove class AuthFastLeaderElection.java so I made changes in Quorumpeer.java by removing two cases 1 and 2 present in createElectionAlgorithm as QuorumPeerconfig.electioalg is always 3.
Please do let me know if anything additional needs to be done.
Author: ravowlga123 <ra...@gmail.com>
Reviewers: Enrico Olivelli <eo...@apache.org>, Andor Molnar <an...@apache.org>, Patrick Hunt <ph...@apache.org>
Closes #1171 from ravowlga123/ZOOKEEPER-2083
---
.../src/main/resources/markdown/zookeeperAdmin.md | 11 +-
.../server/quorum/AuthFastLeaderElection.java | 956 ---------------------
.../apache/zookeeper/server/quorum/QuorumPeer.java | 6 +-
.../zookeeper/server/quorum/QuorumPeerConfig.java | 4 +-
.../src/test/resources/findbugsExcludeFile.xml | 5 -
5 files changed, 10 insertions(+), 972 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index f46270d..785494b 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1146,12 +1146,13 @@ of servers -- that is, when deploying clusters of servers.
non-authenticated UDP-based version of fast leader election, "2"
corresponds to the authenticated UDP-based version of fast
leader election, and "3" corresponds to TCP-based version of
- fast leader election. Currently, algorithm 3 is the default.
+ fast leader election. Algorithm 3 was made default in 3.2.0 and
+ prior versions (3.0.0 and 3.1.0) were using algorithm 1 and 2 as well.
###### Note
- >The implementations of leader election 1, and 2 are now
- **deprecated**. We have the intention
- of removing them in the next release, at which point only the
- FastLeaderElection will be available.
+ >The implementations of leader election 1, and 2 were
+ **deprecated** in 3.4.0. Since 3.6.0 only FastLeaderElection is available,
+ in case of upgrade you have to shutdown all of your servers and
+ restart them with electionAlg=3 (or by removing the line from the configuration file). [...]
* *maxTimeToWaitForEpoch* :
(Java system property: **zookeeper.leader.maxTimeToWaitForEpoch**)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
deleted file mode 100644
index cb0fec8..0000000
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
+++ /dev/null
@@ -1,956 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.zookeeper.common.Time;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.ZooKeeperThread;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @deprecated This class has been deprecated as of release 3.4.0.
- */
-@Deprecated
-public class AuthFastLeaderElection implements Election {
-
- private static final Logger LOG = LoggerFactory.getLogger(AuthFastLeaderElection.class);
-
- /* Sequence numbers for messages */
- static int sequencer = 0;
- static int maxTag = 0;
-
- /*
- * Determine how much time a process has to wait once it believes that it
- * has reached the end of leader election.
- */
- static int finalizeWait = 100;
-
- /*
- * Challenge counter to avoid replay attacks
- */
-
- static int challengeCounter = 0;
-
- /*
- * Flag to determine whether to authenticate or not
- */
-
- private boolean authEnabled = false;
-
- public static class Notification {
-
- /*
- * Proposed leader
- */ long leader;
-
- /*
- * zxid of the proposed leader
- */ long zxid;
-
- /*
- * Epoch
- */ long epoch;
-
- /*
- * current state of sender
- */ QuorumPeer.ServerState state;
-
- /*
- * Address of the sender
- */ InetSocketAddress addr;
-
- }
-
- /*
- * Messages to send, both Notifications and Acks
- */
- public static class ToSend {
-
- enum mType {
- crequest,
- challenge,
- notification,
- ack
- }
-
- ToSend(mType type, long tag, long leader, long zxid, long epoch, ServerState state, InetSocketAddress addr) {
-
- switch (type) {
- case crequest:
- this.type = 0;
- this.tag = tag;
- this.leader = leader;
- this.zxid = zxid;
- this.epoch = epoch;
- this.state = state;
- this.addr = addr;
-
- break;
- case challenge:
- this.type = 1;
- this.tag = tag;
- this.leader = leader;
- this.zxid = zxid;
- this.epoch = epoch;
- this.state = state;
- this.addr = addr;
-
- break;
- case notification:
- this.type = 2;
- this.leader = leader;
- this.zxid = zxid;
- this.epoch = epoch;
- this.state = QuorumPeer.ServerState.LOOKING;
- this.tag = tag;
- this.addr = addr;
-
- break;
- case ack:
- this.type = 3;
- this.tag = tag;
- this.leader = leader;
- this.zxid = zxid;
- this.epoch = epoch;
- this.state = state;
- this.addr = addr;
-
- break;
- default:
- break;
- }
- }
-
- /*
- * Message type: 0 notification, 1 acknowledgement
- */ int type;
-
- /*
- * Proposed leader in the case of notification
- */ long leader;
-
- /*
- * id contains the tag for acks, and zxid for notifications
- */ long zxid;
-
- /*
- * Epoch
- */ long epoch;
-
- /*
- * Current state;
- */ QuorumPeer.ServerState state;
-
- /*
- * Message tag
- */ long tag;
-
- InetSocketAddress addr;
-
- }
-
- LinkedBlockingQueue<ToSend> sendqueue;
-
- LinkedBlockingQueue<Notification> recvqueue;
-
- private class Messenger {
-
- final DatagramSocket mySocket;
- long lastProposedLeader;
- long lastProposedZxid;
- long lastEpoch;
- final Set<Long> ackset;
- final ConcurrentHashMap<Long, Long> challengeMap;
- final ConcurrentHashMap<Long, Semaphore> challengeMutex;
- final ConcurrentHashMap<Long, Semaphore> ackMutex;
- final ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>> addrChallengeMap;
-
- class WorkerReceiver extends ZooKeeperThread {
-
- DatagramSocket mySocket;
- Messenger myMsg;
-
- WorkerReceiver(DatagramSocket s, Messenger msg) {
- super("WorkerReceiver-" + s.getRemoteSocketAddress());
- mySocket = s;
- myMsg = msg;
- }
-
- boolean saveChallenge(long tag, long challenge) {
- Semaphore s = challengeMutex.get(tag);
- if (s != null) {
- synchronized (Messenger.this) {
- challengeMap.put(tag, challenge);
- challengeMutex.remove(tag);
- }
-
- s.release();
- } else {
- LOG.error("No challenge mutex object");
- }
-
- return true;
- }
-
- public void run() {
- byte[] responseBytes = new byte[48];
- ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
- DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length);
- while (true) {
- // Sleeps on receive
- try {
- responseBuffer.clear();
- mySocket.receive(responsePacket);
- } catch (IOException e) {
- LOG.warn("Ignoring exception receiving", e);
- }
- // Receive new message
- if (responsePacket.getLength() != responseBytes.length) {
- LOG.warn("Got a short response: {} {}", responsePacket.getLength(), responsePacket.toString());
- continue;
- }
- responseBuffer.clear();
- int type = responseBuffer.getInt();
- if ((type > 3) || (type < 0)) {
- LOG.warn("Got bad Msg type: {}", type);
- continue;
- }
- long tag = responseBuffer.getLong();
-
- QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
- switch (responseBuffer.getInt()) {
- case 0:
- ackstate = QuorumPeer.ServerState.LOOKING;
- break;
- case 1:
- ackstate = QuorumPeer.ServerState.LEADING;
- break;
- case 2:
- ackstate = QuorumPeer.ServerState.FOLLOWING;
- break;
- default:
- LOG.warn("unknown type {}", responseBuffer.getInt());
- break;
- }
-
- Vote current = self.getCurrentVote();
-
- switch (type) {
- case 0:
- // Receive challenge request
- ToSend c = new ToSend(
- ToSend.mType.challenge,
- tag,
- current.getId(),
- current.getZxid(),
- logicalclock.get(),
- self.getPeerState(),
- (InetSocketAddress) responsePacket.getSocketAddress());
- sendqueue.offer(c);
- break;
- case 1:
- // Receive challenge and store somewhere else
- long challenge = responseBuffer.getLong();
- saveChallenge(tag, challenge);
-
- break;
- case 2:
- Notification n = new Notification();
- n.leader = responseBuffer.getLong();
- n.zxid = responseBuffer.getLong();
- n.epoch = responseBuffer.getLong();
- n.state = ackstate;
- n.addr = (InetSocketAddress) responsePacket.getSocketAddress();
-
- if ((myMsg.lastEpoch <= n.epoch)
- && ((n.zxid > myMsg.lastProposedZxid)
- || ((n.zxid == myMsg.lastProposedZxid)
- && (n.leader > myMsg.lastProposedLeader)))) {
- myMsg.lastProposedZxid = n.zxid;
- myMsg.lastProposedLeader = n.leader;
- myMsg.lastEpoch = n.epoch;
- }
-
- long recChallenge;
- InetSocketAddress addr = (InetSocketAddress) responsePacket.getSocketAddress();
- if (authEnabled) {
- ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(addr);
- if (tmpMap != null) {
- if (tmpMap.get(tag) != null) {
- recChallenge = responseBuffer.getLong();
-
- if (tmpMap.get(tag) == recChallenge) {
- recvqueue.offer(n);
-
- ToSend a = new ToSend(
- ToSend.mType.ack,
- tag,
- current.getId(),
- current.getZxid(),
- logicalclock.get(),
- self.getPeerState(),
- addr);
-
- sendqueue.offer(a);
- } else {
- LOG.warn("Incorrect challenge: {}, {}", recChallenge, addrChallengeMap.toString());
- }
- } else {
- LOG.warn("No challenge for host: {} {}", addr, tag);
- }
- }
- } else {
- recvqueue.offer(n);
-
- ToSend a = new ToSend(
- ToSend.mType.ack,
- tag,
- current.getId(),
- current.getZxid(),
- logicalclock.get(),
- self.getPeerState(),
- (InetSocketAddress) responsePacket.getSocketAddress());
-
- sendqueue.offer(a);
- }
- break;
-
- // Upon reception of an ack message, remove it from the
- // queue
- case 3:
- Semaphore s = ackMutex.get(tag);
-
- if (s != null) {
- s.release();
- } else {
- LOG.error("Empty ack semaphore");
- }
-
- ackset.add(tag);
-
- if (authEnabled) {
- ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(responsePacket.getSocketAddress());
- if (tmpMap != null) {
- tmpMap.remove(tag);
- } else {
- LOG.warn("No such address in the ensemble configuration {}", responsePacket.getSocketAddress());
- }
- }
-
- if (ackstate != QuorumPeer.ServerState.LOOKING) {
- Notification outofsync = new Notification();
- outofsync.leader = responseBuffer.getLong();
- outofsync.zxid = responseBuffer.getLong();
- outofsync.epoch = responseBuffer.getLong();
- outofsync.state = ackstate;
- outofsync.addr = (InetSocketAddress) responsePacket.getSocketAddress();
-
- recvqueue.offer(outofsync);
- }
-
- break;
- // Default case
- default:
- LOG.warn("Received message of incorrect type {}", type);
- break;
- }
- }
- }
-
- }
-
- class WorkerSender extends ZooKeeperThread {
-
- Random rand;
- int maxAttempts;
- int ackWait = finalizeWait;
-
- /*
- * Receives a socket and max number of attempts as input
- */
-
- WorkerSender(int attempts) {
- super("WorkerSender");
- maxAttempts = attempts;
- rand = new Random(java.lang.Thread.currentThread().getId() + Time.currentElapsedTime());
- }
-
- long genChallenge() {
- byte[] buf = new byte[8];
-
- buf[0] = (byte) ((challengeCounter & 0xff000000) >>> 24);
- buf[1] = (byte) ((challengeCounter & 0x00ff0000) >>> 16);
- buf[2] = (byte) ((challengeCounter & 0x0000ff00) >>> 8);
- buf[3] = (byte) ((challengeCounter & 0x000000ff));
-
- challengeCounter++;
- int secret = rand.nextInt(java.lang.Integer.MAX_VALUE);
-
- buf[4] = (byte) ((secret & 0xff000000) >>> 24);
- buf[5] = (byte) ((secret & 0x00ff0000) >>> 16);
- buf[6] = (byte) ((secret & 0x0000ff00) >>> 8);
- buf[7] = (byte) ((secret & 0x000000ff));
-
- return (((long) (buf[0] & 0xFF)) << 56)
- + (((long) (buf[1] & 0xFF)) << 48)
- + (((long) (buf[2] & 0xFF)) << 40)
- + (((long) (buf[3] & 0xFF)) << 32)
- + (((long) (buf[4] & 0xFF)) << 24)
- + (((long) (buf[5] & 0xFF)) << 16)
- + (((long) (buf[6] & 0xFF)) << 8)
- + ((long) (buf[7] & 0xFF));
- }
-
- public void run() {
- while (true) {
- try {
- ToSend m = sendqueue.take();
- process(m);
- } catch (InterruptedException e) {
- break;
- }
-
- }
- }
-
- @SuppressFBWarnings(
- value = "RV_RETURN_VALUE_IGNORED",
- justification = "tryAcquire result not chacked, but it is not an issue")
- private void process(ToSend m) {
- int attempts = 0;
- byte[] zeroes;
- byte[] requestBytes = new byte[48];
- DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length);
- ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
- switch (m.type) {
- case 0:
- /*
- * Building challenge request packet to send
- */
- requestBuffer.clear();
- requestBuffer.putInt(ToSend.mType.crequest.ordinal());
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- zeroes = new byte[32];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- try {
- requestPacket.setSocketAddress(m.addr);
- } catch (IllegalArgumentException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, so we wrap the exception
- // in order to capture this critical detail.
- throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage()
- + " with addr:" + m.addr, e);
- }
-
- try {
- if (challengeMap.get(m.tag) == null) {
- mySocket.send(requestPacket);
- }
- } catch (IOException e) {
- LOG.warn("Exception while sending challenge: ", e);
- }
-
- break;
- case 1:
- /*
- * Building challenge packet to send
- */
-
- long newChallenge;
- ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(m.addr);
- if (tmpMap != null) {
- Long tmpLong = tmpMap.get(m.tag);
- if (tmpLong != null) {
- newChallenge = tmpLong;
- } else {
- newChallenge = genChallenge();
- }
-
- tmpMap.put(m.tag, newChallenge);
-
- requestBuffer.clear();
- requestBuffer.putInt(ToSend.mType.challenge.ordinal());
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(newChallenge);
- zeroes = new byte[24];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- try {
- requestPacket.setSocketAddress(m.addr);
- } catch (IllegalArgumentException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, so we wrap the exception
- // in order to capture this critical detail.
- throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage()
- + " with addr:" + m.addr, e);
- }
-
- try {
- mySocket.send(requestPacket);
- } catch (IOException e) {
- LOG.warn("Exception while sending challenge: ", e);
- }
- } else {
- LOG.error("Address is not in the configuration: {}", m.addr);
- }
-
- break;
- case 2:
-
- /*
- * Building notification packet to send
- */
-
- requestBuffer.clear();
- requestBuffer.putInt(m.type);
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(m.leader);
- requestBuffer.putLong(m.zxid);
- requestBuffer.putLong(m.epoch);
- zeroes = new byte[8];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- try {
- requestPacket.setSocketAddress(m.addr);
- } catch (IllegalArgumentException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, so we wrap the exception
- // in order to capture this critical detail.
- throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage()
- + " with addr:" + m.addr, e);
- }
-
- boolean myChallenge = false;
- boolean myAck = false;
-
- while (attempts < maxAttempts) {
- try {
- /*
- * Try to obtain a challenge only if does not have
- * one yet
- */
-
- if (!myChallenge && authEnabled) {
- ToSend crequest = new ToSend(
- ToSend.mType.crequest,
- m.tag,
- m.leader,
- m.zxid,
- m.epoch,
- QuorumPeer.ServerState.LOOKING,
- m.addr);
- sendqueue.offer(crequest);
-
- try {
- double timeout = ackWait * java.lang.Math.pow(2, attempts);
-
- Semaphore s = new Semaphore(0);
- synchronized (Messenger.this) {
- challengeMutex.put(m.tag, s);
- s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS);
- myChallenge = challengeMap.containsKey(m.tag);
- }
- } catch (InterruptedException e) {
- LOG.warn("Challenge request exception: ", e);
- }
- }
-
- /*
- * If don't have challenge yet, skip sending
- * notification
- */
-
- if (authEnabled && !myChallenge) {
- attempts++;
- continue;
- }
-
- if (authEnabled) {
- requestBuffer.position(40);
- Long tmpLong = challengeMap.get(m.tag);
- if (tmpLong != null) {
- requestBuffer.putLong(tmpLong);
- } else {
- LOG.warn("No challenge with tag: {}", m.tag);
- }
- }
- mySocket.send(requestPacket);
- try {
- Semaphore s = new Semaphore(0);
- double timeout = ackWait * java.lang.Math.pow(10, attempts);
- ackMutex.put(m.tag, s);
- s.tryAcquire((int) timeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.warn("Ack exception: ", e);
- }
-
- if (ackset.remove(m.tag)) {
- myAck = true;
- }
-
- } catch (IOException e) {
- LOG.warn("Sending exception: ", e);
- /*
- * Do nothing, just try again
- */
- }
- if (myAck) {
- /*
- * Received ack successfully, so return
- */
- challengeMap.remove(m.tag);
-
- return;
- } else {
- attempts++;
- }
- }
- /*
- * Return message to queue for another attempt later if
- * epoch hasn't changed.
- */
- if (m.epoch == logicalclock.get()) {
- challengeMap.remove(m.tag);
- sendqueue.offer(m);
- }
- break;
- case 3:
-
- requestBuffer.clear();
- requestBuffer.putInt(m.type);
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(m.leader);
- requestBuffer.putLong(m.zxid);
- requestBuffer.putLong(m.epoch);
-
- requestPacket.setLength(48);
- try {
- requestPacket.setSocketAddress(m.addr);
- } catch (IllegalArgumentException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, so we wrap the exception
- // in order to capture this critical detail.
- throw new IllegalArgumentException("Unable to set socket address on packet, msg:" + e.getMessage()
- + " with addr:" + m.addr, e);
- }
-
- try {
- mySocket.send(requestPacket);
- } catch (IOException e) {
- LOG.warn("Exception while sending ack: ", e);
- }
- break;
- default:
- LOG.warn("unknown type {}", m.type);
- break;
- }
- }
-
- }
-
- Messenger(int threads, DatagramSocket s) {
- mySocket = s;
- ackset = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
- challengeMap = new ConcurrentHashMap<Long, Long>();
- challengeMutex = new ConcurrentHashMap<Long, Semaphore>();
- ackMutex = new ConcurrentHashMap<Long, Semaphore>();
- addrChallengeMap = new ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>>();
- lastProposedLeader = 0;
- lastProposedZxid = 0;
- lastEpoch = 0;
-
- for (int i = 0; i < threads; ++i) {
- Thread t = new Thread(new WorkerSender(3), "WorkerSender Thread: " + (i + 1));
- t.setDaemon(true);
- t.start();
- }
-
- for (QuorumServer server : self.getVotingView().values()) {
- InetAddress address = server.addr.getReachableOrOne().getAddress();
- InetSocketAddress saddr = new InetSocketAddress(address, port);
- addrChallengeMap.put(saddr, new ConcurrentHashMap<Long, Long>());
- }
-
- Thread t = new Thread(new WorkerReceiver(s, this), "WorkerReceiver Thread");
- t.start();
- }
-
- }
-
- QuorumPeer self;
- int port;
- AtomicLong logicalclock = new AtomicLong(); /* Election instance */
- DatagramSocket mySocket;
- long proposedLeader;
- long proposedZxid;
-
- public AuthFastLeaderElection(QuorumPeer self, boolean auth) {
- this.authEnabled = auth;
- starter(self);
- }
-
- public AuthFastLeaderElection(QuorumPeer self) {
- starter(self);
- }
-
- private void starter(QuorumPeer self) {
- this.self = self;
- port = self.getVotingView().get(self.getId()).electionAddr.getAllPorts().get(0);
- proposedLeader = -1;
- proposedZxid = -1;
-
- try {
- mySocket = new DatagramSocket(port);
- // mySocket.setSoTimeout(20000);
- } catch (SocketException e1) {
- e1.printStackTrace();
- throw new RuntimeException();
- }
- sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.getVotingView().size());
- recvqueue = new LinkedBlockingQueue<Notification>(2 * self.getVotingView().size());
- new Messenger(self.getVotingView().size() * 2, mySocket);
- }
-
- private void leaveInstance() {
- logicalclock.incrementAndGet();
- }
-
- private void sendNotifications() {
- for (QuorumServer server : self.getView().values()) {
-
- InetSocketAddress address = self.getView().get(server.id).electionAddr.getReachableOrOne();
- ToSend notmsg = new ToSend(
- ToSend.mType.notification,
- AuthFastLeaderElection.sequencer++,
- proposedLeader,
- proposedZxid,
- logicalclock.get(),
- QuorumPeer.ServerState.LOOKING,
- address);
-
- sendqueue.offer(notmsg);
- }
- }
-
- private boolean totalOrderPredicate(long id, long zxid) {
- return (zxid > proposedZxid) || ((zxid == proposedZxid) && (id > proposedLeader));
-
- }
-
- private boolean termPredicate(Map<InetSocketAddress, Vote> votes, long l, long zxid) {
-
- Collection<Vote> votesCast = votes.values();
- int count = 0;
- /*
- * First make the views consistent. Sometimes peers will have different
- * zxids for a server depending on timing.
- */
- for (Vote v : votesCast) {
- if ((v.getId() == l) && (v.getZxid() == zxid)) {
- count++;
- }
- }
-
- return count > (self.getVotingView().size() / 2);
-
- }
-
- /**
- * There is nothing to shutdown in this implementation of
- * leader election, so we simply have an empty method.
- */
- public void shutdown() {
- }
-
- /**
- * Invoked in QuorumPeer to find or elect a new leader.
- *
- * @throws InterruptedException
- */
- public Vote lookForLeader() throws InterruptedException {
- try {
- self.jmxLeaderElectionBean = new LeaderElectionBean();
- MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- self.jmxLeaderElectionBean = null;
- }
-
- try {
- HashMap<InetSocketAddress, Vote> recvset = new HashMap<InetSocketAddress, Vote>();
-
- HashMap<InetSocketAddress, Vote> outofelection = new HashMap<InetSocketAddress, Vote>();
-
- logicalclock.incrementAndGet();
-
- proposedLeader = self.getId();
- proposedZxid = self.getLastLoggedZxid();
-
- LOG.info("Election tally");
- sendNotifications();
-
- /*
- * Loop in which we exchange notifications until we find a leader
- */
-
- while (self.getPeerState() == ServerState.LOOKING) {
- /*
- * Remove next notification from queue, times out after 2 times
- * the termination time
- */
- Notification n = recvqueue.poll(2 * finalizeWait, TimeUnit.MILLISECONDS);
-
- /*
- * Sends more notifications if haven't received enough.
- * Otherwise processes new notification.
- */
- if (n == null) {
- if (((!outofelection.isEmpty()) || (recvset.size() > 1))) {
- sendNotifications();
- }
- } else {
- switch (n.state) {
- case LOOKING:
- if (n.epoch > logicalclock.get()) {
- logicalclock.set(n.epoch);
- recvset.clear();
- if (totalOrderPredicate(n.leader, n.zxid)) {
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
- }
- sendNotifications();
- } else if (n.epoch < logicalclock.get()) {
- break;
- } else if (totalOrderPredicate(n.leader, n.zxid)) {
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
-
- sendNotifications();
- }
-
- recvset.put(n.addr, new Vote(n.leader, n.zxid));
-
- // If have received from all nodes, then terminate
- if (self.getVotingView().size() == recvset.size()) {
- self.setPeerState((proposedLeader == self.getId())
- ? ServerState.LEADING
- : ServerState.FOLLOWING);
- // if (self.state == ServerState.FOLLOWING) {
- // Thread.sleep(100);
- // }
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
-
- } else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
- // Otherwise, wait for a fixed amount of time
- LOG.info("Passed predicate");
- Thread.sleep(finalizeWait);
-
- // Notification probe = recvqueue.peek();
-
- // Verify if there is any change in the proposed leader
- while ((!recvqueue.isEmpty())
- && !totalOrderPredicate(recvqueue.peek().leader, recvqueue.peek().zxid)) {
- recvqueue.poll();
- }
- if (recvqueue.isEmpty()) {
- // LOG.warn("Proposed leader: " +
- // proposedLeader);
- self.setPeerState((proposedLeader == self.getId())
- ? ServerState.LEADING
- : ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
- }
- }
- break;
- case LEADING:
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
- case FOLLOWING:
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
- default:
- break;
- }
- }
- }
-
- return null;
- } finally {
- try {
- if (self.jmxLeaderElectionBean != null) {
- MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- self.jmxLeaderElectionBean = null;
- }
- }
-
-}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index dba5b27..c6375cd 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -1227,11 +1227,9 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 1:
- le = new AuthFastLeaderElection(this);
- break;
+ throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
case 2:
- le = new AuthFastLeaderElection(this, true);
- break;
+ throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
case 3:
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index 459a057..99fc2ed 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -312,8 +312,8 @@ public class QuorumPeerConfig {
connectToLearnerMasterLimit = Integer.parseInt(value);
} else if (key.equals("electionAlg")) {
electionAlg = Integer.parseInt(value);
- if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) {
- throw new ConfigException("Invalid electionAlg value. Only 1, 2, 3 are supported.");
+ if (electionAlg != 3) {
+ throw new ConfigException("Invalid electionAlg value. Only 3 is supported.");
}
} else if (key.equals("quorumListenOnAllIPs")) {
quorumListenOnAllIPs = Boolean.parseBoolean(value);
diff --git a/zookeeper-server/src/test/resources/findbugsExcludeFile.xml b/zookeeper-server/src/test/resources/findbugsExcludeFile.xml
index 28ac468..2a352cc 100644
--- a/zookeeper-server/src/test/resources/findbugsExcludeFile.xml
+++ b/zookeeper-server/src/test/resources/findbugsExcludeFile.xml
@@ -117,11 +117,6 @@
<Class name="org.apache.zookeeper.server.quorum.LearnerSessionTracker"/>
<Bug code="UrF"/>
</Match>
- <Match>
- <Class name="org.apache.zookeeper.server.quorum.AuthFastLeaderElection$Messenger$WorkerSender"/>
- <Method name="process"/>
- <Bug code="RV"/>
- </Match>
<!-- these are old classes just for upgrading and should go away -->
<Match>