You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2011/02/07 20:26:48 UTC

svn commit: r1068067 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/bin/ src/java/test/config/

Author: phunt
Date: Mon Feb  7 19:26:47 2011
New Revision: 1068067

URL: http://svn.apache.org/viewvc?rev=1068067&view=rev
Log:
ZOOKEEPER-902. Fix findbug issue in trunk "Malicious code vulnerability"

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    zookeeper/trunk/src/java/test/bin/test-patch.properties
    zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1068067&r1=1068066&r2=1068067&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Mon Feb  7 19:26:47 2011
@@ -168,7 +168,11 @@ BUGFIXES: 
 
   ZOOKEEPER-882. Startup loads last transaction from snapshot (j:ared via fpj)
 
-  ZOOKEEPER-962. leader/follower coherence issue when follower is receiving a DIFF (camille fournier via breed)
+  ZOOKEEPER-962. leader/follower coherence issue when follower is receiving a DIFF
+  (camille fournier via breed)
+
+  ZOOKEEPER-902. Fix findbug issue in trunk "Malicious code vulnerability"
+  (flavio and phunt via phunt)
 
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
@@ -190,7 +194,8 @@ IMPROVEMENTS:
   ZOOKEEPER-797 c client source with AI_ADDRCONFIG cannot be compiled with
   early glibc (Qian Ye via phunt)
 
-  ZOOKEEPER-790. Last processed zxid set prematurely while establishing leadership (fpj via breed)
+  ZOOKEEPER-790. Last processed zxid set prematurely while establishing leadership
+  (fpj via breed)
 
   ZOOKEEPER-821. Add ZooKeeper version information to zkpython (Rich
   Schumacher via mahadev)

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java?rev=1068067&r1=1068066&r2=1068067&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java Mon Feb  7 19:26:47 2011
@@ -25,7 +25,11 @@ import java.net.InetSocketAddress;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 
@@ -193,11 +197,11 @@ public class AuthFastLeaderElection impl
         long lastProposedLeader;
         long lastProposedZxid;
         long lastEpoch;
