You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2014/07/20 04:50:21 UTC

svn commit: r1612015 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/

Author: phunt
Date: Sun Jul 20 02:50:20 2014
New Revision: 1612015

URL: http://svn.apache.org/r1612015
Log:
ZOOKEEPER-1972. Fix invalid volatile long/int increment (++) (Hongchao Deng via phunt)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun Jul 20 02:50:20 2014
@@ -711,6 +711,9 @@ BUGFIXES:
   ZOOKEEPER-1807. Observers spam each other creating connections to the
   election addr (Alex Shraer via fpj)
 
+  ZOOKEEPER-1972. Fix invalid volatile long/int increment (++)
+  (Hongchao Deng via phunt)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Sun Jul 20 02:50:20 2014
@@ -34,6 +34,7 @@ import java.util.concurrent.Semaphore;
 
 import java.util.concurrent.TimeUnit;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -284,7 +285,7 @@ public class AuthFastLeaderElection impl
                         // Receive challenge request
                         ToSend c = new ToSend(ToSend.mType.challenge, tag,
                                 current.getId(), current.getZxid(),
-                                logicalclock, self.getPeerState(),
+                                logicalclock.get(), self.getPeerState(),
                                 (InetSocketAddress) responsePacket
                                         .getSocketAddress());
                         sendqueue.offer(c);
@@ -328,7 +329,7 @@ public class AuthFastLeaderElection impl
                                         ToSend a = new ToSend(ToSend.mType.ack,
                                                 tag, current.getId(),
                                                 current.getZxid(),
-                                                logicalclock, self.getPeerState(),
+                                                logicalclock.get(), self.getPeerState(),
                                                 addr);
 
                                         sendqueue.offer(a);
@@ -347,7 +348,7 @@ public class AuthFastLeaderElection impl
 
                             ToSend a = new ToSend(ToSend.mType.ack, tag,
                                     current.getId(), current.getZxid(),
-                                    logicalclock, self.getPeerState(),
+                                    logicalclock.get(), self.getPeerState(),
                                     (InetSocketAddress) responsePacket
                                             .getSocketAddress());
 
@@ -662,7 +663,7 @@ public class AuthFastLeaderElection impl
                      * Return message to queue for another attempt later if
                      * epoch hasn't changed.
                      */
