You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/12/19 19:07:14 UTC

svn commit: r1423990 [1/2] - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/test/

Author: phunt
Date: Wed Dec 19 18:07:14 2012
New Revision: 1423990

URL: http://svn.apache.org/viewvc?rev=1423990&view=rev
Log:
ZOOKEEPER-1504. Multi-thread NIOServerCnxn (Jay Shrauner via phunt)

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ServerCnxnTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1423990&r1=1423989&r2=1423990&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Dec 19 18:07:14 2012
@@ -450,6 +450,8 @@ IMPROVEMENTS:
   ZOOKEEPER-1335. Add support for --config to zkEnv.sh to specify a config
   directory different than what is expected (Arpit Gupta via mahadev)
 
+  ZOOKEEPER-1504. Multi-thread NIOServerCnxn (Jay Shrauner via phunt)
+
 Release 3.4.0 - 
 
 Non-backward compatible changes:

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java?rev=1423990&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java Wed Dec 19 18:07:14 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ExpiryQueue tracks elements in time sorted fixed duration buckets.
+ * It's used by SessionTrackerImpl to expire sessions and NIOServerCnxnFactory
+ * to expire connections.
+ */
+public class ExpiryQueue<E> {
+    private final ConcurrentHashMap<E, Long> elemMap =
+        new ConcurrentHashMap<E, Long>();
+    /**
+     * The maximum number of buckets is equal to max timeout/expirationInterval,
+     * so the expirationInterval should not be too small compared to the
+     * max timeout that this expiry queue needs to maintain.
+     */
+    private final ConcurrentHashMap<Long, Set<E>> expiryMap =
+        new ConcurrentHashMap<Long, Set<E>>();
+
+    private final AtomicLong nextExpirationTime = new AtomicLong();
+    private final int expirationInterval;
+
+    public ExpiryQueue(int expirationInterval) {
+        this.expirationInterval = expirationInterval;
+        nextExpirationTime.set(roundToNextInterval(System.currentTimeMillis()));
+    }
+
+    private long roundToNextInterval(long time) {
+        return (time / expirationInterval + 1) * expirationInterval;
+    }
+
+    /**
+     * Removes element from the queue.
+     * @param elem  element to remove
+     * @return      time at which the element was set to expire, or null if
+     *              it wasn't present
+     */
+    public Long remove(E elem) {
+        Long expiryTime = elemMap.remove(elem);
+        if (expiryTime != null) {
+            Set<E> set = expiryMap.get(expiryTime);
+            if (set != null) {
+                set.remove(elem);
+                // We don't need to worry about removing empty sets,
+                // they'll eventually be removed when they expire.
+            }
+        }
+        return expiryTime;
+    }
+
+    /**
+     * Adds or updates the expiration time for an element in the queue,
+     * rounding the timeout to the expiry interval bucket used by this queue.
+     * @param elem     element to add/update
+     * @param timeout  timeout in milliseconds
+     * @return         time at which the element is now set to expire if
+     *                 changed, or null if unchanged
+     */
+    public Long update(E elem, int timeout) {
+        Long prevExpiryTime = elemMap.get(elem);
+        long now = System.currentTimeMillis();
+        Long newExpiryTime = roundToNextInterval(now + timeout);
+
+        if (newExpiryTime.equals(prevExpiryTime)) {
+            // No change, so nothing to update
+            return null;
+        }
+
+        // First add the elem to the new expiry time bucket in expiryMap.
+        Set<E> set = expiryMap.get(newExpiryTime);
+        if (set == null) {
+            // Construct a ConcurrentHashSet using a ConcurrentHashMap
+            set = Collections.newSetFromMap(
+                new ConcurrentHashMap<E, Boolean>());
+            // Put the new set in the map, but only if another thread
+            // hasn't beaten us to it
+            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
+            if (existingSet != null) {
+                set = existingSet;
+            }
+        }
+        set.add(elem);
+
+        // Map the elem to the new expiry time. If a different previous
+        // mapping was present, clean up the previous expiry bucket.
+        prevExpiryTime = elemMap.put(elem, newExpiryTime);
+        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
+            Set<E> prevSet = expiryMap.get(prevExpiryTime);
+            if (prevSet != null) {
+                prevSet.remove(elem);
+            }
+        }
+        return newExpiryTime;
+    }
+
+    /**
+     * @return milliseconds until next expiration time, or 0 if it has already passed
+     */
+    public long getWaitTime() {
+        long now = System.currentTimeMillis();
+        long expirationTime = nextExpirationTime.get();
+        return now < expirationTime ? (expirationTime - now) : 0L;
+    }
+
+    /**
+     * Remove the next expired set of elements from expiryMap. This method needs
+     * to be called frequently enough (use getWaitTime() to pace the calls);
+     * otherwise a backlog of empty sets will queue up in expiryMap.
+     *
+     * @return next set of expired elements, or an empty set if none are
+     *         ready
+     */
+    public Set<E> poll() {
+        long now = System.currentTimeMillis();
+        long expirationTime = nextExpirationTime.get();
+        if (now < expirationTime) {
+            return Collections.emptySet();
+        }
+
+        Set<E> set = null;
+        long newExpirationTime = expirationTime + expirationInterval;
+        if (nextExpirationTime.compareAndSet(
+              expirationTime, newExpirationTime)) {
+            set = expiryMap.remove(expirationTime);
+        }
+        if (set == null) {
+            return Collections.emptySet();
+        }
+        return set;
+    }
+
+    public void dump(PrintWriter pwriter) {
+        pwriter.print("Sets (");
+        pwriter.print(expiryMap.size());
+        pwriter.print(")/(");
+        pwriter.print(elemMap.size());
+        pwriter.println("):");
+        ArrayList<Long> keys = new ArrayList<Long>(expiryMap.keySet());
+        Collections.sort(keys);
+        for (long time : keys) {
+            Set<E> set = expiryMap.get(time);
+            if (set != null) {
+                pwriter.print(set.size());
+                pwriter.print(" expire at ");
+                pwriter.print(new Date(time));
+                pwriter.println(":");
+                for (E elem : set) {
+                    pwriter.print("\t");
+                    pwriter.println(elem.toString());
+                }
+            }
+        }
+    }
+}
+

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=1423990&r1=1423989&r2=1423990&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Wed Dec 19 18:07:14 2012
@@ -29,9 +29,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.BinaryInputArchive;
@@ -46,6 +48,7 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.WatcherEvent;
+import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
@@ -58,58 +61,63 @@ import org.apache.zookeeper.server.util.
 public class NIOServerCnxn extends ServerCnxn {
     static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxn.class);
 
-    NIOServerCnxnFactory factory;
+    private final NIOServerCnxnFactory factory;
 
-    SocketChannel sock;
+    private SocketChannel sock;
+
+    private final SelectorThread selectorThread;
 
     private final SelectionKey sk;
 
-    boolean initialized;
+    private boolean initialized;
 
-    ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
 
-    ByteBuffer incomingBuffer = lenBuffer;
+    private ByteBuffer incomingBuffer = lenBuffer;
 
-    LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
+    private final Queue<ByteBuffer> outgoingBuffers =
+        new LinkedBlockingQueue<ByteBuffer>();
 
-    int sessionTimeout;
+    private int sessionTimeout;
 
     private final ZooKeeperServer zkServer;
 
     /**
      * The number of requests that have been submitted but not yet responded to.
      */
-    int outstandingRequests;
+    private final AtomicInteger outstandingRequests = new AtomicInteger(0);
 
     /**
      * This is the id that uniquely identifies the session of a client. Once
      * this session is no longer active, the ephemeral nodes will go away.
      */
-    long sessionId;
+    private long sessionId;
 
-    static long nextSessionId = 1;
-    int outstandingLimit = 1;
+    private final int outstandingLimit;
 
     public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
