You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2014/07/20 04:50:21 UTC
svn commit: r1612015 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/quorum/
Author: phunt
Date: Sun Jul 20 02:50:20 2014
New Revision: 1612015
URL: http://svn.apache.org/r1612015
Log:
ZOOKEEPER-1972. Fix invalid volatile long/int increment (++) (Hongchao Deng via phunt)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun Jul 20 02:50:20 2014
@@ -711,6 +711,9 @@ BUGFIXES:
ZOOKEEPER-1807. Observers spam each other creating connections to the
election addr (Alex Shraer via fpj)
+ ZOOKEEPER-1972. Fix invalid volatile long/int increment (++)
+ (Hongchao Deng via phunt)
+
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Sun Jul 20 02:50:20 2014
@@ -34,6 +34,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -284,7 +285,7 @@ public class AuthFastLeaderElection impl
// Receive challenge request
ToSend c = new ToSend(ToSend.mType.challenge, tag,
current.getId(), current.getZxid(),
- logicalclock, self.getPeerState(),
+ logicalclock.get(), self.getPeerState(),
(InetSocketAddress) responsePacket
.getSocketAddress());
sendqueue.offer(c);
@@ -328,7 +329,7 @@ public class AuthFastLeaderElection impl
ToSend a = new ToSend(ToSend.mType.ack,
tag, current.getId(),
current.getZxid(),
- logicalclock, self.getPeerState(),
+ logicalclock.get(), self.getPeerState(),
addr);
sendqueue.offer(a);
@@ -347,7 +348,7 @@ public class AuthFastLeaderElection impl
ToSend a = new ToSend(ToSend.mType.ack, tag,
current.getId(), current.getZxid(),
- logicalclock, self.getPeerState(),
+ logicalclock.get(), self.getPeerState(),
(InetSocketAddress) responsePacket
.getSocketAddress());
@@ -662,7 +663,7 @@ public class AuthFastLeaderElection impl
* Return message to queue for another attempt later if
* epoch hasn't changed.
*/
- if (m.epoch == logicalclock) {
+ if (m.epoch == logicalclock.get()) {
challengeMap.remove(m.tag);
sendqueue.offer(m);
}
@@ -734,7 +735,7 @@ public class AuthFastLeaderElection impl
QuorumPeer self;
int port;
- volatile long logicalclock; /* Election instance */
+ AtomicLong logicalclock = new AtomicLong(); /* Election instance */
DatagramSocket mySocket;
long proposedLeader;
long proposedZxid;
@@ -769,7 +770,7 @@ public class AuthFastLeaderElection impl
}
private void leaveInstance() {
- logicalclock++;
+ logicalclock.incrementAndGet();
}
private void sendNotifications() {
@@ -777,7 +778,7 @@ public class AuthFastLeaderElection impl
ToSend notmsg = new ToSend(ToSend.mType.notification,
AuthFastLeaderElection.sequencer++, proposedLeader,
- proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING,
+ proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING,
self.getView().get(server.id).electionAddr);
sendqueue.offer(notmsg);
@@ -843,7 +844,7 @@ public class AuthFastLeaderElection impl
HashMap<InetSocketAddress, Vote> outofelection =
new HashMap<InetSocketAddress, Vote>();
- logicalclock++;
+ logicalclock.incrementAndGet();
proposedLeader = self.getId();
proposedZxid = self.getLastLoggedZxid();
@@ -873,15 +874,15 @@ public class AuthFastLeaderElection impl
} else
switch (n.state) {
case LOOKING:
- if (n.epoch > logicalclock) {
- logicalclock = n.epoch;
+ if (n.epoch > logicalclock.get()) {
+ logicalclock.set( n.epoch );
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid)) {
proposedLeader = n.leader;
proposedZxid = n.zxid;
}
sendNotifications();
- } else if (n.epoch < logicalclock) {
+ } else if (n.epoch < logicalclock.get()) {
break;
} else if (totalOrderPredicate(n.leader, n.zxid)) {
proposedLeader = n.leader;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Sun Jul 20 02:50:20 2014
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.ZooKeeperThread;
@@ -321,7 +322,7 @@ public class FastLeaderElection implemen
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
- logicalclock,
+ logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
@@ -382,13 +383,13 @@ public class FastLeaderElection implemen
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING)
- && (n.electionEpoch < logicalclock)){
+ && (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
- logicalclock,
+ logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
@@ -526,7 +527,7 @@ public class FastLeaderElection implemen
QuorumPeer self;
Messenger messenger;
- volatile long logicalclock; /* Election instance */
+ AtomicLong logicalclock = new AtomicLong(); /* Election instance */
long proposedLeader;
long proposedZxid;
long proposedEpoch;
@@ -536,7 +537,7 @@ public class FastLeaderElection implemen
* Returns the current vlue of the logical clock counter
*/
public long getLogicalClock(){
- return logicalclock;
+ return logicalclock.get();
}
static ByteBuffer buildMsg(int state,
@@ -663,13 +664,13 @@ public class FastLeaderElection implemen
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
- logicalclock,
+ logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
- Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) +
+ Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
@@ -776,7 +777,7 @@ public class FastLeaderElection implemen
if(leader != self.getId()){
if(votes.get(leader) == null) predicate = false;
else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
- } else if(logicalclock != electionEpoch) {
+ } else if(logicalclock.get() != electionEpoch) {
predicate = false;
}
@@ -880,7 +881,7 @@ public class FastLeaderElection implemen
int notTimeout = finalizeWait;
synchronized(this){
- logicalclock++;
+ logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
@@ -928,8 +929,8 @@ public class FastLeaderElection implemen
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
- if (n.electionEpoch > logicalclock) {
- logicalclock = n.electionEpoch;
+ if (n.electionEpoch > logicalclock.get()) {
+ logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
@@ -940,11 +941,11 @@ public class FastLeaderElection implemen
getPeerEpoch());
}
sendNotifications();
- } else if (n.electionEpoch < logicalclock) {
+ } else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
- + ", logicalclock=0x" + Long.toHexString(logicalclock));
+ + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
@@ -964,7 +965,7 @@ public class FastLeaderElection implemen
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
- logicalclock, proposedEpoch))) {
+ logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
@@ -1000,7 +1001,7 @@ public class FastLeaderElection implemen
* Consider all notifications from the same epoch
* together.
*/
- if(n.electionEpoch == logicalclock){
+ if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
@@ -1034,7 +1035,7 @@ public class FastLeaderElection implemen
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
- logicalclock = n.electionEpoch;
+ logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Sun Jul 20 02:50:20 2014
@@ -414,7 +414,7 @@ public class Leader {
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
- self.tick = 0;
+ self.tick.set(0);
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
@@ -558,7 +558,7 @@ public class Leader {
}
if (!tickSkip) {
- self.tick++;
+ self.tick.incrementAndGet();
}
// We use an instance of SyncedLearnerTracker to
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sun Jul 20 02:50:20 2014
@@ -322,7 +322,7 @@ public class LearnerHandler extends ZooK
@Override
public void run() {
try {
- tickOfNextAckDeadline = leader.self.tick
+ tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
@@ -499,7 +499,7 @@ public class LearnerHandler extends ZooK
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
- tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;
+ tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
ByteBuffer bb;
@@ -931,7 +931,7 @@ public class LearnerHandler extends ZooK
public boolean synced() {
return isAlive()
- && leader.self.tick <= tickOfNextAckDeadline;
+ && leader.self.tick.get() <= tickOfNextAckDeadline;
}
/**
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Sun Jul 20 02:50:20 2014
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -434,7 +435,7 @@ public class QuorumPeer extends ZooKeepe
/**
* The current tick
*/
- protected volatile int tick;
+ protected AtomicInteger tick = new AtomicInteger();
/**
* Whether or not to listen on all IPs for the two quorum ports
@@ -1257,7 +1258,7 @@ public class QuorumPeer extends ZooKeepe
* Get the current tick
*/
public int getTick() {
- return tick;
+ return tick.get();
}
public QuorumVerifier configFromString(String s) throws IOException, ConfigException{