You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this version.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/02/24 00:18:45 UTC
svn commit: r1073983 - in /zookeeper/branches/branch-3.3: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: breed
Date: Wed Feb 23 23:18:44 2011
New Revision: 1073983
URL: http://svn.apache.org/viewvc?rev=1073983&view=rev
Log:
ZOOKEEPER-880. QuorumCnxManager$SendWorker grows without bounds
Modified:
zookeeper/branches/branch-3.3/CHANGES.txt
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1073983&r1=1073982&r2=1073983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Wed Feb 23 23:18:44 2011
@@ -21,6 +21,8 @@ BUGFIXES:
ZOOKEEPER-985. Test BookieRecoveryTest fails on trunk. (fpj via breed)
+ ZOOKEEPER-880. QuorumCnxManager$SendWorker grows without bounds (vishal via breed)
+
IMPROVEMENTS:
ZOOKEEPER-963. Make Forrest work with JDK6 (Carl Steinbach via henryr)
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1073983&r1=1073982&r2=1073983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Feb 23 23:18:44 2011
@@ -30,6 +30,8 @@ import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Date;
import org.apache.log4j.Logger;
@@ -108,6 +110,11 @@ public class QuorumCnxManager {
*/
public final Listener listener;
+ /*
+ * Counter to count worker threads
+ */
+ private AtomicInteger threadCnt = new AtomicInteger(0);
+
static public class Message {
Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
@@ -183,7 +190,7 @@ public class QuorumCnxManager {
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(s, sid);
- RecvWorker rw = new RecvWorker(s, sid);
+ RecvWorker rw = new RecvWorker(s, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
@@ -206,8 +213,6 @@ public class QuorumCnxManager {
return false;
}
-
-
/**
* If this server receives a connection request, then it gives up on the new
* connection if it wins. Notice that it checks whether it has a connection
@@ -268,7 +273,7 @@ public class QuorumCnxManager {
//Otherwise start worker threads to receive data.
} else {
SendWorker sw = new SendWorker(s, sid);
- RecvWorker rw = new RecvWorker(s, sid);
+ RecvWorker rw = new RecvWorker(s, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
@@ -435,6 +440,20 @@ public class QuorumCnxManager {
}
}
+ /*
+ * Return number of worker threads
+ */
+ public long getThreadCount() {
+ return threadCnt.get();
+ }
+
+ /**
+ * Return reference to QuorumPeer
+ */
+ public QuorumPeer getQuorumPeer() {
+ return self;
+ }
+
/**
* Thread to listen on some port
*/
@@ -564,6 +583,7 @@ public class QuorumCnxManager {
LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
}
senderWorkerMap.remove(sid);
+ threadCnt.decrementAndGet();
return running;
}
@@ -583,6 +603,7 @@ public class QuorumCnxManager {
@Override
public void run() {
+ threadCnt.incrementAndGet();
try{
ByteBuffer b = lastMessageSent.get(sid);
if(b != null) send(b);
@@ -630,10 +651,12 @@ public class QuorumCnxManager {
Long sid;
SocketChannel channel;
volatile boolean running = true;
+ final SendWorker sw;
- RecvWorker(SocketChannel channel, Long sid) {
+ RecvWorker(SocketChannel channel, Long sid, SendWorker sw) {
this.sid = sid;
this.channel = channel;
+ this.sw = sw;
}
/**
@@ -651,11 +674,13 @@ public class QuorumCnxManager {
running = false;
this.interrupt();
+ threadCnt.decrementAndGet();
return running;
}
@Override
public void run() {
+ threadCnt.incrementAndGet();
try {
byte[] size = new byte[4];
ByteBuffer msgLength = ByteBuffer.wrap(size);
@@ -702,6 +727,8 @@ public class QuorumCnxManager {
LOG.warn("Connection broken for id " + sid + ", my id = " +
self.getId() + ", error = " + e);
} finally {
+ LOG.warn("Interrupting SendWorker");
+ sw.finish();
try{
channel.socket().close();
} catch (IOException e) {
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1073983&r1=1073982&r2=1073983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Feb 23 23:18:44 2011
@@ -73,6 +73,7 @@ public class QuorumPeer extends Thread i
QuorumBean jmxQuorumBean;
LocalPeerBean jmxLocalPeerBean;
LeaderElectionBean jmxLeaderElectionBean;
+ QuorumCnxManager qcm;
/* ZKDatabase is a top level member of quorumpeer
* which will be used in all the zookeeperservers
@@ -532,11 +533,11 @@ public class QuorumPeer extends Thread i
le = new AuthFastLeaderElection(this, true);
break;
case 3:
- QuorumCnxManager mng = new QuorumCnxManager(this);
- QuorumCnxManager.Listener listener = mng.listener;
+ qcm = new QuorumCnxManager(this);
+ QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
- le = new FastLeaderElection(this,mng);
+ le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
@@ -954,4 +955,11 @@ public class QuorumPeer extends Thread i
public boolean isRunning() {
return running;
}
+
+ /**
+ * get reference to QuorumCnxManager
+ */
+ public QuorumCnxManager getQuorumCnxManager() {
+ return qcm;
+}
}
Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1073983&r1=1073982&r2=1073983&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Wed Feb 23 23:18:44 2011
@@ -25,6 +25,9 @@ import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Map.Entry;
import junit.framework.TestCase;
@@ -36,7 +39,7 @@ import org.apache.zookeeper.server.quoru
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.junit.Test;
-
+import org.junit.Assert;
/**
* This test uses two mock servers, each running an instance of QuorumCnxManager.
@@ -270,5 +273,57 @@ public class CnxManagerTest extends Test
} catch (Exception e) {
LOG.info("Socket has been closed as expected");
}
+ peer.shutdown();
+ cnxManager.halt();
}
-}
\ No newline at end of file
+
+ /*
+ * Test if Worker threads are getting killed after connection loss
+ */
+ @Test
+ public void testWorkerThreads() throws Exception {
+ ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+
+ for (int sid = 0; sid < 3; sid++) {
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+ port[sid], 3, sid, 1000, 2, 2);
+ LOG.info("Starting peer " + peer.getId());
+ peer.start();
+ peerList.add(sid, peer);
+ }
+ Thread.sleep(10000);
+ verifyThreadCount(peerList, 4);
+ for (int myid = 0; myid < 3; myid++) {
+ for (int i = 0; i < 5; i++) {
+ // halt one of the listeners and verify count
+ QuorumPeer peer = peerList.get(myid);
+ LOG.info("Round " + i + ", halting peer " + peer.getId());
+ peer.shutdown();
+ peerList.remove(myid);
+ Thread.sleep((peer.getSyncLimit() * peer.getTickTime()) + 2000);
+ verifyThreadCount(peerList, 2);
+
+ // Restart halted node and verify count
+ peer = new QuorumPeer(peers, tmpdir[myid], tmpdir[myid],
+ port[myid], 3, myid, 1000, 2, 2);
+ LOG.info("Round " + i + ", restarting peer " + peer.getId());
+ peer.start();
+ peerList.add(myid, peer);
+ Thread.sleep(peer.getInitLimit() * peer.getTickTime() * 2);
+ verifyThreadCount(peerList, 4);
+ }
+ }
+ }
+
+ public void verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) {
+ for (int myid = 0; myid < peerList.size(); myid++) {
+ QuorumPeer peer = peerList.get(myid);
+ QuorumCnxManager cnxManager = peer.getQuorumCnxManager();
+ long cnt = cnxManager.getThreadCount();
+ if (cnt != ecnt) {
+ Assert.fail(new Date() + " Incorrect number of Worker threads for sid=" +
+ myid + " expected " + ecnt + " found " + cnt);
+ }
+ }
+ }
+}