-            SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
+                         SelectionKey sk, NIOServerCnxnFactory factory,
+                         SelectorThread selectorThread) throws IOException {
         this.zkServer = zk;
         this.sock = sock;
         this.sk = sk;
         this.factory = factory;
+        this.selectorThread = selectorThread;
         if (this.factory.login != null) {
             this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
         }
-        if (zk != null) { 
+        if (zk != null) {
             outstandingLimit = zk.getGlobalOutstandingLimit();
+        } else {
+            outstandingLimit = 1;
         }
         sock.socket().setTcpNoDelay(true);
-        /* set socket linger to false, so that socket close does not
-         * block */
+        /* set socket linger to false, so that socket close does not block */
         sock.socket().setSoLinger(false, -1);
         InetAddress addr = ((InetSocketAddress) sock.socket()
                 .getRemoteSocketAddress()).getAddress();
         authInfo.add(new Id("ip", addr.getHostAddress()));
-        sk.interestOps(SelectionKey.OP_READ);
+        this.sessionTimeout = factory.sessionlessCnxnTimeout;
     }
 
     /* Send close connection packet to the client, doIO will eventually
@@ -127,56 +135,32 @@ public class NIOServerCnxn extends Serve
     void sendBufferSync(ByteBuffer bb) {
        try {
            /* configure socket to be blocking
-            * so that we dont have to do write in 
+            * so that we dont have to do write in
             * a tight while loop
             */
-           sock.configureBlocking(true);
            if (bb != ServerCnxnFactory.closeConn) {
                if (sock != null) {
+                   sock.configureBlocking(true);
                    sock.write(bb);
                }
                packetSent();
-           } 
+           }
        } catch (IOException ie) {
            LOG.error("Error sending data synchronously ", ie);
        }
     }
-    
-    public void sendBuffer(ByteBuffer bb) {
-        try {
-            if (bb != ServerCnxnFactory.closeConn) {
-                // We check if write interest here because if it is NOT set,
-                // nothing is queued, so we can try to send the buffer right
-                // away without waking up the selector
-                if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
-                    try {
-                        sock.write(bb);
-                    } catch (IOException e) {
-                        // we are just doing best effort right now
-                    }
-                }
-                // if there is nothing left to send, we are done
-                if (bb.remaining() == 0) {
-                    packetSent();
-                    return;
-                }
-            }
 
-            synchronized(this.factory){
-                sk.selector().wakeup();
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
-                            + " is valid: " + sk.isValid());
-                }
-                outgoingBuffers.add(bb);
-                if (sk.isValid()) {
-                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
-                }
-            }
-            
-        } catch(Exception e) {
-            LOG.error("Unexpected Exception: ", e);
+    /**
+     * sendBuffer pushes a byte buffer onto the outgoing buffer queue for
+     * asynchronous writes.
+     */
+    public void sendBuffer(ByteBuffer bb) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
+                      + " is valid: " + sk.isValid());
         }
+        outgoingBuffers.add(bb);
+        requestInterestOpsUpdate();
     }
 
     /** Read the request payload (everything following the length prefix) */
@@ -204,6 +188,123 @@ public class NIOServerCnxn extends Serve
         }
     }
 
+    /**
+     * This boolean tracks whether the connection is ready for selection or
+     * not. A connection is marked as not ready for selection while it is
+     * processing an IO request. The flag is used to gatekeep pushing interest
+     * op updates onto the selector.
+     */
+    private final AtomicBoolean selectable = new AtomicBoolean(true);
+
+    public boolean isSelectable() {
+        return sk.isValid() && selectable.get();
+    }
+
+    public void disableSelectable() {
+        selectable.set(false);
+    }
+
+    public void enableSelectable() {
+        selectable.set(true);
+    }
+
+    private void requestInterestOpsUpdate() {
+        if (isSelectable()) {
+            selectorThread.addInterestOpsUpdateRequest(sk);
+        }
+    }
+
+    void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
+        if (outgoingBuffers.isEmpty()) {
+            return;
+        }
+
+        /*
+         * This is going to reset the buffer position to 0 and the
+         * limit to the size of the buffer, so that we can fill it
+         * with data from the non-direct buffers that we need to
+         * send.
+         */
+        ByteBuffer directBuffer = NIOServerCnxnFactory.getDirectBuffer();
+        if (directBuffer == null) {
+            ByteBuffer[] bufferList = new ByteBuffer[outgoingBuffers.size()];
+            // Use gathered write call. This updates the positions of the
+            // byte buffers to reflect the bytes that were written out.
+            sock.write(outgoingBuffers.toArray(bufferList));
+
+            // Remove the buffers that we have sent
+            ByteBuffer bb;
+            while ((bb = outgoingBuffers.peek()) != null) {
+                if (bb == ServerCnxnFactory.closeConn) {
+                    throw new CloseRequestException("close requested");
+                }
+                if (bb.remaining() > 0) {
+                    break;
+                }
+                packetSent();
+                outgoingBuffers.remove();
+            }
+         } else {
+            directBuffer.clear();
+
+            for (ByteBuffer b : outgoingBuffers) {
+                if (directBuffer.remaining() < b.remaining()) {
+                    /*
+                     * When we call put later, if the directBuffer is too
+                     * small to hold everything, nothing will be copied,
+                     * so we've got to slice the buffer if it's too big.
+                     */
+                    b = (ByteBuffer) b.slice().limit(
+                        directBuffer.remaining());
+                }
+                /*
+                 * put() is going to modify the positions of both
+                 * buffers, but we don't want to change the position of
+                 * the source buffers (we'll do that after the send, if
+                 * needed), so we save and reset the position after the
+                 * copy
+                 */
+                int p = b.position();
+                directBuffer.put(b);
+                b.position(p);
+                if (directBuffer.remaining() == 0) {
+                    break;
+                }
+            }
+            /*
+             * Do the flip: limit becomes position, position gets set to
+             * 0. This sets us up for the write.
+             */
+            directBuffer.flip();
+
+            int sent = sock.write(directBuffer);
+
+            ByteBuffer bb;
+
+            // Remove the buffers that we have sent
+            while ((bb = outgoingBuffers.peek()) != null) {
+                if (bb == ServerCnxnFactory.closeConn) {
+                    throw new CloseRequestException("close requested");
+                }
+                if (sent < bb.remaining()) {
+                    /*
+                     * We only partially sent this buffer, so we update
+                     * the position and exit the loop.
+                     */
+                    bb.position(bb.position() + sent);
+                    break;
+                }
+                packetSent();
+                /* We've sent the whole buffer, so drop the buffer */
+                sent -= bb.remaining();
+                outgoingBuffers.remove();
+            }
+        }
+    }
+
+    /**
+     * Handles read/write IO on connection.
+     */
     void doIO(SelectionKey k) throws InterruptedException {
         try {
             if (sock == null) {
@@ -241,101 +342,15 @@ public class NIOServerCnxn extends Serve
                 }
             }
             if (k.isWritable()) {
-                // ZooLog.logTraceMessage(LOG,
-                // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
-                // "outgoingBuffers.size() = " +
-                // outgoingBuffers.size());
-                if (outgoingBuffers.size() > 0) {
-                    // ZooLog.logTraceMessage(LOG,
-                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
-                    // "sk " + k + " is valid: " +
-                    // k.isValid());
-
-                    /*
-                     * This is going to reset the buffer position to 0 and the
-                     * limit to the size of the buffer, so that we can fill it
-                     * with data from the non-direct buffers that we need to
-                     * send.
-                     */
-                    ByteBuffer directBuffer = factory.directBuffer;
-                    directBuffer.clear();
+                handleWrite(k);
 
-                    for (ByteBuffer b : outgoingBuffers) {
-                        if (directBuffer.remaining() < b.remaining()) {
-                            /*
-                             * When we call put later, if the directBuffer is to
-                             * small to hold everything, nothing will be copied,
-                             * so we've got to slice the buffer if it's too big.
-                             */
-                            b = (ByteBuffer) b.slice().limit(
-                                    directBuffer.remaining());
-                        }
-                        /*
-                         * put() is going to modify the positions of both
-                         * buffers, put we don't want to change the position of
-                         * the source buffers (we'll do that after the send, if
-                         * needed), so we save and reset the position after the
-                         * copy
-                         */
-                        int p = b.position();
-                        directBuffer.put(b);
-                        b.position(p);
-                        if (directBuffer.remaining() == 0) {
-                            break;
-                        }
-                    }
-                    /*
-                     * Do the flip: limit becomes position, position gets set to
-                     * 0. This sets us up for the write.
-                     */
-                    directBuffer.flip();
-
-                    int sent = sock.write(directBuffer);
-                    ByteBuffer bb;
-
-                    // Remove the buffers that we have sent
-                    while (outgoingBuffers.size() > 0) {
-                        bb = outgoingBuffers.peek();
-                        if (bb == ServerCnxnFactory.closeConn) {
-                            throw new CloseRequestException("close requested");
-                        }
-                        int left = bb.remaining() - sent;
-                        if (left > 0) {
-                            /*
-                             * We only partially sent this buffer, so we update
-                             * the position and exit the loop.
-                             */
-                            bb.position(bb.position() + sent);
-                            break;
-                        }
-                        packetSent();
-                        /* We've sent the whole buffer, so drop the buffer */
-                        sent -= bb.remaining();
-                        outgoingBuffers.remove();
-                    }
-                    // ZooLog.logTraceMessage(LOG,
-                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
-                    // outgoingBuffers.size() = " + outgoingBuffers.size());
-                }
-
-                synchronized(this.factory){
-                    if (outgoingBuffers.size() == 0) {
-                        if (!initialized
-                                && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
-                            throw new CloseRequestException("responded to info probe");
-                        }
-                        sk.interestOps(sk.interestOps()
-                                & (~SelectionKey.OP_WRITE));
-                    } else {
-                        sk.interestOps(sk.interestOps()
-                                | SelectionKey.OP_WRITE);
-                    }
+                if (!initialized && !getReadInterest() && !getWriteInterest()) {
+                    throw new CloseRequestException("responded to info probe");
                 }
             }
         } catch (CancelledKeyException e) {
-            LOG.warn("Exception causing close of session 0x"
-                    + Long.toHexString(sessionId)
-                    + " due to " + e);
+            LOG.warn("CancelledKeyException causing close of session 0x"
+                     + Long.toHexString(sessionId));
             if (LOG.isDebugEnabled()) {
                 LOG.debug("CancelledKeyException stack trace", e);
             }
@@ -344,14 +359,12 @@ public class NIOServerCnxn extends Serve
             // expecting close to log session closure
             close();
         } catch (EndOfStreamException e) {
-            LOG.warn("caught end of stream exception",e); // tell user why
-
+            LOG.warn(e.getMessage());
             // expecting close to log session closure
             close();
         } catch (IOException e) {
             LOG.warn("Exception causing close of session 0x"
-                    + Long.toHexString(sessionId)
-                    + " due to " + e);
+                     + Long.toHexString(sessionId) + ": " + e.getMessage());
             if (LOG.isDebugEnabled()) {
                 LOG.debug("IOException stack trace", e);
             }
@@ -362,42 +375,50 @@ public class NIOServerCnxn extends Serve
     private void readRequest() throws IOException {
         zkServer.processPacket(this, incomingBuffer);
     }
-    
+
+    // Only called as callback from zkServer.processPacket()
     protected void incrOutstandingRequests(RequestHeader h) {
         if (h.getXid() >= 0) {
-            synchronized (this) {
-                outstandingRequests++;
-            }
-            synchronized (this.factory) {        
-                // check throttling
-                if (zkServer.getInProcess() > outstandingLimit) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Throttling recv " + zkServer.getInProcess());
-                    }
-                    disableRecv();
-                    // following lines should not be needed since we are
-                    // already reading
-                    // } else {
-                    // enableRecv();
+            outstandingRequests.incrementAndGet();
+            // check throttling
+            int inProcess = zkServer.getInProcess();
+            if (inProcess > outstandingLimit) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Throttling recv " + inProcess);
                 }
+                disableRecv();
             }
         }
