You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/02/24 00:18:45 UTC

svn commit: r1073983 - in /zookeeper/branches/branch-3.3: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: breed
Date: Wed Feb 23 23:18:44 2011
New Revision: 1073983

URL: http://svn.apache.org/viewvc?rev=1073983&view=rev
Log:
ZOOKEEPER-880. QuorumCnxManager$SendWorker grows without bounds

Modified:
    zookeeper/branches/branch-3.3/CHANGES.txt
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1073983&r1=1073982&r2=1073983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Wed Feb 23 23:18:44 2011
@@ -21,6 +21,8 @@ BUGFIXES:
 
   ZOOKEEPER-985. Test BookieRecoveryTest fails on trunk. (fpj via breed)
 
+  ZOOKEEPER-880. QuorumCnxManager$SendWorker grows without bounds (vishal via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-963. Make Forrest work with JDK6 (Carl Steinbach via henryr)
 

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1073983&r1=1073982&r2=1073983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Feb 23 23:18:44 2011
@@ -30,6 +30,8 @@ import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Date;
 
 import org.apache.log4j.Logger;
 
@@ -108,6 +110,11 @@ public class QuorumCnxManager {
      */
     public final Listener listener;
 
+    /*
+     * Counts worker threads; each SendWorker/RecvWorker increments it at the start of run()
+     */
+    private AtomicInteger threadCnt = new AtomicInteger(0);
+
     static public class Message {
         Message(ByteBuffer buffer, long sid) {
             this.buffer = buffer;
@@ -183,7 +190,7 @@ public class QuorumCnxManager {
         // Otherwise proceed with the connection
         } else {    
             SendWorker sw = new SendWorker(s, sid);
-            RecvWorker rw = new RecvWorker(s, sid);
+            RecvWorker rw = new RecvWorker(s, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -206,8 +213,6 @@ public class QuorumCnxManager {
         return false;
     }
 
-    
-    
     /**
      * If this server receives a connection request, then it gives up on the new
      * connection if it wins. Notice that it checks whether it has a connection
@@ -268,7 +273,7 @@ public class QuorumCnxManager {
         //Otherwise start worker threads to receive data.
         } else {
             SendWorker sw = new SendWorker(s, sid);
-            RecvWorker rw = new RecvWorker(s, sid);
+            RecvWorker rw = new RecvWorker(s, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -435,6 +440,20 @@ public class QuorumCnxManager {
     	}   	
     }
 
+    /**
+     * Returns the current value of threadCnt, the worker-thread counter.
+     */
+    public long getThreadCount() {
+        return threadCnt.get();
+    }
+
+    /**
+     * Returns the QuorumPeer stored in this manager's self field.
+     */
+    public QuorumPeer getQuorumPeer() {
+        return self;
+    }
+
     /**
      * Thread to listen on some port
      */
@@ -564,6 +583,7 @@ public class QuorumCnxManager {
                 LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
             }
             senderWorkerMap.remove(sid);
+            threadCnt.decrementAndGet();
             return running;
         }
         
@@ -583,6 +603,7 @@ public class QuorumCnxManager {
 
         @Override
         public void run() {
+            threadCnt.incrementAndGet();
             try{
                 ByteBuffer b = lastMessageSent.get(sid); 
                 if(b != null) send(b);   
@@ -630,10 +651,12 @@ public class QuorumCnxManager {
         Long sid;
         SocketChannel channel;
         volatile boolean running = true;
+        final SendWorker sw;
 
-        RecvWorker(SocketChannel channel, Long sid) {
+        RecvWorker(SocketChannel channel, Long sid, SendWorker sw) {
             this.sid = sid;
             this.channel = channel;
+            this.sw = sw;
         }
         
         /**
@@ -651,11 +674,13 @@ public class QuorumCnxManager {
             running = false;            
 
             this.interrupt();
+            threadCnt.decrementAndGet();
             return running;
         }
 
         @Override
         public void run() {
+            threadCnt.incrementAndGet();
             try {
                 byte[] size = new byte[4];
                 ByteBuffer msgLength = ByteBuffer.wrap(size);
@@ -702,6 +727,8 @@ public class QuorumCnxManager {
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 
                         self.getId() + ", error = " + e);
             } finally {
+                LOG.warn("Interrupting SendWorker");
+                sw.finish();
                 try{
                     channel.socket().close();
                 } catch (IOException e) {

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1073983&r1=1073982&r2=1073983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Feb 23 23:18:44 2011
@@ -73,6 +73,7 @@ public class QuorumPeer extends Thread i
     QuorumBean jmxQuorumBean;
     LocalPeerBean jmxLocalPeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
+    QuorumCnxManager qcm;
     
     /* ZKDatabase is a top level member of quorumpeer 
      * which will be used in all the zookeeperservers
@@ -532,11 +533,11 @@ public class QuorumPeer extends Thread i
             le = new AuthFastLeaderElection(this, true);
             break;
         case 3:
-            QuorumCnxManager mng = new QuorumCnxManager(this);
-            QuorumCnxManager.Listener listener = mng.listener;
+            qcm = new QuorumCnxManager(this);
+            QuorumCnxManager.Listener listener = qcm.listener;
             if(listener != null){
                 listener.start();
-                le = new FastLeaderElection(this,mng);
+                le = new FastLeaderElection(this, qcm);
             } else {
                 LOG.error("Null listener when initializing cnx manager");
             }
@@ -954,4 +955,11 @@ public class QuorumPeer extends Thread i
     public boolean isRunning() {
         return running;
     }
+
+    /**
+     * Returns the QuorumCnxManager held in qcm; may be null if it was never assigned.
+     */
+    public QuorumCnxManager getQuorumCnxManager() {
+        return qcm;
+    }
 }

Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1073983&r1=1073982&r2=1073983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Wed Feb 23 23:18:44 2011
@@ -25,6 +25,9 @@ import java.nio.channels.SocketChannel;
 import java.util.HashMap;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
@@ -36,7 +39,7 @@ import org.apache.zookeeper.server.quoru
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.junit.Test;
-
+import org.junit.Assert;
 
 /**
  * This test uses two mock servers, each running an instance of QuorumCnxManager.
@@ -270,5 +273,57 @@ public class CnxManagerTest extends Test
         } catch (Exception e) {
             LOG.info("Socket has been closed as expected");
         }
+        peer.shutdown();
+        cnxManager.halt();
     }   
-}
\ No newline at end of file
+
+    /*
+     * Verify the per-peer worker thread count: 4 with all three peers up, 2 while one is down.
+     */
+    @Test
+    public void testWorkerThreads() throws Exception {
+        ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+
+        for (int sid = 0; sid < 3; sid++) {
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+                                             port[sid], 3, sid, 1000, 2, 2);
+            LOG.info("Starting peer " + peer.getId());
+            peer.start();
+            peerList.add(sid, peer);
+        }
+        Thread.sleep(10000);
+        verifyThreadCount(peerList, 4);
+        for (int myid = 0; myid < 3; myid++) {
+            for (int i = 0; i < 5; i++) {
+                // Shut down one peer and verify the count on the remaining peers
+                QuorumPeer peer = peerList.get(myid);
+                LOG.info("Round " + i + ", halting peer " + peer.getId());
+                peer.shutdown();
+                peerList.remove(myid);
+                Thread.sleep((peer.getSyncLimit() * peer.getTickTime()) + 2000);
+                verifyThreadCount(peerList, 2);
+
+                // Start a new peer with the same id and verify the count on all peers
+                peer = new QuorumPeer(peers, tmpdir[myid], tmpdir[myid],
+                                        port[myid], 3, myid, 1000, 2, 2);
+                LOG.info("Round " + i + ", restarting peer " + peer.getId());
+                peer.start();
+                peerList.add(myid, peer);
+                Thread.sleep(peer.getInitLimit() * peer.getTickTime() * 2);
+                verifyThreadCount(peerList, 4);
+            }
+        }
+    }
+
+    // Fails unless every peer's QuorumCnxManager reports exactly ecnt worker threads.
+    public void verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) {
+        for (QuorumPeer peer : peerList) {
+            long cnt = peer.getQuorumCnxManager().getThreadCount();
+            if (cnt != ecnt) {
+                // Report the peer's own id: list positions shift while a peer is removed.
+                Assert.fail(new Date() + " Incorrect number of Worker threads for sid=" +
+                        peer.getId() + " expected " + ecnt + " found " + cnt);
+            }
+        }
+    }
+}