-                    if (m.epoch == logicalclock) {
+                    if (m.epoch == logicalclock.get()) {
                         challengeMap.remove(m.tag);
                         sendqueue.offer(m);
                     }
@@ -734,7 +735,7 @@ public class AuthFastLeaderElection impl
 
     QuorumPeer self;
     int port;
-    volatile long logicalclock; /* Election instance */
+    AtomicLong logicalclock = new AtomicLong(); /* Election instance */
     DatagramSocket mySocket;
     long proposedLeader;
     long proposedZxid;
@@ -769,7 +770,7 @@ public class AuthFastLeaderElection impl
     }
 
     private void leaveInstance() {
-        logicalclock++;
+        logicalclock.incrementAndGet();
     }
 
     private void sendNotifications() {
@@ -777,7 +778,7 @@ public class AuthFastLeaderElection impl
 
             ToSend notmsg = new ToSend(ToSend.mType.notification,
                     AuthFastLeaderElection.sequencer++, proposedLeader,
-                    proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING,
+                    proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING,
                     self.getView().get(server.id).electionAddr);
 
             sendqueue.offer(notmsg);
@@ -843,7 +844,7 @@ public class AuthFastLeaderElection impl
             HashMap<InetSocketAddress, Vote> outofelection = 
                 new HashMap<InetSocketAddress, Vote>();
     
-            logicalclock++;
+            logicalclock.incrementAndGet();
     
             proposedLeader = self.getId();
             proposedZxid = self.getLastLoggedZxid();
@@ -873,15 +874,15 @@ public class AuthFastLeaderElection impl
                 } else
                     switch (n.state) {
                     case LOOKING:
-                        if (n.epoch > logicalclock) {
-                            logicalclock = n.epoch;
+                        if (n.epoch > logicalclock.get()) {
+                            logicalclock.set( n.epoch );
                             recvset.clear();
                             if (totalOrderPredicate(n.leader, n.zxid)) {
                                 proposedLeader = n.leader;
                                 proposedZxid = n.zxid;
                             }
                             sendNotifications();
-                        } else if (n.epoch < logicalclock) {
+                        } else if (n.epoch < logicalclock.get()) {
                             break;
                         } else if (totalOrderPredicate(n.leader, n.zxid)) {
                             proposedLeader = n.leader;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Sun Jul 20 02:50:20 2014
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.ZooKeeperThread;
@@ -321,7 +322,7 @@ public class FastLeaderElection implemen
                             ToSend notmsg = new ToSend(ToSend.mType.notification,
                                     current.getId(),
                                     current.getZxid(),
-                                    logicalclock,
+                                    logicalclock.get(),
                                     self.getPeerState(),
                                     response.sid,
                                     current.getPeerEpoch(),
@@ -382,13 +383,13 @@ public class FastLeaderElection implemen
                                  * lagging behind.
                                  */
                                 if((ackstate == QuorumPeer.ServerState.LOOKING)
-                                        && (n.electionEpoch < logicalclock)){
+                                        && (n.electionEpoch < logicalclock.get())){
                                     Vote v = getVote();
                                     QuorumVerifier qv = self.getQuorumVerifier();
                                     ToSend notmsg = new ToSend(ToSend.mType.notification,
                                             v.getId(),
                                             v.getZxid(),
-                                            logicalclock,
+                                            logicalclock.get(),
                                             self.getPeerState(),
                                             response.sid,
                                             v.getPeerEpoch(),
@@ -526,7 +527,7 @@ public class FastLeaderElection implemen
 
     QuorumPeer self;
     Messenger messenger;
-    volatile long logicalclock; /* Election instance */
+    AtomicLong logicalclock = new AtomicLong(); /* Election instance */
     long proposedLeader;
     long proposedZxid;
     long proposedEpoch;
@@ -536,7 +537,7 @@ public class FastLeaderElection implemen
      * Returns the current vlue of the logical clock counter
      */
     public long getLogicalClock(){
-        return logicalclock;
+        return logicalclock.get();
     }
 
     static ByteBuffer buildMsg(int state,
@@ -663,13 +664,13 @@ public class FastLeaderElection implemen
             ToSend notmsg = new ToSend(ToSend.mType.notification,
                     proposedLeader,
                     proposedZxid,
-                    logicalclock,
+                    logicalclock.get(),
                     QuorumPeer.ServerState.LOOKING,
                     sid,
                     proposedEpoch, qv.toString().getBytes());
             if(LOG.isDebugEnabled()){
                 LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
-                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
+                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                       " (n.round), " + sid + " (recipient), " + self.getId() +
                       " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
             }
@@ -776,7 +777,7 @@ public class FastLeaderElection implemen
         if(leader != self.getId()){
             if(votes.get(leader) == null) predicate = false;
             else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
-        } else if(logicalclock != electionEpoch) {
+        } else if(logicalclock.get() != electionEpoch) {
             predicate = false;
         }
 
@@ -880,7 +881,7 @@ public class FastLeaderElection implemen
             int notTimeout = finalizeWait;
 
             synchronized(this){
-                logicalclock++;
+                logicalclock.incrementAndGet();
                 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
             }
 
@@ -928,8 +929,8 @@ public class FastLeaderElection implemen
                     switch (n.state) {
                     case LOOKING:
                         // If notification > current, replace and send messages out
-                        if (n.electionEpoch > logicalclock) {
-                            logicalclock = n.electionEpoch;
+                        if (n.electionEpoch > logicalclock.get()) {
+                            logicalclock.set(n.electionEpoch);
                             recvset.clear();
                             if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                     getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
@@ -940,11 +941,11 @@ public class FastLeaderElection implemen
                                         getPeerEpoch());
                             }
                             sendNotifications();
-                        } else if (n.electionEpoch < logicalclock) {
+                        } else if (n.electionEpoch < logicalclock.get()) {
                             if(LOG.isDebugEnabled()){
                                 LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                         + Long.toHexString(n.electionEpoch)
-                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
+                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                             }
                             break;
                         } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
@@ -964,7 +965,7 @@ public class FastLeaderElection implemen
 
                         if (termPredicate(recvset,
                                 new Vote(proposedLeader, proposedZxid,
-                                        logicalclock, proposedEpoch))) {
+                                        logicalclock.get(), proposedEpoch))) {
 
                             // Verify if there is any change in the proposed leader
                             while((n = recvqueue.poll(finalizeWait,
@@ -1000,7 +1001,7 @@ public class FastLeaderElection implemen
                          * Consider all notifications from the same epoch
                          * together.
                          */
-                        if(n.electionEpoch == logicalclock){
+                        if(n.electionEpoch == logicalclock.get()){
                             recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                             if(termPredicate(recvset, new Vote(n.leader,
                                             n.zxid, n.electionEpoch, n.peerEpoch, n.state))
@@ -1034,7 +1035,7 @@ public class FastLeaderElection implemen
                                 IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                                 && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                             synchronized(this){
-                                logicalclock = n.electionEpoch;
+                                logicalclock.set(n.electionEpoch);
                                 self.setPeerState((n.leader == self.getId()) ?
                                         ServerState.LEADING: learningState());
                             }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Sun Jul 20 02:50:20 2014
@@ -414,7 +414,7 @@ public class Leader {
         zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
 
         try {
-            self.tick = 0;
+            self.tick.set(0);
             zk.loadData();
 
             leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
@@ -558,7 +558,7 @@ public class Leader {
                     }
 
                     if (!tickSkip) {
-                        self.tick++;
+                        self.tick.incrementAndGet();
                     }
 
                     // We use an instance of SyncedLearnerTracker to

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sun Jul 20 02:50:20 2014
@@ -322,7 +322,7 @@ public class LearnerHandler extends ZooK
     @Override
     public void run() {
         try {
-            tickOfNextAckDeadline = leader.self.tick
+            tickOfNextAckDeadline = leader.self.tick.get()
                     + leader.self.initLimit + leader.self.syncLimit;
 
             ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
@@ -499,7 +499,7 @@ public class LearnerHandler extends ZooK
                 if (LOG.isTraceEnabled()) {
                     ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                 }
-                tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;
+                tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
 
 
                 ByteBuffer bb;
@@ -931,7 +931,7 @@ public class LearnerHandler extends ZooK
 
     public boolean synced() {
         return isAlive()
-        && leader.self.tick <= tickOfNextAckDeadline;
+        && leader.self.tick.get() <= tickOfNextAckDeadline;
     }
     
     /**

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1612015&r1=1612014&r2=1612015&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Sun Jul 20 02:50:20 2014
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -434,7 +435,7 @@ public class QuorumPeer extends ZooKeepe
     /**
      * The current tick
      */
-    protected volatile int tick;
+    protected AtomicInteger tick = new AtomicInteger();
 
     /**
      * Whether or not to listen on all IPs for the two quorum ports
@@ -1257,7 +1258,7 @@ public class QuorumPeer extends ZooKeepe
      * Get the current tick
      */
     public int getTick() {
-        return tick;
+        return tick.get();
     }
 
     public QuorumVerifier configFromString(String s) throws IOException, ConfigException{