+    }
 
+    // returns whether we are interested in writing, which is determined
+    // by whether we have any pending buffers on the output queue or not
+    private boolean getWriteInterest() {
+        return !outgoingBuffers.isEmpty();
     }
 
+    // returns whether we are interested in taking new requests, which is
+    // determined by whether we are currently throttled or not
+    private boolean getReadInterest() {
+        return !throttled.get();
+    }
+
+    private final AtomicBoolean throttled = new AtomicBoolean(false);
+
+    // Throttle acceptance of new requests. If this entailed a state change,
+    // register an interest op update request with the selector.
     public void disableRecv() {
-        sk.interestOps(sk.interestOps() & (~SelectionKey.OP_READ));
+        if (throttled.compareAndSet(false, true)) {
+            requestInterestOpsUpdate();
+        }
     }
 
+    // Disable throttling and resume acceptance of new requests. If this
+    // entailed a state change, register an interest op update request with
+    // the selector.
     public void enableRecv() {
-        synchronized (this.factory) {
-            sk.selector().wakeup();
-            if (sk.isValid()) {
-                int interest = sk.interestOps();
-                if ((interest & SelectionKey.OP_READ) == 0) {
-                    sk.interestOps(interest | SelectionKey.OP_READ);
-                }
-            }
+        if (throttled.compareAndSet(true, false)) {
+            requestInterestOpsUpdate();
         }
     }
 
@@ -412,7 +433,7 @@ public class NIOServerCnxn extends Serve
     /**
      * clean up the socket related to a command and also make sure we flush the
      * data before we do that
-     * 
+     *
      * @param pwriter
      *            the pwriter for a command socket
      */
@@ -432,7 +453,7 @@ public class NIOServerCnxn extends Serve
             }
         }
     }
-    
+
     /**
      * This class wraps the sendBuffer method of NIOServerCnxn. It is
      * responsible for chunking up the response to a client. Rather
@@ -441,7 +462,7 @@ public class NIOServerCnxn extends Serve
      */
     private class SendBufferWriter extends Writer {
         private StringBuffer sb = new StringBuffer();
-        
+
         /**
          * Check if we are ready to send another chunk.
          * @param force force sending, even if not a full chunk
@@ -475,20 +496,24 @@ public class NIOServerCnxn extends Serve
 
     private static final String ZK_NOT_SERVING =
         "This ZooKeeper instance is not currently serving requests";
-    
+
     /**
      * Set of threads for commmand ports. All the 4
      * letter commands are run via a thread. Each class
      * maps to a corresponding 4 letter command. CommandThread
      * is the abstract class from which all the others inherit.
      */
-    private abstract class CommandThread extends Thread {
+    private abstract class CommandThread {
         PrintWriter pw;
-        
+
         CommandThread(PrintWriter pw) {
             this.pw = pw;
         }
-        
+
+        public void start() {
+            run();
+        }
+
         public void run() {
             try {
                 commandRun();
@@ -498,52 +523,52 @@ public class NIOServerCnxn extends Serve
                 cleanupWriterSocket(pw);
             }
         }
-        
+
         public abstract void commandRun() throws IOException;
     }
-    
+
     private class RuokCommand extends CommandThread {
         public RuokCommand(PrintWriter pw) {
             super(pw);
         }
-        
+
         @Override
         public void commandRun() {
             pw.print("imok");
-            
+
         }
     }
-    
+
     private class TraceMaskCommand extends CommandThread {
         TraceMaskCommand(PrintWriter pw) {
             super(pw);
         }
-        
+
         @Override
         public void commandRun() {
             long traceMask = ZooTrace.getTextTraceLevel();
             pw.print(traceMask);
         }
     }
-    
+
     private class SetTraceMaskCommand extends CommandThread {
         long trace = 0;
         SetTraceMaskCommand(PrintWriter pw, long trace) {
             super(pw);
             this.trace = trace;
         }
-        
+
         @Override
         public void commandRun() {
             pw.print(trace);
         }
     }
-    
+
     private class EnvCommand extends CommandThread {
         EnvCommand(PrintWriter pw) {
             super(pw);
         }
-        
+
         @Override
         public void commandRun() {
             List<Environment.Entry> env = Environment.list();
@@ -554,15 +579,15 @@ public class NIOServerCnxn extends Serve
                 pw.print("=");
                 pw.println(e.getValue());
             }
-            
-        } 
+
+        }
     }
-    
+
     private class ConfCommand extends CommandThread {
         ConfCommand(PrintWriter pw) {
             super(pw);
         }
-            
+
         @Override
         public void commandRun() {
             if (zkServer == null) {
@@ -572,38 +597,36 @@ public class NIOServerCnxn extends Serve
             }
         }
     }
-    
+
     private class StatResetCommand extends CommandThread {
         public StatResetCommand(PrintWriter pw) {
             super(pw);
         }
-        
+
         @Override
         public void commandRun() {
             if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             }
-            else { 
+            else {
                 zkServer.serverStats().reset();
                 pw.println("Server stats reset.");
             }
         }
     }