-        final LinkedBlockingQueue<Long> acksqueue;
-        final HashMap<Long, Long> challengeMap;
-        final HashMap<Long, Semaphore> challengeMutex;
-        final HashMap<Long, Semaphore> ackMutex;
-        final HashMap<InetSocketAddress, HashMap<Long, Long>> addrChallengeMap;
+        final Set<Long> ackset;
+        final ConcurrentHashMap<Long, Long> challengeMap;
+        final ConcurrentHashMap<Long, Semaphore> challengeMutex;
+        final ConcurrentHashMap<Long, Semaphore> ackMutex;
+        final ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>> addrChallengeMap;
 
         class WorkerReceiver implements Runnable {
 
@@ -210,11 +214,9 @@ public class AuthFastLeaderElection impl
             }
 
             boolean saveChallenge(long tag, long challenge) {
-
-                //Long l = challengeMutex.get(tag);
                 Semaphore s = challengeMutex.get(tag);
                 if (s != null) {
-                        synchronized (challengeMap) {
+                        synchronized (Messenger.this) {
                             challengeMap.put(tag, challenge);
                             challengeMutex.remove(tag);
                         }
@@ -310,27 +312,30 @@ public class AuthFastLeaderElection impl
                         InetSocketAddress addr = (InetSocketAddress) responsePacket
                                 .getSocketAddress();
                         if (authEnabled) {
-                            if (addrChallengeMap.get(addr).get(tag) != null) {
-                                recChallenge = responseBuffer.getLong();
-
-                                if (addrChallengeMap.get(addr).get(tag) == recChallenge) {
-                                    recvqueue.offer(n);
-
-                                    ToSend a = new ToSend(ToSend.mType.ack,
-                                            tag, current.id,
-                                            current.zxid,
-                                            logicalclock, self.getPeerState(),
-                                            addr);
-
-                                    sendqueue.offer(a);
+                            ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(addr);
+                            if(tmpMap != null){
+                                if (tmpMap.get(tag) != null) {
+                                    recChallenge = responseBuffer.getLong();
+
+                                    if (tmpMap.get(tag) == recChallenge) {
+                                        recvqueue.offer(n);
+
+                                        ToSend a = new ToSend(ToSend.mType.ack,
+                                                tag, current.id,
+                                                current.zxid,
+                                                logicalclock, self.getPeerState(),
+                                                addr);
+
+                                        sendqueue.offer(a);
+                                    } else {
+                                        LOG.warn("Incorrect challenge: "
+                                                + recChallenge + ", "
+                                                + addrChallengeMap.toString());
+                                    }
                                 } else {
-                                    LOG.warn("Incorrect challenge: "
-                                            + recChallenge + ", "
-                                            + addrChallengeMap.toString());
+                                    LOG.warn("No challenge for host: " + addr
+                                            + " " + tag);
                                 }
-                            } else {
-                                LOG.warn("No challenge for host: " + addr
-                                        + " " + tag);
                             }
                         } else {
                             recvqueue.offer(n);
@@ -354,11 +359,17 @@ public class AuthFastLeaderElection impl
                             s.release();
                         else LOG.error("Empty ack semaphore");
                         
-                        acksqueue.offer(tag);
+                        ackset.add(tag);
 
                         if (authEnabled) {
-                            addrChallengeMap.get(responsePacket
-                                            .getSocketAddress()).remove(tag);
+                            ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(responsePacket
+                                    .getSocketAddress());
+                            if(tmpMap != null) {
+                                tmpMap.remove(tag);
+                            } else {
+                                LOG.warn("No such address in the ensemble configuration " + responsePacket
+                                    .getSocketAddress());
+                            }
                         }
 
                         if (ackstate != QuorumPeer.ServerState.LOOKING) {
@@ -485,40 +496,46 @@ public class AuthFastLeaderElection impl
                      */
 
                     long newChallenge;
-                    if (addrChallengeMap.get(m.addr).containsKey(m.tag)) {
-                        newChallenge = addrChallengeMap.get(m.addr).get(m.tag);
-                    } else {
-                        newChallenge = genChallenge();
-                    }
+                    ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(m.addr); 
+                    if(tmpMap != null){
+                        Long tmpLong = tmpMap.get(m.tag);
+                        if (tmpLong != null) {
+                            newChallenge = tmpLong;
+                        } else {
+                            newChallenge = genChallenge();
+                        }
 
-                    addrChallengeMap.get(m.addr).put(m.tag, newChallenge);
+                        tmpMap.put(m.tag, newChallenge);
 
-                    requestBuffer.clear();
-                    requestBuffer.putInt(ToSend.mType.challenge.ordinal());
-                    requestBuffer.putLong(m.tag);
-                    requestBuffer.putInt(m.state.ordinal());
-                    requestBuffer.putLong(newChallenge);
-                    zeroes = new byte[24];
-                    requestBuffer.put(zeroes);
+                        requestBuffer.clear();
+                        requestBuffer.putInt(ToSend.mType.challenge.ordinal());
+                        requestBuffer.putLong(m.tag);
+                        requestBuffer.putInt(m.state.ordinal());
+                        requestBuffer.putLong(newChallenge);
+                        zeroes = new byte[24];
+                        requestBuffer.put(zeroes);
 
-                    requestPacket.setLength(48);
-                    try {
-                        requestPacket.setSocketAddress(m.addr);
-                    } catch (IllegalArgumentException e) {
-                        // Sun doesn't include the address that causes this
-                        // exception to be thrown, so we wrap the exception
-                        // in order to capture this critical detail.
-                        throw new IllegalArgumentException(
-                                "Unable to set socket address on packet, msg:"
-                                + e.getMessage() + " with addr:" + m.addr,
-                                e);
-                    }
+                        requestPacket.setLength(48);
+                        try {
+                            requestPacket.setSocketAddress(m.addr);
+                        } catch (IllegalArgumentException e) {
+                            // Sun doesn't include the address that causes this
+                            // exception to be thrown, so we wrap the exception
+                            // in order to capture this critical detail.
+                            throw new IllegalArgumentException(
+                                    "Unable to set socket address on packet, msg:"
+                                    + e.getMessage() + " with addr:" + m.addr,
+                                    e);
+                        }
 
 
-                    try {
-                        mySocket.send(requestPacket);
-                    } catch (IOException e) {
-                        LOG.warn("Exception while sending challenge: ", e);
+                        try {
+                            mySocket.send(requestPacket);
+                        } catch (IOException e) {
+                            LOG.warn("Exception while sending challenge: ", e);
+                        }
+                    } else {
+                        LOG.error("Address is not in the configuration: " + m.addr);
                     }
 
                     break;
@@ -573,9 +590,8 @@ public class AuthFastLeaderElection impl
                                     double timeout = ackWait
                                             * java.lang.Math.pow(2, attempts);
 
-                                    //Long l = new Long(m.tag);
                                     Semaphore s = new Semaphore(0);
-                                    synchronized (s) {
+                                    synchronized(Messenger.this) {
                                         challengeMutex.put(m.tag, s);
                                         s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS);
                                         myChallenge = challengeMap
@@ -598,7 +614,12 @@ public class AuthFastLeaderElection impl
 
                             if (authEnabled) {
                                 requestBuffer.position(40);
-                                requestBuffer.putLong(challengeMap.get(m.tag));
+                                Long tmpLong = challengeMap.get(m.tag);
+                                if(tmpLong != null){
+                                    requestBuffer.putLong(tmpLong);
+                                } else {
+                                    LOG.warn("No challenge with tag: " + m.tag);
+                                }
                             }
                             mySocket.send(requestPacket);
                             try {
@@ -610,26 +631,11 @@ public class AuthFastLeaderElection impl
                             } catch (InterruptedException e) {
                                 LOG.warn("Ack exception: ", e);
                             }
-                            synchronized (acksqueue) {
-                                for (int i = 0; i < acksqueue.size(); ++i) {
-                                    Long newack = acksqueue.poll();
-
-                                    /*
-                                     * Under highly concurrent load, a thread
-                                     * may get into this loop but by the time it
-                                     * tries to read from the queue, the queue
-                                     * is empty. There are two alternatives:
-                                     * synchronize this block, or test if newack
-                                     * is null.
-                                     *
-                                     */
-
-                                    if (newack == m.tag) {
-                                        myAck = true;
-                                    } else
-                                        acksqueue.offer(newack);
-                                }
-                            }
+                            
+                            if(ackset.remove(m.tag)){
+                                myAck = true;
+                            } 
+                        
                         } catch (IOException e) {
                             LOG.warn("Sending exception: ", e);
                             /*
@@ -640,8 +646,8 @@ public class AuthFastLeaderElection impl
                             /*
                              * Received ack successfully, so return
                              */
-                            if (challengeMap.get(m.tag) != null)
-                                challengeMap.remove(m.tag);
+                            challengeMap.remove(m.tag);
+                            
                             return;
                         } else
                             attempts++;
@@ -690,17 +696,17 @@ public class AuthFastLeaderElection impl
         }
 
         public boolean queueEmpty() {
-            return (sendqueue.isEmpty() || acksqueue.isEmpty() || recvqueue
+            return (sendqueue.isEmpty() || ackset.isEmpty() || recvqueue
                     .isEmpty());
         }
 
         Messenger(int threads, DatagramSocket s) {
             mySocket = s;
-            acksqueue = new LinkedBlockingQueue<Long>();
-            challengeMap = new HashMap<Long, Long>();
-            challengeMutex = new HashMap<Long, Semaphore>();
-            ackMutex = new HashMap<Long, Semaphore>();
-            addrChallengeMap = new HashMap<InetSocketAddress, HashMap<Long, Long>>();
+            ackset =  Collections.<Long>newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+            challengeMap = new ConcurrentHashMap<Long, Long>();
+            challengeMutex = new ConcurrentHashMap<Long, Semaphore>();
+            ackMutex = new ConcurrentHashMap<Long, Semaphore>();
+            addrChallengeMap = new ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>>();
             lastProposedLeader = 0;
             lastProposedZxid = 0;
             lastEpoch = 0;
@@ -715,7 +721,7 @@ public class AuthFastLeaderElection impl
             for (QuorumServer server : self.getVotingView().values()) {
                 InetSocketAddress saddr = new InetSocketAddress(server.addr
                         .getAddress(), port);
-                addrChallengeMap.put(saddr, new HashMap<Long, Long>());
+                addrChallengeMap.put(saddr, new ConcurrentHashMap<Long, Long>());
             }
 
             Thread t = new Thread(new WorkerReceiver(s, this),

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1068067&r1=1068066&r2=1068067&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Mon Feb  7 19:26:47 2011
@@ -86,11 +86,8 @@ public class Follower extends Learner{
                     e1.printStackTrace();
                 }
     
-                synchronized (pendingRevalidations) {
-                    // clear pending revalidations
-                    pendingRevalidations.clear();
-                    pendingRevalidations.notifyAll();
-                }
+                // clear pending revalidations
+                pendingRevalidations.clear();
             }
         } finally {
             zk.unregisterJMX((Learner)this);

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1068067&r1=1068066&r2=1068067&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Mon Feb  7 19:26:47 2011
@@ -384,16 +384,14 @@ public class Learner {       
         DataInputStream dis = new DataInputStream(bis);
         long sessionId = dis.readLong();
         boolean valid = dis.readBoolean();
-        synchronized (pendingRevalidations) {
-            ServerCnxn cnxn = pendingRevalidations
-                    .remove(sessionId);
-            if (cnxn == null) {
-                LOG.warn("Missing session 0x"
-                        + Long.toHexString(sessionId)
-                        + " for validation");
-            } else {
-                zk.finishSessionInit(cnxn, valid);
-            }
+        ServerCnxn cnxn = pendingRevalidations
+        .remove(sessionId);
+        if (cnxn == null) {
+            LOG.warn("Missing session 0x"
+                    + Long.toHexString(sessionId)
+                    + " for validation");
+        } else {
+            zk.finishSessionInit(cnxn, valid);
         }
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(LOG,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java?rev=1068067&r1=1068066&r2=1068067&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java Mon Feb  7 19:26:47 2011
@@ -83,11 +83,8 @@ public class Observer extends Learner{  
                     e1.printStackTrace();
                 }
     
-                synchronized (pendingRevalidations) {
-                    // clear pending revalidations
-                    pendingRevalidations.clear();
-                    pendingRevalidations.notifyAll();
-                }
+                // clear pending revalidations
+                pendingRevalidations.clear();
             }
         } finally {
             zk.unregisterJMX(this);

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1068067&r1=1068066&r2=1068067&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Mon Feb  7 19:26:47 2011
@@ -698,7 +698,6 @@ public class QuorumCnxManager {
         @Override
         public void run() {
             try {
-                byte[] size = new byte[4];
                 while (running && !shutdown && sock != null) {
                     /**
                      * Reads the first int to determine the length of the
@@ -716,9 +715,7 @@ public class QuorumCnxManager {
                     byte[] msgArray = new byte[length];
                     din.readFully(msgArray, 0, length);
                     ByteBuffer message = ByteBuffer.wrap(msgArray);
-                    synchronized (recvQueue) {
-                        recvQueue.put(new Message(message.duplicate(), sid));
-                    }
+                    recvQueue.put(new Message(message.duplicate(), sid));
                 }
             } catch (Exception e) {
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 

Modified: zookeeper/trunk/src/java/test/bin/test-patch.properties
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/bin/test-patch.properties?rev=1068067&r1=1068066&r2=1068067&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/bin/test-patch.properties (original)
+++ zookeeper/trunk/src/java/test/bin/test-patch.properties Mon Feb  7 19:26:47 2011
@@ -14,5 +14,5 @@
 # limitations under the License.
 
 OK_RELEASEAUDIT_WARNINGS=24
-OK_FINDBUGS_WARNINGS=1
+OK_FINDBUGS_WARNINGS=0
 OK_JAVADOC_WARNINGS=0

Modified: zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml?rev=1068067&r1=1068066&r2=1068067&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml (original)
+++ zookeeper/trunk/src/java/test/config/findbugsExcludeFile.xml Mon Feb  7 19:26:47 2011
@@ -32,7 +32,6 @@
     <Bug pattern="REC_CATCH_EXCEPTION" />
   </Match>
 
-
    <!-- If we cannot open a socket to elect a leader, then we should
             simply exit -->
    <Match>
@@ -108,4 +107,22 @@
   <Match>
     <Class name="org.apache.zookeeper.server.upgrade.DataTreeV1"/>
   </Match>
+
+  <!-- References code in a generated file that may or may not be null -->
+  <Match>
+    <Class name="org.apache.zookeeper.Version" />
+    <Method name="getVersion" />
+    <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE" />
+  </Match>
+
+  <!-- The sync'd object is also being used to protect the isrunning flag; this is OK -->
+  <Match>
+    <Class name="org.apache.zookeeper.ClientCnxn$EventThread"/>
+    <Bug code="JLM"/>
+    <Or>
+      <Method name="queuePacket" />
+      <Method name="run" />
+    </Or>
+  </Match>
+
 </FindBugsFilter>