-    
+
     private class CnxnStatResetCommand extends CommandThread {
         public CnxnStatResetCommand(PrintWriter pw) {
             super(pw);
         }
-        
+
         @Override
         public void commandRun() {
             if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             } else {
-                synchronized(factory.cnxns){
-                    for(ServerCnxn c : factory.cnxns){
-                        c.resetStats();
-                    }
+                for(ServerCnxn c : factory.cnxns){
+                    c.resetStats();
                 }
                 pw.println("Connection stats reset.");
             }
@@ -614,7 +637,7 @@ public class NIOServerCnxn extends Serve
         public DumpCommand(PrintWriter pw) {
             super(pw);
         }
-        
+
         @Override
         public void commandRun() {
             if (zkServer == null) {
@@ -625,24 +648,26 @@ public class NIOServerCnxn extends Serve
                 zkServer.sessionTracker.dumpSessions(pw);
                 pw.println("ephemeral nodes dump:");
                 zkServer.dumpEphemerals(pw);
+                pw.println("Connections dump:");
+                factory.dumpConnections(pw);
             }
         }
     }
-    
+
     private class StatCommand extends CommandThread {
         int len;
         public StatCommand(PrintWriter pw, int len) {
             super(pw);
             this.len = len;
         }
-        
+
         @SuppressWarnings("unchecked")
         @Override
         public void commandRun() {
             if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             }
-            else {   
+            else {
                 pw.print("Zookeeper version: ");
                 pw.println(Version.getFullVersion());
                 if (zkServer instanceof ReadOnlyZooKeeperServer) {
@@ -652,14 +677,7 @@ public class NIOServerCnxn extends Serve
                 if (len == statCmd) {
                     LOG.info("Stat command output");
                     pw.println("Clients:");
-                    // clone should be faster than iteration
-                    // ie give up the cnxns lock faster
-                    HashSet<NIOServerCnxn> cnxnset;
-                    synchronized(factory.cnxns){
-                        cnxnset = (HashSet<NIOServerCnxn>)factory
-                        .cnxns.clone();
-                    }
-                    for(NIOServerCnxn c : cnxnset){
+                    for(ServerCnxn c : factory.cnxns){
                         c.dumpConnectionInfo(pw, true);
                         pw.println();
                     }
@@ -669,28 +687,22 @@ public class NIOServerCnxn extends Serve
                 pw.print("Node count: ");
                 pw.println(zkServer.getZKDatabase().getNodeCount());
             }
-            
+
         }
     }
-    
+
     private class ConsCommand extends CommandThread {
         public ConsCommand(PrintWriter pw) {
             super(pw);
         }
-        
+
         @SuppressWarnings("unchecked")
         @Override
         public void commandRun() {
             if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             } else {
-                // clone should be faster than iteration
-                // ie give up the cnxns lock faster
-                HashSet<NIOServerCnxn> cnxns;
-                synchronized (factory.cnxns) {
-                    cnxns = (HashSet<NIOServerCnxn>) factory.cnxns.clone();
-                }
-                for (NIOServerCnxn c : cnxns) {
+                for (ServerCnxn c : factory.cnxns) {
                     c.dumpConnectionInfo(pw, false);
                     pw.println();
                 }
@@ -698,7 +710,7 @@ public class NIOServerCnxn extends Serve
             }
         }
     }
-    
+
     private class WatchCommand extends CommandThread {
         int len = 0;
         public WatchCommand(PrintWriter pw, int len) {
@@ -821,7 +833,7 @@ public class NIOServerCnxn extends Serve
         /** cancel the selection key to remove the socket handling
          * from selector. This is to prevent netcat problem wherein
          * netcat immediately closes the sending side after sending the
-         * commands and still keeps the receiving channel open. 
+         * commands and still keeps the receiving channel open.
          * The idea is to remove the selectionkey from the selector
          * so that the selector does not notice the closed read on the
          * socket channel and keep the socket alive to write the data to
@@ -897,6 +909,9 @@ public class NIOServerCnxn extends Serve
             IsroCommand isro = new IsroCommand(pwriter);
             isro.start();
             return true;
+        } else if (len == telnetCloseCmd) {
+            cleanupWriterSocket(null);
+            return true;
         }
         return false;
     }
@@ -925,11 +940,7 @@ public class NIOServerCnxn extends Serve
     }
 
     public long getOutstandingRequests() {
-        synchronized (this) {
-            synchronized (this.factory) {
-                return outstandingRequests;
-            }
-        }
+        return outstandingRequests.get();
     }
 
     /*
@@ -941,53 +952,45 @@ public class NIOServerCnxn extends Serve
         return sessionTimeout;
     }
 
+    /**
+     * Used by the "dump" 4-letter command to list all connections in
+     * cnxnExpiryMap.
+     */
     @Override
     public String toString() {
-        return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
+        return "ip: " + sock.socket().getRemoteSocketAddress() +
+               " sessionId: 0x" + Long.toHexString(sessionId);
     }
 
-    /*
+    /**
      * Close the cnxn and remove it from the factory cnxns list.
-     * 
-     * This function returns immediately if the cnxn is not on the cnxns list.
      */
     @Override
     public void close() {
-        synchronized(factory.cnxns){
-            // if this is not in cnxns then it's already closed
-            if (!factory.cnxns.remove(this)) {
-                return;
-            }
+        if (!factory.removeCnxn(this)) {
+            return;
+        }
 
-            synchronized (factory.ipMap) {
-                Set<NIOServerCnxn> s =
-                    factory.ipMap.get(sock.socket().getInetAddress());
-                s.remove(this);
-            }
-
-            factory.unregisterConnection(this);
-
-            if (zkServer != null) {
-                zkServer.removeCnxn(this);
-            }
-    
-            closeSock();
-    
-            if (sk != null) {
-                try {
-                    // need to cancel this selection key from the selector
-                    sk.cancel();
-                } catch (Exception e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("ignoring exception during selectionkey cancel", e);
-                    }
+        if (zkServer != null) {
+            zkServer.removeCnxn(this);
+        }
+
+        if (sk != null) {
+            try {
+                // need to cancel this selection key from the selector
+                sk.cancel();
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ignoring exception during selectionkey cancel", e);
                 }
             }
         }
+
+        closeSock();
     }
 
     /**
-     * Close resources associated with the sock of this cnxn. 
+     * Close resources associated with the sock of this cnxn.
      */
     private void closeSock() {
         if (sock == null) {
@@ -999,6 +1002,18 @@ public class NIOServerCnxn extends Serve
                 + (sessionId != 0 ?
                         " which had sessionid 0x" + Long.toHexString(sessionId) :
                         " (no session established for client)"));
+        closeSock(sock);
+        sock = null;
+    }
+
+    /**
+     * Close resources associated with a sock.
+     */
+    public static void closeSock(SocketChannel sock) {
+        if (sock == null) {
+            return;
+        }
+
         try {
             /*
              * The following sequence of code is stupid! You would think that
@@ -1030,18 +1045,13 @@ public class NIOServerCnxn extends Serve
         }
         try {
             sock.close();
-            // XXX The next line doesn't seem to be needed, but some posts
-            // to forums suggest that it is needed. Keep in mind if errors in
-            // this section arise.
-            // factory.selector.wakeup();
         } catch (IOException e) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("ignoring exception during socketchannel close", e);
             }
         }
-        sock = null;
     }
-    
+
     private final static byte fourBytes[] = new byte[4];
 
     /*
@@ -1051,7 +1061,7 @@ public class NIOServerCnxn extends Serve
      *      org.apache.jute.Record, java.lang.String)
      */
     @Override
-    synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
+    public void sendResponse(ReplyHeader h, Record r, String tag) {
         try {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             // Make space for length
@@ -1071,16 +1081,10 @@ public class NIOServerCnxn extends Serve
             bb.putInt(b.length - 4).rewind();
             sendBuffer(bb);
             if (h.getXid() > 0) {
-                synchronized(this){
-                    outstandingRequests--;
-                }
                 // check throttling
-                synchronized (this.factory) {        
-                    if (zkServer.getInProcess() < outstandingLimit
-                            || outstandingRequests < 1) {
-                        sk.selector().wakeup();
-                        enableRecv();
-                    }
+                if (outstandingRequests.decrementAndGet() < 1 ||
+                    zkServer.getInProcess() < outstandingLimit) {
+                    enableRecv();
                 }
             }
          } catch(Exception e) {
@@ -1094,7 +1098,7 @@ public class NIOServerCnxn extends Serve
      * @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent)
      */
     @Override
-    synchronized public void process(WatchedEvent event) {
+    public void process(WatchedEvent event) {
         ReplyHeader h = new ReplyHeader(-1, -1L, 0);
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
@@ -1122,16 +1126,28 @@ public class NIOServerCnxn extends Serve
     @Override
     public void setSessionId(long sessionId) {
         this.sessionId = sessionId;
+        factory.addSession(sessionId, this);
     }
 
     @Override
     public void setSessionTimeout(int sessionTimeout) {
         this.sessionTimeout = sessionTimeout;
+        factory.touchCnxn(this);
     }
 
     @Override
     public int getInterestOps() {
-        return sk.isValid() ? sk.interestOps() : 0;
+        if (!isSelectable()) {
+            return 0;
+        }
+        int interestOps = 0;
+        if (getReadInterest()) {
+            interestOps |= SelectionKey.OP_READ;
+        }
+        if (getWriteInterest()) {
+            interestOps |= SelectionKey.OP_WRITE;
+        }
+        return interestOps;
     }
 
     @Override
@@ -1142,6 +1158,13 @@ public class NIOServerCnxn extends Serve
         return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
     }
 
+    public InetAddress getSocketAddress() {
+        if (sock == null) {
+            return null;
+        }
+        return sock.socket().getInetAddress();
+    }
+
     @Override
     protected ServerStats serverStats() {
         if (zkServer == null) {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=1423990&r1=1423989&r2=1423990&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Wed Dec 19 18:07:14 2012
@@ -19,25 +19,73 @@
 package org.apache.zookeeper.server;
 
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
+/**
+ * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
+ * NIO non-blocking socket calls. Communication between threads is handled via
+ * queues.
+ *
+ *   - 1   accept thread, which accepts new connections and assigns to a
+ *         selector thread
+ *   - 1-N selector threads, each of which selects on 1/N of the connections.
+ *         The reason the factory supports more than one selector thread is that
+ *         with large numbers of connections, select() itself can become a
+ *         performance bottleneck.
+ *   - 0-M socket I/O worker threads, which perform basic socket reads and
+ *         writes. If configured with 0 worker threads, the selector threads
+ *         do the socket I/O directly.
+ *   - 1   connection expiration thread, which closes idle connections; this is
+ *         necessary to expire connections on which no session is established.
+ *
+ * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
+ * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
+ */
+public class NIOServerCnxnFactory extends ServerCnxnFactory {
     private static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxnFactory.class);
 
+    /** Default sessionless connection timeout in ms: 10000 (10s) */
+    public static final String ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT =
+        "zookeeper.nio.sessionlessCnxnTimeout";
+    /**
+     * With 500 connections to an observer with watchers firing on each, the
+     * server is unable to exceed 1GigE rates with only 1 selector thread.
+     * Defaults to using 2 selector threads with 8 cores and 4 with 32 cores,
+     * expressed as sqrt(numCores/2). Must have at least 1 selector thread.
+     */
+    public static final String ZOOKEEPER_NIO_NUM_SELECTOR_THREADS =
+        "zookeeper.nio.numSelectorThreads";
+    /** Default: 2 * numCores */
+    public static final String ZOOKEEPER_NIO_NUM_WORKER_THREADS =
+        "zookeeper.nio.numWorkerThreads";
+    /** Default: 64kB */
+    public static final String ZOOKEEPER_NIO_DIRECT_BUFFER_BYTES =
+        "zookeeper.nio.directBufferBytes";
+    /** Default worker pool shutdown timeout in ms: 5000 (5s) */
+    public static final String ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT =
+        "zookeeper.nio.shutdownTimeout";
+
     static {
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                 public void uncaughtException(Thread t, Throwable e) {
@@ -54,47 +102,565 @@ public class NIOServerCnxnFactory extend
         } catch(IOException ie) {
             LOG.error("Selector failed to open", ie);
         }
+
+        /**
+         * Value of 0 disables use of direct buffers and instead uses
+         * gathered write call.
+         *
+         * Default to using 64k direct buffers.
+         */
+        directBufferBytes = Integer.getInteger(
+            ZOOKEEPER_NIO_DIRECT_BUFFER_BYTES, 64 * 1024);
     }
 
-    ServerSocketChannel ss;
+    /**
+     * AbstractSelectThread is an abstract base class containing a few bits
+     * of code shared by the AcceptThread (which selects on the listen socket)
+     * and SelectorThread (which selects on client connections) classes.
+     */
+    private abstract class AbstractSelectThread extends Thread {
+        protected final Selector selector;
+
+        public AbstractSelectThread(String name) throws IOException {
+            super(name);
+            // Allows the JVM to shutdown even if this thread is still running.
+            setDaemon(true);
+            this.selector = Selector.open();
+        }
+
+        public void wakeupSelector() {
+            selector.wakeup();
+        }
+
+        protected void cleanupSelectionKey(SelectionKey key) {
+            if (key != null) {
+                try {
+                    key.cancel();
+                } catch (Exception ex) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("ignoring exception during selectionkey cancel", ex);
+                    }
+                }
+            }
+        }
 
-    final Selector selector = Selector.open();
+        protected void fastCloseSock(SocketChannel sc) {
+            if (sc != null) {
+                try {
+                    // Hard close immediately, discarding buffers
+                    sc.socket().setSoLinger(true, 0);
+                } catch (SocketException e) {
+                    LOG.warn("Unable to set socket linger to 0, socket close"
+                             + " may stall in CLOSE_WAIT", e);
+                }
+                NIOServerCnxn.closeSock(sc);
+            }
+        }
+    }
 
     /**
-     * We use this buffer to do efficient socket I/O. Since there is a single
-     * sender thread per NIOServerCnxn instance, we can use a member variable to
-     * only allocate it once.
-    */
-    final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
+     * There is a single AcceptThread which accepts new connections and assigns
+     * them to a SelectorThread using a simple round-robin scheme to spread
+     * them across the SelectorThreads. It enforces maximum number of
+     * connections per IP and attempts to cope with running out of file
+     * descriptors by briefly sleeping before retrying.
+     */
+    private class AcceptThread extends AbstractSelectThread {
+        private final ServerSocketChannel acceptSocket;
+        private final SelectionKey acceptKey;
+        private final RateLogger acceptErrorLogger = new RateLogger(LOG);
+        private final Collection<SelectorThread> selectorThreads;
+        private Iterator<SelectorThread> selectorIterator;
+
+        public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
+                Set<SelectorThread> selectorThreads) throws IOException {
+            super("NIOServerCxnFactory.AcceptThread:" + addr);
+            this.acceptSocket = ss;
+            this.acceptKey =
+                acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
+            this.selectorThreads = Collections.unmodifiableList(
+                new ArrayList<SelectorThread>(selectorThreads));
+            selectorIterator = this.selectorThreads.iterator();
+        }
 
-    final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
-        new HashMap<InetAddress, Set<NIOServerCnxn>>( );
+        public void run() {
+            try {
+                while (!stopped && !acceptSocket.socket().isClosed()) {
+                    try {
+                        select();
+                    } catch (RuntimeException e) {
+                        LOG.warn("Ignoring unexpected runtime exception", e);
+                    } catch (Exception e) {
+                        LOG.warn("Ignoring unexpected exception", e);
+                    }
+                }
+            } finally {
+                // This will wake up the selector threads, and tell the
+                // worker thread pool to begin shutdown.
+                NIOServerCnxnFactory.this.stop();
+                LOG.info("accept thread exitted run method");
+            }
+        }
+
+        private void select() {
+            try {
+                selector.select();
 
-    int maxClientCnxns = 60;
+                Iterator<SelectionKey> selectedKeys =
+                    selector.selectedKeys().iterator();
+                while (!stopped && selectedKeys.hasNext()) {
+                    SelectionKey key = selectedKeys.next();
+                    selectedKeys.remove();
+
+                    if (!key.isValid()) {
+                        continue;
+                    }
+                    if (key.isAcceptable()) {
+                        if (!doAccept()) {
+                            // If unable to pull a new connection off the accept
+                            // queue, pause accepting to give us time to free
+                            // up file descriptors and so the accept thread
+                            // doesn't spin in a tight loop.
+                            pauseAccept(10);
+                        }
+                    } else {
+                        LOG.warn("Unexpected ops in accept select "
+                                 + key.readyOps());
+                    }
+                }
+            } catch (IOException e) {
+                LOG.warn("Ignoring IOException while selecting", e);
+            }
+        }
+
+        /**
+         * Mask off the listen socket interest ops and use select() to sleep
+         * so that other threads can wake us up by calling wakeup() on the
+         * selector.
+         */
+        private void pauseAccept(long millisecs) {
+            acceptKey.interestOps(0);
+            try {
+                selector.select(millisecs);
+            } catch (IOException e) {
+                // ignore
+            } finally {
+                acceptKey.interestOps(SelectionKey.OP_ACCEPT);
+            }
+        }
+
+        /**
+         * Accept a new socket connection. Enforces the maximum number of
+         * connections per client IP address and round-robin assigns the
+         * connection to a selector thread for handling. On error, attempts
+         * to fast close the socket.
+         *
+         * @return whether a connection was pulled off the accept queue
+         */
+        private boolean doAccept() {
+            boolean accepted = false;
+            SocketChannel sc = null;
+            try {
+                sc = acceptSocket.accept();
+                accepted = true;
+                InetAddress ia = sc.socket().getInetAddress();
+                int cnxncount = getClientCnxnCount(ia);
+
+                if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
+                    throw new IOException("Too many connections from " + ia
+                                          + " - max is " + maxClientCnxns );
+                }
+
+                LOG.info("Accepted socket connection from "
+                         + sc.socket().getRemoteSocketAddress());
+                sc.configureBlocking(false);
+
+                // Round-robin assign this connection to a selector thread
+                if (!selectorIterator.hasNext()) {
+                    selectorIterator = selectorThreads.iterator();
+                }
+                SelectorThread selectorThread = selectorIterator.next();
+                if (!selectorThread.addAcceptedConnection(sc)) {
+                    throw new IOException(
+                        "Unable to add connection to selector queue"
+                        + (stopped ? " (shutdown in progress)" : ""));
+                }
+                acceptErrorLogger.flush();
+            } catch (IOException e) {
+                // accept, maxClientCnxns, configureBlocking
+                acceptErrorLogger.rateLimitLog(
+                    "Error accepting new connection: " + e.getMessage());
+                fastCloseSock(sc);
+            }
+            return accepted;
+        }
+    }
+
+    /**
+     * The SelectorThread receives newly accepted connections from the
+     * AcceptThread and is responsible for selecting for I/O readiness
+     * across the connections. This thread is the only thread that performs
+     * any non-threadsafe or potentially blocking calls on the selector
+     * (registering new connections and reading/writing interest ops).
+     *
+     * Assignment of a connection to a SelectorThread is permanent and only
+     * one SelectorThread will ever interact with the connection. There are
+     * 1-N SelectorThreads, with connections evenly apportioned between the
+     * SelectorThreads.
+     *
+     * If there is a worker thread pool, when a connection has I/O to perform
+     * the SelectorThread removes it from selection by clearing its interest
+     * ops and schedules the I/O for processing by a worker thread. When the
+     * work is complete, the connection is placed on the ready queue to have
+     * its interest ops restored and resume selection.
+     *
+     * If there is no worker thread pool, the SelectorThread performs the I/O
+     * directly.
+     */
+    class SelectorThread extends AbstractSelectThread {
+        private final int id;
+        private final Queue<SocketChannel> acceptedQueue;
+        private final Queue<SelectionKey> updateQueue;
+
+        public SelectorThread(int id) throws IOException {
+            super("NIOServerCxnFactory.SelectorThread-" + id);
+            this.id = id;
+            acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
+            updateQueue = new LinkedBlockingQueue<SelectionKey>();
+        }
+
+        /**
+         * Place new accepted connection onto a queue for adding. Do this
+         * so only the selector thread modifies what keys are registered
+         * with the selector.
+         */
+        public boolean addAcceptedConnection(SocketChannel accepted) {
+            if (stopped || !acceptedQueue.offer(accepted)) {
+                return false;
+            }
+            wakeupSelector();
+            return true;
+        }
+
+        /**
+         * Place interest op update requests onto a queue so that only the
+         * selector thread modifies interest ops, because interest ops
+         * reads/sets are potentially blocking operations if other select
+         * operations are happening.
+         */
+        public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
+            if (stopped || !updateQueue.offer(sk)) {
+                return false;
+            }
+            wakeupSelector();
+            return true;
+        }
+
+        /**
+         * The main loop for the thread selects() on the connections and
+         * dispatches ready I/O work requests, then registers all pending
+         * newly accepted connections and updates any interest ops on the
+         * queue.
+         */
+        public void run() {
+            try {
+                while (!stopped) {
+                    try {
+                        select();
+                        processAcceptedConnections();
+                        processInterestOpsUpdateRequests();
+                    } catch (RuntimeException e) {
+                        LOG.warn("Ignoring unexpected runtime exception", e);
+                    } catch (Exception e) {
+                        LOG.warn("Ignoring unexpected exception", e);
+                    }
+                }
+
+                // Close connections still pending on the selector. Others with
+                // in-flight work are left to drain out of the work queue.
+                for (SelectionKey key : selector.keys()) {
+                    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
+                    if (cnxn.isSelectable()) {
+                        cnxn.close();
+                    }
+                    cleanupSelectionKey(key);
+                }
+                SocketChannel accepted;
+                while ((accepted = acceptedQueue.poll()) != null) {
+                    fastCloseSock(accepted);
+                }
+                updateQueue.clear();
+            } finally {
+                // This will wake up the accept thread and the other selector
+                // threads, and tell the worker thread pool to begin shutdown.
+                NIOServerCnxnFactory.this.stop();
+                LOG.info("selector thread exitted run method");
+            }
+        }
+
+        /**
+         * Wait for ready keys, then process them in shuffled order: invalid
+         * keys are cleaned up, readable/writable ones are passed to
+         * handleIO(). Stops early once the factory has been stopped.
+         */
+        private void select() {
+            try {
+                selector.select();
+                Set<SelectionKey> ready = selector.selectedKeys();
+                ArrayList<SelectionKey> shuffled =
+                    new ArrayList<SelectionKey>(ready);
+                Collections.shuffle(shuffled);
+                for (int i = 0; !stopped && i < shuffled.size(); ++i) {
+                    SelectionKey key = shuffled.get(i);
+                    ready.remove(key);
+                    if (!key.isValid()) {
+                        cleanupSelectionKey(key);
+                    } else if (key.isReadable() || key.isWritable()) {
+                        handleIO(key);
+                    } else {
+                        LOG.warn("Unexpected ops in select " + key.readyOps());
+                    }
+                }
+            } catch (IOException e) {
+                LOG.warn("Ignoring IOException while selecting", e);
+            }
+        }
+
+        /**
+         * Suspend selection on the given key and queue an IOWorkRequest for
+         * its connection on the worker pool. If a worker thread pool is not
+         * being used, I/O is run directly by this thread.
+         */
+        private void handleIO(SelectionKey key) {
+            NIOServerCnxn conn = (NIOServerCnxn) key.attachment();
+
+            // Clear the selectable flag and the interest set so this key is
+            // not selected again while the connection is being processed.
+            conn.disableSelectable();
+            key.interestOps(0);
+            // Refresh the connection's entry in the expiry queue.
+            touchCnxn(conn);
+            workerPool.schedule(new IOWorkRequest(this, key));
+        }
+
+        /**
+         * Drain the queue of accepted sockets assigned to this thread,
+         * registering each with our selector and creating its connection.
+         */
+        private void processAcceptedConnections() {
+            SocketChannel sock;
+            while (!stopped && (sock = acceptedQueue.poll()) != null) {
+                SelectionKey sk = null;
+                try {
+                    sk = sock.register(selector, SelectionKey.OP_READ);
+                    NIOServerCnxn conn = createConnection(sock, sk, this);
+                    sk.attach(conn);
+                    addCnxn(conn);
+                } catch (IOException e) {
+                    // register() or createConnection() failed: back out
+                    cleanupSelectionKey(sk);
+                    fastCloseSock(sock);
+                }
+            }
+        }
+
+        /**
+         * Iterate over the queue of connections ready to resume selection,
+         * and restore their interest ops selection mask. Keys that are no
+         * longer valid are cleaned up and otherwise left untouched.
+         */
+        private void processInterestOpsUpdateRequests() {
+            SelectionKey key;
+            while (!stopped && (key = updateQueue.poll()) != null) {
+                NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
+                if (!key.isValid()) {
+                    cleanupSelectionKey(key);
+                } else if (cnxn.isSelectable()) {
+                    key.interestOps(cnxn.getInterestOps());
+                }
+            }
+        }
+    }
+
+    /**
+     * IOWorkRequest is a small wrapper class that lets a WorkerService run
+     * doIO() on a connection and then hand its key back for selection.
+     */
+    private class IOWorkRequest extends WorkerService.WorkRequest {
+        private final SelectorThread selectorThread;
+        private final SelectionKey key;
+        private final NIOServerCnxn cnxn;
+
+        IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
+            this.selectorThread = selectorThread;
+            this.key = key;
+            this.cnxn = (NIOServerCnxn) key.attachment();
+        }
+
+        public void doWork() throws InterruptedException {
+            if (!key.isValid()) {
+                selectorThread.cleanupSelectionKey(key);
+                return;
+            }
+
+            if (key.isReadable() || key.isWritable()) {
+                cnxn.doIO(key);
+
+                // After doIO(): close on factory stop, clean up a dead key
+                if (stopped) {
+                    cnxn.close();
+                    return;
+                }
+                if (!key.isValid()) {
+                    selectorThread.cleanupSelectionKey(key);
+                    return;
+                }
+                touchCnxn(cnxn);
+            }
+
+            // Mark this connection as once again ready for selection
+            cnxn.enableSelectable();
+            // Queue a request to resume selecting on the current interest ops,
+            // which may have changed as a result of the I/O we just performed;
+            // if the selector thread does not accept it, close the connection.
+            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
+                cnxn.close();
+            }
+        }
+
+        @Override
+        public void cleanup() {
+            cnxn.close();
+        }
+    }
+
+    /**
+     * Closes stale connections so that connections on which no session is
+     * established are properly expired: sleeps for the wait time reported by
+     * cnxnExpiryQueue, then closes whatever its poll() returns.
+     */
+    private class ConnectionExpirerThread extends Thread {
+        ConnectionExpirerThread() {
+            super("ConnnectionExpirer");
+        }
+
+        public void run() {
+            try {
+                while (!stopped) {
+                    long delay = cnxnExpiryQueue.getWaitTime();
+                    if (delay > 0) {
+                        Thread.sleep(delay);
+                    } else {
+                        for (NIOServerCnxn expired : cnxnExpiryQueue.poll()) {
+                            expired.close();
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                LOG.info("ConnnectionExpirerThread interrupted");
+            }
+        }
+    }
+
+    ServerSocketChannel ss;
+
+    /**
+     * Per-thread direct buffer for efficient socket I/O. I/O is handled by
+     * the worker threads (or the selector threads directly, if no worker
+     * pool is created), so a fixed set of these is shared by connections.
+     * getDirectBuffer() returns null when directBufferBytes is not positive.
+     */
+    private static final ThreadLocal<ByteBuffer> directBuffer =
+        new ThreadLocal<ByteBuffer>() {
+            @Override protected ByteBuffer initialValue() {
+                return ByteBuffer.allocateDirect(directBufferBytes);
+            }
+        };
+
+    public static ByteBuffer getDirectBuffer() {
+        return directBufferBytes > 0 ? directBuffer.get() : null;
+    }
+
+    // sessionMap is used by closeSession()
+    private final ConcurrentHashMap<Long, NIOServerCnxn> sessionMap =
+        new ConcurrentHashMap<Long, NIOServerCnxn>();
+    // ipMap is used to limit connections per IP
+    private final ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
+        new ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>>( );
+
+    protected int maxClientCnxns = 60;
+
+    int sessionlessCnxnTimeout;
+    private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;
+
+
+    protected WorkerService workerPool;
+
+    private static int directBufferBytes;
+    private int numSelectorThreads;
+    private int numWorkerThreads;
+    private long workerShutdownTimeoutMS;
 
     /**
      * Construct a new server connection factory which will accept an unlimited number
      * of concurrent connections from each client (up to the file descriptor
      * limits of the operating system). startup(zks) must be called subsequently.
-     * @throws IOException
      */
-    public NIOServerCnxnFactory() throws IOException {
+    public NIOServerCnxnFactory() {
     }
 
-    Thread thread;
+    private volatile boolean stopped = true;
+    private ConnectionExpirerThread expirerThread;
+    private AcceptThread acceptThread;
+    private final Set<SelectorThread> selectorThreads =
+        new HashSet<SelectorThread>();
+
     @Override
     public void configure(InetSocketAddress addr, int maxcc) throws IOException {
         configureSaslLogin();
 
-        thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
-        thread.setDaemon(true);
         maxClientCnxns = maxcc;
+        sessionlessCnxnTimeout = Integer.getInteger(
+            ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
+        // We also use the sessionlessCnxnTimeout as expiring interval for
+        // cnxnExpiryQueue. These don't need to be the same, but the expiring
+        // interval passed into the ExpiryQueue() constructor below should be
+        // less than or equal to the timeout.
+        cnxnExpiryQueue =
+            new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
+        expirerThread = new ConnectionExpirerThread();
+
+        int numCores = Runtime.getRuntime().availableProcessors();
+        // 32 cores sweet spot seems to be 4 selector threads
+        numSelectorThreads = Integer.getInteger(
+            ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
+            Math.max((int) Math.sqrt((float) numCores/2), 1));
+        if (numSelectorThreads < 1) {
+            throw new IOException("numSelectorThreads must be at least 1");
+        }
+
+        numWorkerThreads = Integer.getInteger(
+            ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
+        workerShutdownTimeoutMS = Long.getLong(
+            ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
+
+        LOG.info("Configuring NIO connection handler with "
+                 + (sessionlessCnxnTimeout/1000) + "s sessionless connection"
+                 + " timeout, " + numSelectorThreads + " selector thread(s), "
+                 + (numWorkerThreads > 0 ? numWorkerThreads : "no")
+                 + " worker threads, and "
+                 + (directBufferBytes == 0 ? "gathered writes." :
+                    ("" + (directBufferBytes/1024) + " kB direct buffers.")));
+        for(int i=0; i<numSelectorThreads; ++i) {
+            selectorThreads.add(new SelectorThread(i));
+        }
+
         this.ss = ServerSocketChannel.open();
         ss.socket().setReuseAddress(true);
         LOG.info("binding to port " + addr);
         ss.socket().bind(addr);
         ss.configureBlocking(false);
-        ss.register(selector, SelectionKey.OP_ACCEPT);
+        acceptThread = new AcceptThread(ss, addr, selectorThreads);
     }
 
     /** {@inheritDoc} */
@@ -109,9 +675,22 @@ public class NIOServerCnxnFactory extend
 
     @Override
     public void start() {
+        stopped = false;
+        if (workerPool == null) {
+            workerPool = new WorkerService(
+                "NIOWorker", numWorkerThreads, false);
+        }
+        for(SelectorThread thread : selectorThreads) {
+            if (thread.getState() == Thread.State.NEW) {
+                thread.start();
+            }
+        }
         // ensure thread is started once and only once
-        if (thread.getState() == Thread.State.NEW) {
-            thread.start();
+        if (acceptThread.getState() == Thread.State.NEW) {
+            acceptThread.start();
+        }
+        if (expirerThread.getState() == Thread.State.NEW) {
+            expirerThread.start();
         }
     }
 
@@ -134,94 +713,79 @@ public class NIOServerCnxnFactory extend
         return ss.socket().getLocalPort();
     }
 
-    private void addCnxn(NIOServerCnxn cnxn) {
-        synchronized (cnxns) {
-            cnxns.add(cnxn);
-            synchronized (ipMap){
-                InetAddress addr = cnxn.sock.socket().getInetAddress();
-                Set<NIOServerCnxn> s = ipMap.get(addr);
-                if (s == null) {
-                    // in general we will see 1 connection from each
-                    // host, setting the initial cap to 2 allows us
-                    // to minimize mem usage in the common case
-                    // of 1 entry --  we need to set the initial cap
-                    // to 2 to avoid rehash when the first entry is added
-                    s = new HashSet<NIOServerCnxn>(2);
-                    s.add(cnxn);
-                    ipMap.put(addr,s);
-                } else {
-                    s.add(cnxn);
-                }
+    /**
+     * De-registers the connection from the various mappings maintained
+     * by the factory.
+     */
+    public boolean removeCnxn(NIOServerCnxn cnxn) {
+        // If the connection is not in the master list it's already been closed
+        if (!cnxns.remove(cnxn)) {
+            return false;
+        }
+        cnxnExpiryQueue.remove(cnxn);
+
+        long sessionId = cnxn.getSessionId();
+        if (sessionId != 0) {
+            sessionMap.remove(sessionId);
+        }
+
+        InetAddress addr = cnxn.getSocketAddress();
+        if (addr != null) {
+            Set<NIOServerCnxn> set = ipMap.get(addr);
+            if (set != null) {
+                set.remove(cnxn);
+                // Note that we make no effort here to remove empty mappings
+                // from ipMap.
             }
         }
-    }
 
-    protected NIOServerCnxn createConnection(SocketChannel sock,
-            SelectionKey sk) throws IOException {
-        return new NIOServerCnxn(zkServer, sock, sk, this);
+        // unregister from JMX
+        unregisterConnection(cnxn);
+        return true;
     }
 
-    private int getClientCnxnCount(InetAddress cl) {
-        // The ipMap lock covers both the map, and its contents
-        // (that is, the cnxn sets shouldn't be modified outside of
-        // this lock)
-        synchronized (ipMap) {
-            Set<NIOServerCnxn> s = ipMap.get(cl);
-            if (s == null) return 0;
-            return s.size();
-        }
+    /**
+     * Add or update cnxn in our cnxnExpiryQueue, using its session timeout.
+     * @param cnxn the connection whose expiry entry is added or refreshed
+     */
+    public void touchCnxn(NIOServerCnxn cnxn) {
+        cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
     }
 
-    public void run() {
-        while (!ss.socket().isClosed()) {
-            try {
-                selector.select(1000);
-                Set<SelectionKey> selected;
-                synchronized (this) {
-                    selected = selector.selectedKeys();
-                }
-                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
-                        selected);
-                Collections.shuffle(selectedList);
-                for (SelectionKey k : selectedList) {
-                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
-                        SocketChannel sc = ((ServerSocketChannel) k
-                                .channel()).accept();
-                        InetAddress ia = sc.socket().getInetAddress();
-                        int cnxncount = getClientCnxnCount(ia);
-                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
-                            LOG.warn("Too many connections from " + ia
-                                     + " - max is " + maxClientCnxns );
-                            sc.close();
-                        } else {
-                            LOG.info("Accepted socket connection from "
-                                     + sc.socket().getRemoteSocketAddress());
-                            sc.configureBlocking(false);
-                            SelectionKey sk = sc.register(selector,
-                                    SelectionKey.OP_READ);
-                            NIOServerCnxn cnxn = createConnection(sc, sk);
-                            sk.attach(cnxn);
-                            addCnxn(cnxn);
-                        }
-                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
-                        c.doIO(k);
-                    } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Unexpected ops in select "
-                                      + k.readyOps());
-                        }
-                    }
-                }
-                selected.clear();
-            } catch (RuntimeException e) {
-                LOG.warn("Ignoring unexpected runtime exception", e);
-            } catch (Exception e) {
-                LOG.warn("Ignoring exception", e);
+    private void addCnxn(NIOServerCnxn cnxn) {
+        InetAddress addr = cnxn.getSocketAddress();
+        Set<NIOServerCnxn> set = ipMap.get(addr);
+        if (set == null) {
+            // in general we will see 1 connection from each
+            // host, setting the initial cap to 2 allows us
+            // to minimize mem usage in the common case
+            // of 1 entry --  we need to set the initial cap
+            // to 2 to avoid rehash when the first entry is added
+            // Construct a ConcurrentHashSet using a ConcurrentHashMap
+            set = Collections.newSetFromMap(
+                new ConcurrentHashMap<NIOServerCnxn, Boolean>(2));
+            // Put the new set in the map, but only if another thread
+            // hasn't beaten us to it
+            Set<NIOServerCnxn> existingSet = ipMap.putIfAbsent(addr, set);
+            if (existingSet != null) {
+                set = existingSet;
             }
         }
-        closeAll();
-        LOG.info("NIOServerCnxn factory exited run method");
+        set.add(cnxn);
+
+        cnxns.add(cnxn);
+        touchCnxn(cnxn);
+    }
+
+    protected NIOServerCnxn createConnection(SocketChannel sock,
+            SelectionKey sk, SelectorThread selectorThread) throws IOException {
+        return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
+    }
+
+    private int getClientCnxnCount(InetAddress cl) {
+        // An address with no entry in ipMap has no tracked connections
+        Set<NIOServerCnxn> peers = ipMap.get(cl);
+        return peers == null ? 0 : peers.size();
     }
 
     /**
@@ -230,30 +794,54 @@ public class NIOServerCnxnFactory extend
      */
     @Override
     @SuppressWarnings("unchecked")
-    synchronized public void closeAll() {
-        selector.wakeup();
-        HashSet<NIOServerCnxn> cnxns;
-        synchronized (this.cnxns) {
-            cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
-        }
-        // got to clear all the connections that we have in the selector
-        for (NIOServerCnxn cnxn: cnxns) {
+    public void closeAll() {
+        // clear all the connections on which we are selecting
+        for (ServerCnxn cnxn : cnxns) {
             try {
-                // don't hold this.cnxns lock as deadlock may occur
+                // This will remove the cnxn from cnxns
                 cnxn.close();
             } catch (Exception e) {
                 LOG.warn("Ignoring exception closing cnxn sessionid 0x"
-                         + Long.toHexString(cnxn.sessionId), e);
+                         + Long.toHexString(cnxn.getSessionId()), e);
             }
         }
     }
 
-    public void shutdown() {
+    public void stop() {
+        stopped = true;
+
+        // Stop queuing connection attempts
         try {
             ss.close();
+        } catch (IOException e) {
+            LOG.warn("Error closing listen socket", e);
+        }
+
+        if (acceptThread != null) {
+            acceptThread.wakeupSelector();
+        }
+        if (expirerThread != null) {
+            expirerThread.interrupt();
+        }
+        for (SelectorThread thread : selectorThreads) {
+            thread.wakeupSelector();
+        }
+        if (workerPool != null) {
+            workerPool.stop();
+        }
+    }
+
+    public void shutdown() {
+        try {
+            // close listen socket and signal selector threads to stop
+            stop();
+
+            // wait for selector and worker threads to shutdown
+            join();
+
+            // close all open connections
             closeAll();
-            thread.interrupt();
-            thread.join();
+
             if (login != null) {
                 login.shutdown();
             }
@@ -262,48 +850,45 @@ public class NIOServerCnxnFactory extend
         } catch (Exception e) {
             LOG.warn("Ignoring unexpected exception during shutdown", e);
         }
-        try {
-            selector.close();
-        } catch (IOException e) {
-            LOG.warn("Selector closing", e);
-        }
+
         if (zkServer != null) {
             zkServer.shutdown();
         }
     }
 
-    @Override
-    public synchronized void closeSession(long sessionId) {
-        selector.wakeup();
-        closeSessionWithoutWakeup(sessionId);
+    public void addSession(long sessionId, NIOServerCnxn cnxn) {
+        sessionMap.put(sessionId, cnxn);
     }
 
-    @SuppressWarnings("unchecked")
-    private void closeSessionWithoutWakeup(long sessionId) {
-        HashSet<NIOServerCnxn> cnxns;
-        synchronized (this.cnxns) {
-            cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
-        }
-
-        for (NIOServerCnxn cnxn : cnxns) {
-            if (cnxn.getSessionId() == sessionId) {
-                try {
-                    cnxn.close();
-                } catch (Exception e) {
-                    LOG.warn("exception during session close", e);
-                }
-                break;
-            }
+    @Override
+    public void closeSession(long sessionId) {
+        NIOServerCnxn cnxn = sessionMap.remove(sessionId);
+        if (cnxn != null) {
+            cnxn.close();
         }
     }
 
     @Override
     public void join() throws InterruptedException {
-        thread.join();
+        if (acceptThread != null) {
+            acceptThread.join();
+        }
+        for (SelectorThread thread : selectorThreads) {
+            thread.join();
+        }
+        if (workerPool != null) {
+            workerPool.join(workerShutdownTimeoutMS);
+        }
     }
 
     @Override
     public Iterable<ServerCnxn> getConnections() {
         return cnxns;
     }
+
+    public void dumpConnections(PrintWriter pwriter) {
+        pwriter.print("Connections ");
+        cnxnExpiryQueue.dump(pwriter);
+    }
+
 }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java?rev=1423990&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java Wed Dec 19 18:07:14 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.slf4j.Logger;
+
+/** Warning logger that collapses bursts of identical messages. */
+public class RateLogger {
+    private final Logger LOG;
+    private String msg = null;
+    private long timestamp;
+    private int count = 0;
+
+    public RateLogger(Logger log) {
+        LOG = log;
+    }
+
+    /** Logs any repeats counted so far and forgets the held message. */
+    public void flush() {
+        if (msg != null && count > 1) {
+            LOG.warn("[" + count + " times] " + msg);
+        } else if (msg != null && count == 1) {
+            LOG.warn(msg);
+        }
+        msg = null;
+        count = 0;
+    }
+
+    /** Logs a new message now; repeats are counted, reported per 100 ms. */
+    public void rateLimitLog(String newMsg) {
+        long now = System.currentTimeMillis();
+        boolean repeat = newMsg.equals(msg);
+        if (repeat) {
+            ++count;
+        }
+        if (!repeat || now - timestamp >= 100) {
+            flush();
+            msg = newMsg;
+            timestamp = now;
+            if (!repeat) {
+                LOG.warn(msg);
+            }
+        }
+    }
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java?rev=1423990&r1=1423989&r2=1423990&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java Wed Dec 19 18:07:14 2012
@@ -227,6 +227,14 @@ public abstract class ServerCnxn impleme
     protected final static int isroCmd = ByteBuffer.wrap("isro".getBytes())
             .getInt();
 
+    /*
+     * The control sequence sent by the telnet program when it closes a
+     * connection. Included simply to keep the logs cleaner (the server would
+     * close the connection anyway because it would parse this as a negative
+     * length).
+     */
+    protected final static int telnetCloseCmd = 0xfff4fffd;
+
     protected final static HashMap<Integer, String> cmd2String =
         new HashMap<Integer, String>();
 
@@ -248,6 +256,7 @@ public abstract class ServerCnxn impleme
         cmd2String.put(wchsCmd, "wchs");
         cmd2String.put(mntrCmd, "mntr");
         cmd2String.put(isroCmd, "isro");
+        cmd2String.put(telnetCloseCmd, "telnet close");
     }
 
     protected void packetReceived() {