You are viewing a plain-text version of this content; the canonical version is linked from the original archive page (the hyperlink was lost in this rendering).
Posted to commits@zookeeper.apache.org by an...@apache.org on 2018/11/22 16:56:10 UTC

[2/2] zookeeper git commit: ZOOKEEPER-3152: Port ZK netty stack to netty4

ZOOKEEPER-3152: Port ZK netty stack to netty4

Summary: Ported the client connection Netty stack from Netty 3 to Netty 4. This includes both the server side (NettyServerCnxn and related classes) and the client side (ClientCnxnSocketNetty).

Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, and tested manually on a regional ensemble.

FB Reviewers: nixon

Author: Ilya Maykov <il...@fb.com>

Reviewers: andor@apache.org

Closes #669 from ivmaykov/ZOOKEEPER-3152


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/caca0627
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/caca0627
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/caca0627

Branch: refs/heads/master
Commit: caca062767c36525e6ecead2ae0f34c447394809
Parents: 1507f67
Author: Ilya Maykov <il...@fb.com>
Authored: Thu Nov 22 17:56:01 2018 +0100
Committer: Andor Molnar <an...@apache.org>
Committed: Thu Nov 22 17:56:01 2018 +0100

----------------------------------------------------------------------
 build.xml                                       |   2 +-
 ivy.xml                                         |   4 +-
 .../org/apache/zookeeper/ClientCnxnSocket.java  |   9 +-
 .../apache/zookeeper/ClientCnxnSocketNIO.java   |   4 +-
 .../apache/zookeeper/ClientCnxnSocketNetty.java | 312 +++++++-----
 .../org/apache/zookeeper/common/NettyUtils.java |  76 +++
 .../zookeeper/server/NettyServerCnxn.java       | 364 +++++++++-----
 .../server/NettyServerCnxnFactory.java          | 474 ++++++++++---------
 .../server/quorum/UnifiedServerSocket.java      |   6 +-
 .../apache/zookeeper/ClientCnxnSocketTest.java  |  13 +
 .../zookeeper/server/NettyServerCnxnTest.java   |  71 +++
 .../org/apache/zookeeper/test/ClientTest.java   |   1 +
 .../zookeeper/test/NettyNettySuiteBase.java     |  13 +
 .../zookeeper/test/NioNettySuiteBase.java       |  13 +
 .../org/apache/zookeeper/test/ReconfigTest.java |  79 ++--
 .../zookeeper/test/TestByteBufAllocator.java    | 152 ++++++
 .../test/TestByteBufAllocatorTestHelper.java    |  52 ++
 17 files changed, 1126 insertions(+), 519 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 3411025..5868532 100644
--- a/build.xml
+++ b/build.xml
@@ -36,7 +36,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
 
     <property name="audience-annotations.version" value="0.5.0" />
 
-    <property name="netty.version" value="3.10.6.Final"/>
+    <property name="netty.version" value="4.1.29.Final"/>
 
     <property name="junit.version" value="4.12"/>
     <property name="mockito.version" value="1.8.5"/>

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 8692640..c7f79b6 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -59,8 +59,8 @@
     <dependency org="org.apache.yetus" name="audience-annotations"
                 rev="${audience-annotations.version}"/>
 
-    <dependency org="io.netty" name="netty" conf="default" rev="${netty.version}">
-      <artifact name="netty" type="jar" conf="default"/>
+    <dependency org="io.netty" name="netty-all" conf="default" rev="${netty.version}">
+      <artifact name="netty-all" type="jar" conf="default"/>
     </dependency>
 
     <dependency org="com.googlecode.json-simple" name="json-simple" rev="${json.version}" >

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
index 51ae8bf..ba3806c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.text.MessageFormat;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.zookeeper.ClientCnxn.Packet;
@@ -59,8 +60,8 @@ abstract class ClientCnxnSocket {
      * readLength() to receive the full message.
      */
     protected ByteBuffer incomingBuffer = lenBuffer;
-    protected long sentCount = 0;
-    protected long recvCount = 0;
+    protected final AtomicLong sentCount = new AtomicLong(0L);
+    protected final AtomicLong recvCount = new AtomicLong(0L);
     protected long lastHeard;
     protected long lastSend;
     protected long now;
@@ -95,11 +96,11 @@ abstract class ClientCnxnSocket {
     }
 
     long getSentCount() {
-        return sentCount;
+        return sentCount.get();
     }
 
     long getRecvCount() {
-        return recvCount;
+        return recvCount.get();
     }
 
     void updateLastHeard() {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
index f17a819..4c97721 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -82,7 +82,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
             if (!incomingBuffer.hasRemaining()) {
                 incomingBuffer.flip();
                 if (incomingBuffer == lenBuffer) {
-                    recvCount++;
+                    recvCount.getAndIncrement();
                     readLength();
                 } else if (!initialized) {
                     readConnectResult();
@@ -122,7 +122,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
                 }
                 sock.write(p.bb);
                 if (!p.bb.hasRemaining()) {
-                    sentCount++;
+                    sentCount.getAndIncrement();
                     outgoingQueue.removeFirstOccurrence(p);
                     if (p.requestHeader != null
                             && p.requestHeader.getType() != OpCode.ping

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
index 34c3db3..74d1283 100755
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
@@ -18,46 +18,45 @@
 
 package org.apache.zookeeper;
 
-import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
-import org.apache.zookeeper.ClientCnxn.Packet;
-import org.apache.zookeeper.client.ZKClientConfig;
-import org.apache.zookeeper.common.ClientX509Util;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.NettyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.zookeeper.common.X509Exception.SSLContextException;
 
 /**
@@ -68,18 +67,21 @@ import static org.apache.zookeeper.common.X509Exception.SSLContextException;
 public class ClientCnxnSocketNetty extends ClientCnxnSocket {
     private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
 
-    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
-            Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-    Channel channel;
-    CountDownLatch firstConnect;
-    ChannelFuture connectFuture;
-    Lock connectLock = new ReentrantLock();
-    AtomicBoolean disconnected = new AtomicBoolean();
-    AtomicBoolean needSasl = new AtomicBoolean();
-    Semaphore waitSasl = new Semaphore(0);
+    private final EventLoopGroup eventLoopGroup;
+    private Channel channel;
+    private CountDownLatch firstConnect;
+    private ChannelFuture connectFuture;
+    private final Lock connectLock = new ReentrantLock();
+    private final AtomicBoolean disconnected = new AtomicBoolean();
+    private final AtomicBoolean needSasl = new AtomicBoolean();
+    private final Semaphore waitSasl = new Semaphore(0);
+
+    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+            new AtomicReference<>(null);
 
     ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
         this.clientConfig = clientConfig;
+        eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup();
         initProperties();
     }
 
@@ -103,59 +105,90 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
     boolean isConnected() {
         // Assuming that isConnected() is only used to initiate connection,
         // not used by some other connection status judgement.
-        return channel != null;
+        connectLock.lock();
+        try {
+            return channel != null || connectFuture != null;
+        } finally {
+            connectLock.unlock();
+        }
+    }
+
+    private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
+        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+        if (testAllocator != null) {
+            return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
+        } else {
+            return bootstrap;
+        }
     }
 
     @Override
     void connect(InetSocketAddress addr) throws IOException {
         firstConnect = new CountDownLatch(1);
 
-        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
-        bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
-        bootstrap.setOption("soLinger", -1);
-        bootstrap.setOption("tcpNoDelay", true);
-
-        connectFuture = bootstrap.connect(addr);
-        connectFuture.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture channelFuture) throws Exception {
-                // this lock guarantees that channel won't be assgined after cleanup().
-                connectLock.lock();
-                try {
-                    if (!channelFuture.isSuccess() || connectFuture == null) {
-                        LOG.info("future isn't success, cause: {}", channelFuture.getCause());
-                        return;
-                    }
-                    // setup channel, variables, connection, etc.
-                    channel = channelFuture.getChannel();
-
-                    disconnected.set(false);
-                    initialized = false;
-                    lenBuffer.clear();
-                    incomingBuffer = lenBuffer;
-
-                    sendThread.primeConnection();
-                    updateNow();
-                    updateLastSendAndHeard();
-
-                    if (sendThread.tunnelAuthInProgress()) {
-                        waitSasl.drainPermits();
-                        needSasl.set(true);
-                        sendPrimePacket();
-                    } else {
-                        needSasl.set(false);
-                    }
+        Bootstrap bootstrap = new Bootstrap()
+                .group(eventLoopGroup)
+                .channel(NettyUtils.nioOrEpollSocketChannel())
+                .option(ChannelOption.SO_LINGER, -1)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
+        bootstrap = configureBootstrapAllocator(bootstrap);
+        bootstrap.validate();
 
-                    // we need to wake up on first connect to avoid timeout.
-                    wakeupCnxn();
-                    firstConnect.countDown();
-                    LOG.info("channel is connected: {}", channelFuture.getChannel());
-                } finally {
-                    connectLock.unlock();
+        connectLock.lock();
+        try {
+            connectFuture = bootstrap.connect(addr);
+            connectFuture.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                    // this lock guarantees that channel won't be assigned after cleanup().
+                    connectLock.lock();
+                    try {
+                        if (!channelFuture.isSuccess()) {
+                            LOG.info("future isn't success, cause:", channelFuture.cause());
+                            return;
+                        } else if (connectFuture == null) {
+                            LOG.info("connect attempt cancelled");
+                            // If the connect attempt was cancelled but succeeded
+                            // anyway, make sure to close the channel, otherwise
+                            // we may leak a file descriptor.
+                            channelFuture.channel().close();
+                            return;
+                        }
+                        // setup channel, variables, connection, etc.
+                        channel = channelFuture.channel();
+
+                        disconnected.set(false);
+                        initialized = false;
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+
+                        sendThread.primeConnection();
+                        updateNow();
+                        updateLastSendAndHeard();
+
+                        if (sendThread.tunnelAuthInProgress()) {
+                            waitSasl.drainPermits();
+                            needSasl.set(true);
+                            sendPrimePacket();
+                        } else {
+                            needSasl.set(false);
+                        }
+                        LOG.info("channel is connected: {}", channelFuture.channel());
+                    } finally {
+                        connectFuture = null;
+                        connectLock.unlock();
+                        // need to wake on connect success or failure to avoid
+                        // timing out ClientCnxn.SendThread which may be
+                        // blocked waiting for first connect in doTransport().
+                        wakeupCnxn();
+                        firstConnect.countDown();
+                    }
                 }
-            }
-        });
+            });
+        } finally {
+            connectLock.unlock();
+        }
     }
 
     @Override
@@ -163,11 +196,11 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         connectLock.lock();
         try {
             if (connectFuture != null) {
-                connectFuture.cancel();
+                connectFuture.cancel(false);
                 connectFuture = null;
             }
             if (channel != null) {
-                channel.close().awaitUninterruptibly();
+                channel.close().syncUninterruptibly();
                 channel = null;
             }
         } finally {
@@ -184,7 +217,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 
     @Override
     void close() {
-        channelFactory.releaseExternalResources();
+        if (!eventLoopGroup.isShuttingDown()) {
+            eventLoopGroup.shutdownGracefully();
+        }
     }
 
     @Override
@@ -199,6 +234,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 
     @Override
     void packetAdded() {
+        // NO-OP. Adding a packet will already wake up a netty connection
+        // so we don't need to add a dummy packet to the queue to trigger
+        // a wake-up.
     }
 
     @Override
@@ -230,13 +268,11 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
                     return;
                 }
             } else {
-                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
-                    return;
-                }
+                head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
             }
             // check if being waken up on closing.
             if (!sendThread.getZkState().isAlive()) {
-                // adding back the patck to notify of failure in conLossPacket().
+                // adding back the packet to notify of failure in conLossPacket().
                 addBack(head);
                 return;
             }
@@ -261,18 +297,46 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         }
     }
 
-    private void sendPkt(Packet p) {
+    /**
+     * Sends a packet to the remote peer and flushes the channel.
+     * @param p packet to send.
+     * @return a ChannelFuture that will complete when the write operation
+     *         succeeds or fails.
+     */
+    private ChannelFuture sendPktAndFlush(Packet p) {
+        return sendPkt(p, true);
+    }
+
+    /**
+     * Sends a packet to the remote peer but does not flush() the channel.
+     * @param p packet to send.
+     * @return a ChannelFuture that will complete when the write operation
+     *         succeeds or fails.
+     */
+    private ChannelFuture sendPktOnly(Packet p) {
+        return sendPkt(p, false);
+    }
+
+    private ChannelFuture sendPkt(Packet p, boolean doFlush) {
         // Assuming the packet will be sent out successfully. Because if it fails,
         // the channel will close and clean up queues.
         p.createBB();
         updateLastSend();
-        sentCount++;
-        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+        ChannelFuture result = channel.write(Unpooled.wrappedBuffer(p.bb));
+        result.addListener(f -> {
+            if (f.isSuccess()) {
+                sentCount.getAndIncrement();
+            }
+        });
+        if (doFlush) {
+            channel.flush();
+        }
+        return result;
     }
 
     private void sendPrimePacket() {
         // assuming the first packet is the priming packet.
-        sendPkt(outgoingQueue.remove());
+        sendPktAndFlush(outgoingQueue.remove());
     }
 
     /**
@@ -290,13 +354,16 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
                         pendingQueue.add(p);
                     }
                 }
-                sendPkt(p);
+                sendPktOnly(p);
             }
             if (outgoingQueue.isEmpty()) {
                 break;
             }
             p = outgoingQueue.remove();
         }
+        // TODO: maybe we should flush in the loop above every N packets/bytes?
+        // But, how do we determine the right value for N ...
+        channel.flush();
     }
 
     @Override
@@ -304,19 +371,19 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         if (channel == null) {
             throw new IOException("channel has been closed");
         }
-        sendPkt(p);
+        sendPktAndFlush(p);
     }
 
     @Override
     SocketAddress getRemoteSocketAddress() {
         Channel copiedChanRef = channel;
-        return (copiedChanRef == null) ? null : copiedChanRef.getRemoteAddress();
+        return (copiedChanRef == null) ? null : copiedChanRef.remoteAddress();
     }
 
     @Override
     SocketAddress getLocalSocketAddress() {
         Channel copiedChanRef = channel;
-        return (copiedChanRef == null) ? null : copiedChanRef.getLocalAddress();
+        return (copiedChanRef == null) ? null : copiedChanRef.localAddress();
     }
 
     @Override
@@ -345,7 +412,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
      * ZKClientPipelineFactory is the netty pipeline factory for this netty
      * connection implementation.
      */
-    private class ZKClientPipelineFactory implements ChannelPipelineFactory {
+    private class ZKClientPipelineFactory extends ChannelInitializer<SocketChannel> {
         private SSLContext sslContext = null;
         private SSLEngine sslEngine = null;
         private String host;
@@ -357,13 +424,12 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         }
 
         @Override
-        public ChannelPipeline getPipeline() throws Exception {
-            ChannelPipeline pipeline = Channels.pipeline();
+        protected void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline pipeline = ch.pipeline();
             if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) {
                 initSSL(pipeline);
             }
             pipeline.addLast("handler", new ZKClientHandler());
-            return pipeline;
         }
 
         // The synchronized is to prevent the race on shared variable "sslEngine".
@@ -375,7 +441,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
                 sslEngine.setUseClientMode(true);
             }
             pipeline.addLast("ssl", new SslHandler(sslEngine));
-            LOG.info("SSL handler added for channel: {}", pipeline.getChannel());
+            LOG.info("SSL handler added for channel: {}", pipeline.channel());
         }
     }
 
@@ -383,13 +449,12 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
      * ZKClientHandler is the netty handler that sits in netty upstream last
      * place. It mainly handles read traffic and helps synchronize connection state.
      */
-    private class ZKClientHandler extends SimpleChannelUpstreamHandler {
+    private class ZKClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
         AtomicBoolean channelClosed = new AtomicBoolean(false);
 
         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx,
-                                        ChannelStateEvent e) throws Exception {
-            LOG.info("channel is disconnected: {}", ctx.getChannel());
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("channel is disconnected: {}", ctx.channel());
             cleanup();
         }
 
@@ -406,11 +471,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx,
-                                    MessageEvent e) throws Exception {
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
             updateNow();
-            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-            while (buf.readable()) {
+            while (buf.isReadable()) {
                 if (incomingBuffer.remaining() > buf.readableBytes()) {
                     int newLimit = incomingBuffer.position()
                             + buf.readableBytes();
@@ -422,7 +485,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
                 if (!incomingBuffer.hasRemaining()) {
                     incomingBuffer.flip();
                     if (incomingBuffer == lenBuffer) {
-                        recvCount++;
+                        recvCount.getAndIncrement();
                         readLength();
                     } else if (!initialized) {
                         readConnectResult();
@@ -439,13 +502,34 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
                 }
             }
             wakeupCnxn();
+            // Note: SimpleChannelInboundHandler releases the ByteBuf for us
+            // so we don't need to do it.
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx,
-                                    ExceptionEvent e) throws Exception {
-            LOG.warn("Exception caught: {}", e, e.getCause());
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+            LOG.warn("Exception caught", cause);
             cleanup();
         }
     }
+
+    /**
+     * Sets the test ByteBufAllocator. This allocator will be used by all
+     * future instances of this class.
+     * It is not recommended to use this method outside of testing.
+     * @param allocator the ByteBufAllocator to use for all netty buffer
+     *                  allocations.
+     */
+    static void setTestAllocator(ByteBufAllocator allocator) {
+        TEST_ALLOCATOR.set(allocator);
+    }
+
+    /**
+     * Clears the test ByteBufAllocator. The default allocator will be used
+     * by all future instances of this class.
+     * It is not recommended to use this method outside of testing.
+     */
+    static void clearTestAllocator() {
+        TEST_ALLOCATOR.set(null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
new file mode 100644
index 0000000..5883296
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Helper methods for netty code.
+ */
+public class NettyUtils {
+    /**
+     * If {@link Epoll#isAvailable()} <code>== true</code>, returns a new
+     * {@link EpollEventLoopGroup}, otherwise returns a new
+     * {@link NioEventLoopGroup}.
+     * @return a new {@link EventLoopGroup}.
+     */
+    public static EventLoopGroup newNioOrEpollEventLoopGroup() {
+        if (Epoll.isAvailable()) {
+            return new EpollEventLoopGroup();
+        } else {
+            return new NioEventLoopGroup();
+        }
+    }
+
+    /**
+     * If {@link Epoll#isAvailable()} <code>== true</code>, returns
+     * {@link EpollSocketChannel}, otherwise returns {@link NioSocketChannel}.
+     * @return a socket channel class.
+     */
+    public static Class<? extends SocketChannel> nioOrEpollSocketChannel() {
+        if (Epoll.isAvailable()) {
+            return EpollSocketChannel.class;
+        } else {
+            return NioSocketChannel.class;
+        }
+    }
+
+    /**
+     * If {@link Epoll#isAvailable()} <code>== true</code>, returns
+     * {@link EpollServerSocketChannel}, otherwise returns
+     * {@link NioServerSocketChannel}.
+     * @return a server socket channel class.
+     */
+    public static Class<? extends ServerSocketChannel> nioOrEpollServerSocketChannel() {
+        if (Epoll.isAvailable()) {
+            return EpollServerSocketChannel.class;
+        } else {
+            return NioServerSocketChannel.class;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index f0a8f7f..311d3c1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -18,23 +18,26 @@
 
 package org.apache.zookeeper.server;
 
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-
 import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Writer;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
 import java.security.cert.Certificate;
 import java.util.Arrays;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.proto.ReplyHeader;
@@ -43,29 +46,23 @@ import org.apache.zookeeper.server.command.CommandExecutor;
 import org.apache.zookeeper.server.command.FourLetterCommands;
 import org.apache.zookeeper.server.command.NopCommand;
 import org.apache.zookeeper.server.command.SetTraceMaskCommand;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.MessageEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class NettyServerCnxn extends ServerCnxn {
     private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class);
-    Channel channel;
-    ChannelBuffer queuedBuffer;
-    volatile boolean throttled;
-    ByteBuffer bb;
-    ByteBuffer bbLen = ByteBuffer.allocate(4);
-    long sessionId;
-    int sessionTimeout;
-    Certificate[] clientChain;
-    volatile boolean closingChannel;
-
-    NettyServerCnxnFactory factory;
-    boolean initialized;
+    private final Channel channel;
+    private ByteBuf queuedBuffer;
+    private final AtomicBoolean throttled = new AtomicBoolean(false);
+    private ByteBuffer bb;
+    private final ByteBuffer bbLen = ByteBuffer.allocate(4);
+    private long sessionId;
+    private int sessionTimeout;
+    private Certificate[] clientChain;
+    private volatile boolean closingChannel;
+
+    private final NettyServerCnxnFactory factory;
+    private boolean initialized;
 
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
         super(zks);
@@ -82,8 +79,8 @@ public class NettyServerCnxn extends ServerCnxn {
         closingChannel = true;
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("close called for sessionid:0x"
-                    + Long.toHexString(sessionId));
+            LOG.debug("close called for sessionid:0x{}",
+                    Long.toHexString(sessionId));
         }
         setStale();
 
@@ -92,28 +89,23 @@ public class NettyServerCnxn extends ServerCnxn {
         // connection bean leak under certain race conditions.
         factory.unregisterConnection(this);
 
-        synchronized(factory.cnxns){
-            // if this is not in cnxns then it's already closed
-            if (!factory.cnxns.remove(this)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("cnxns size:" + factory.cnxns.size());
-                }
-                return;
-            }
+        // if this is not in cnxns then it's already closed
+        if (!factory.cnxns.remove(this)) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("close in progress for sessionid:0x"
-                        + Long.toHexString(sessionId));
+                LOG.debug("cnxns size:{}", factory.cnxns.size());
             }
+            return;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("close in progress for sessionid:0x{}",
+                    Long.toHexString(sessionId));
+        }
 
-            factory.removeCnxnFromSessionMap(this);
+        factory.removeCnxnFromSessionMap(this);
 
-            synchronized (factory.ipMap) {
-                Set<NettyServerCnxn> s =
-                    factory.ipMap.get(((InetSocketAddress)channel
-                            .getRemoteAddress()).getAddress());
-                s.remove(this);
-            }
-        }
+        factory.removeCnxnFromIpMap(
+                this,
+                ((InetSocketAddress)channel.remoteAddress()).getAddress());
 
         if (zkServer != null) {
             zkServer.removeCnxn(this);
@@ -123,7 +115,14 @@ public class NettyServerCnxn extends ServerCnxn {
             // Since we don't check on the futures created by write calls to the channel complete we need to make sure
             // that all writes have been completed before closing the channel or we risk data loss
             // See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html
-            channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+            channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) {
+                    future.channel().close().addListener(f -> releaseQueuedBuffer());
+                }
+            });
+        } else {
+            channel.eventLoop().execute(this::releaseQueuedBuffer);
         }
     }
 
@@ -160,21 +159,6 @@ public class NettyServerCnxn extends ServerCnxn {
         }
     }
 
-    static class ResumeMessageEvent implements MessageEvent {
-        Channel channel;
-        ResumeMessageEvent(Channel channel) {
-            this.channel = channel;
-        }
-        @Override
-        public Object getMessage() {return null;}
-        @Override
-        public SocketAddress getRemoteAddress() {return null;}
-        @Override
-        public Channel getChannel() {return channel;}
-        @Override
-        public ChannelFuture getFuture() {return null;}
-    };
-
     @Override
     public void sendResponse(ReplyHeader h, Record r, String tag)
             throws IOException {
@@ -192,28 +176,18 @@ public class NettyServerCnxn extends ServerCnxn {
     }
 
     @Override
-    public void enableRecv() {
-        if (throttled) {
-            throttled = false;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Sending unthrottle event " + this);
-            }
-            channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel));
-        }
-    }
-
-    @Override
     public void sendBuffer(ByteBuffer sendBuffer) {
         if (sendBuffer == ServerCnxnFactory.closeConn) {
             close();
             return;
         }
-        channel.write(wrappedBuffer(sendBuffer));
-        packetSent();
+        channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(f -> {
+            if (f.isSuccess()) {
+                packetSent();
+            }
+        });
     }
 
-
-
     /**
      * This class wraps the sendBuffer method of NIOServerCnxn. It is
      * responsible for chunking up the response to a client. Rather
@@ -255,9 +229,7 @@ public class NettyServerCnxn extends ServerCnxn {
     }
 
     /** Return if four letter word found and responded to, otw false **/
-    private boolean checkFourLetterWord(final Channel channel,
-            ChannelBuffer message, final int len) throws IOException
-    {
+    private boolean checkFourLetterWord(final Channel channel, ByteBuf message, final int len) {
         // We take advantage of the limited size of the length to look
         // for cmds. They are all 4-bytes which fits inside of an int
         if (!FourLetterCommands.isKnown(len)) {
@@ -266,7 +238,10 @@ public class NettyServerCnxn extends ServerCnxn {
 
         String cmd = FourLetterCommands.getCommandString(len);
 
-        channel.setInterestOps(0).awaitUninterruptibly();
+        // Stops automatic reads of incoming data on this channel. We don't
+        // expect any more traffic from the client when processing a 4LW
+        // so this shouldn't break anything.
+        channel.config().setAutoRead(false);
         packetReceived(4);
 
         final PrintWriter pwriter = new PrintWriter(
@@ -281,8 +256,7 @@ public class NettyServerCnxn extends ServerCnxn {
             return true;
         }
 
-        LOG.info("Processing " + cmd + " command from "
-                + channel.getRemoteAddress());
+        LOG.info("Processing {} command from {}", cmd, channel.remoteAddress());
 
        if (len == FourLetterCommands.setTraceMaskCmd) {
             ByteBuffer mask = ByteBuffer.allocate(8);
@@ -299,19 +273,126 @@ public class NettyServerCnxn extends ServerCnxn {
         }
     }
 
-    public void receiveMessage(ChannelBuffer message) {
+    /**
+     * Process incoming message. This should only be called from the event
+     * loop thread.
+     * <p>
+     * If the connection is throttled, or if bytes from an earlier read are
+     * still waiting in {@code queuedBuffer}, the new bytes are appended to
+     * {@code queuedBuffer} so they are consumed in arrival order. Otherwise
+     * they are passed directly to {@link #receiveMessage(ByteBuf)}, and any
+     * bytes it leaves unread are copied into {@code queuedBuffer}.
+     * @param buf the message bytes to process.
+     */
+    void processMessage(ByteBuf buf) {
+        assert channel.eventLoop().inEventLoop();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("0x{} queuedBuffer: {}",
+                    Long.toHexString(sessionId),
+                    queuedBuffer);
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("0x{} buf {}",
+                    Long.toHexString(sessionId),
+                    ByteBufUtil.hexDump(buf));
+        }
+
+        if (throttled.get()) {
+            LOG.debug("Received message while throttled");
+            // we are throttled, so we need to queue
+            appendToQueuedBuffer(buf);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("0x{} queuedBuffer {}",
+                        Long.toHexString(sessionId),
+                        ByteBufUtil.hexDump(queuedBuffer));
+            }
+        } else {
+            LOG.debug("not throttled");
+            if (queuedBuffer != null) {
+                // Earlier bytes are still pending; append behind them so the
+                // stream is processed in order.
+                appendToQueuedBuffer(buf);
+                processQueuedBuffer();
+            } else {
+                receiveMessage(buf);
+                // Have to check !closingChannel, because an error in
+                // receiveMessage() could have led to close() being called.
+                if (!closingChannel && buf.isReadable()) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Before copy {}", buf);
+                    }
+                    // receiveMessage() stops reading once the connection
+                    // becomes throttled, so keep the unread bytes for later.
+                    appendToQueuedBuffer(buf);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Copy is {}", queuedBuffer);
+                        LOG.trace("0x{} queuedBuffer {}",
+                                Long.toHexString(sessionId),
+                                ByteBufUtil.hexDump(queuedBuffer));
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Appends the readable bytes of {@code buf} to {@code queuedBuffer}. On
+     * first use, allocates the queue from the channel's allocator. This
+     * should only be called from the event loop thread.
+     * @param buf the bytes to queue; its reader index is advanced past them.
+     */
+    private void appendToQueuedBuffer(ByteBuf buf) {
+        assert channel.eventLoop().inEventLoop();
+        if (queuedBuffer == null) {
+            LOG.debug("allocating queue");
+            queuedBuffer = channel.alloc().buffer(buf.readableBytes());
+        }
+        queuedBuffer.writeBytes(buf);
+    }
+
+    /**
+     * Feeds any bytes held in {@code queuedBuffer} back through
+     * {@link #receiveMessage(ByteBuf)}. The queue is released once fully
+     * consumed. Must only be invoked on the channel's event loop thread.
+     */
+    void processQueuedBuffer() {
+        assert channel.eventLoop().inEventLoop();
+        if (queuedBuffer == null) {
+            LOG.debug("queue empty");
+            return;
+        }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("processing queue 0x{} queuedBuffer {}",
+                    Long.toHexString(sessionId),
+                    ByteBufUtil.hexDump(queuedBuffer));
+        }
+        receiveMessage(queuedBuffer);
+        if (closingChannel) {
+            // A failure inside receiveMessage() may have triggered close();
+            // checked first so nothing else touches the queue afterwards.
+            LOG.debug("Processed queue - channel closed, dropping remaining bytes");
+        } else if (queuedBuffer.isReadable()) {
+            LOG.debug("Processed queue - bytes remaining");
+        } else {
+            LOG.debug("Processed queue - no bytes remaining");
+            releaseQueuedBuffer();
+        }
+    }
+
+    /**
+     * Clean up queued buffer once it's no longer needed. This should only be
+     * called from the event loop thread.
+     */
+    private void releaseQueuedBuffer() {
+        assert channel.eventLoop().inEventLoop();
+        if (queuedBuffer != null) {
+            ReferenceCountUtil.release(queuedBuffer);
+            queuedBuffer = null;
+        }
+    }
+
+    /**
+     * Receive a message, which can come from the queued buffer or from a new
+     * buffer coming in over the channel. This should only be called from the
+     * event loop thread.
+     * @param message the message bytes to process.
+     */
+    private void receiveMessage(ByteBuf message) {
+        assert channel.eventLoop().inEventLoop();
         try {
-            while(message.readable() && !throttled) {
+            while(message.isReadable() && !throttled.get()) {
                 if (bb != null) {
                     if (LOG.isTraceEnabled()) {
-                        LOG.trace("message readable " + message.readableBytes()
-                                + " bb len " + bb.remaining() + " " + bb);
+                        LOG.trace("message readable {} bb len {} {}",
+                                message.readableBytes(),
+                                bb.remaining(),
+                                bb);
                         ByteBuffer dat = bb.duplicate();
                         dat.flip();
-                        LOG.trace(Long.toHexString(sessionId)
-                                + " bb 0x"
-                                + ChannelBuffers.hexDump(
-                                        ChannelBuffers.copiedBuffer(dat)));
+                        LOG.trace("0x{} bb {}",
+                                Long.toHexString(sessionId),
+                                ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                     }
 
                     if (bb.remaining() > message.readableBytes()) {
@@ -322,16 +403,15 @@ public class NettyServerCnxn extends ServerCnxn {
                     bb.limit(bb.capacity());
 
                     if (LOG.isTraceEnabled()) {
-                        LOG.trace("after readBytes message readable "
-                                + message.readableBytes()
-                                + " bb len " + bb.remaining() + " " + bb);
+                        LOG.trace("after readBytes message readable {} bb len {} {}",
+                                message.readableBytes(),
+                                bb.remaining(),
+                                bb);
                         ByteBuffer dat = bb.duplicate();
                         dat.flip();
-                        LOG.trace("after readbytes "
-                                + Long.toHexString(sessionId)
-                                + " bb 0x"
-                                + ChannelBuffers.hexDump(
-                                        ChannelBuffers.copiedBuffer(dat)));
+                        LOG.trace("after readbytes 0x{} bb {}",
+                                Long.toHexString(sessionId),
+                                ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                     }
                     if (bb.remaining() == 0) {
                         bb.flip();
@@ -342,10 +422,14 @@ public class NettyServerCnxn extends ServerCnxn {
                             throw new IOException("ZK down");
                         }
                         if (initialized) {
+                            // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
+                            // we could implement zero-copy queueing.
                             zks.processPacket(this, bb);
                         } else {
-                            LOG.debug("got conn req request from "
-                                    + getRemoteSocketAddress());
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("got conn req request from {}",
+                                        getRemoteSocketAddress());
+                            }
                             zks.processConnectRequest(this, bb);
                             initialized = true;
                         }
@@ -353,15 +437,14 @@ public class NettyServerCnxn extends ServerCnxn {
                     }
                 } else {
                     if (LOG.isTraceEnabled()) {
-                        LOG.trace("message readable "
-                                + message.readableBytes()
-                                + " bblenrem " + bbLen.remaining());
+                        LOG.trace("message readable {} bblenrem {}",
+                                message.readableBytes(),
+                                bbLen.remaining());
                         ByteBuffer dat = bbLen.duplicate();
                         dat.flip();
-                        LOG.trace(Long.toHexString(sessionId)
-                                + " bbLen 0x"
-                                + ChannelBuffers.hexDump(
-                                        ChannelBuffers.copiedBuffer(dat)));
+                        LOG.trace("0x{} bbLen {}",
+                                Long.toHexString(sessionId),
+                                ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                     }
 
                     if (message.readableBytes() < bbLen.remaining()) {
@@ -373,15 +456,15 @@ public class NettyServerCnxn extends ServerCnxn {
                         bbLen.flip();
 
                         if (LOG.isTraceEnabled()) {
-                            LOG.trace(Long.toHexString(sessionId)
-                                    + " bbLen 0x"
-                                    + ChannelBuffers.hexDump(
-                                            ChannelBuffers.copiedBuffer(bbLen)));
+                            LOG.trace("0x{} bbLen {}",
+                                    Long.toHexString(sessionId),
+                                    ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
                         }
                         int len = bbLen.getInt();
                         if (LOG.isTraceEnabled()) {
-                            LOG.trace(Long.toHexString(sessionId)
-                                    + " bbLen len is " + len);
+                            LOG.trace("0x{} bbLen len is {}",
+                                    Long.toHexString(sessionId),
+                                    len);
                         }
 
                         bbLen.clear();
@@ -403,16 +486,38 @@ public class NettyServerCnxn extends ServerCnxn {
         }
     }
 
+    /**
+     * User event fired through the channel pipeline to switch the channel's
+     * "Auto Read" setting off or on, which implements throttling. The enum
+     * constants are singletons, so they can be compared with ==.
+     */
+    enum AutoReadEvent {
+        DISABLE, ENABLE
+    }
+
+    /**
+     * Note that the netty implementation ignores the <code>waitDisableRecv</code>
+     * parameter and is always asynchronous.
+     * @param waitDisableRecv ignored by this implementation.
+     */
     @Override
     public void disableRecv(boolean waitDisableRecv) {
-        throttled = true;
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Throttling - disabling recv " + this);
+        if (throttled.compareAndSet(false, true)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Throttling - disabling recv {}", this);
+            }
+            channel.pipeline().fireUserEventTriggered(AutoReadEvent.DISABLE);
         }
-        ChannelFuture cf = channel.setReadable(false);
+    }
 
-        if (waitDisableRecv) {
-            cf.awaitUninterruptibly();
+    @Override
+    public void enableRecv() {
+        if (throttled.compareAndSet(true, false)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sending unthrottle event {}", this);
+            }
+            channel.pipeline().fireUserEventTriggered(AutoReadEvent.ENABLE);
         }
     }
 
@@ -423,12 +528,26 @@ public class NettyServerCnxn extends ServerCnxn {
 
     @Override
     public int getInterestOps() {
-        return channel.getInterestOps();
+        // This might not be 100% right, but it's only used for printing
+        // connection info in the netty implementation so it's probably ok.
+        if (channel == null || !channel.isOpen()) {
+            return 0;
+        }
+        int interestOps = 0;
+        if (!throttled.get()) {
+            interestOps |= SelectionKey.OP_READ;
+        }
+        if (!channel.isWritable()) {
+            // OP_READ means "can read", but OP_WRITE means "cannot write",
+            // it's weird.
+            interestOps |= SelectionKey.OP_WRITE;
+        }
+        return interestOps;
     }
 
     @Override
     public InetSocketAddress getRemoteSocketAddress() {
-        return (InetSocketAddress)channel.getRemoteAddress();
+        return (InetSocketAddress)channel.remoteAddress();
     }
 
     /** Send close connection packet to the client.
@@ -469,4 +588,9 @@ public class NettyServerCnxn extends ServerCnxn {
             clientChain = Arrays.copyOf(chain, chain.length);
         }
     }
+
+    /**
+     * @return the netty channel backing this connection. Package-private
+     *         because only tests and NettyServerCnxnFactory need it.
+     */
+    Channel getChannel() {
+        return this.channel;
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index d3abf38..99de0e6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -18,41 +18,6 @@
 
 package org.apache.zookeeper.server;
 
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.common.ClientX509Util;
-import org.apache.zookeeper.common.X509Exception;
-import org.apache.zookeeper.common.X509Exception.SSLContextException;
-import org.apache.zookeeper.server.auth.ProviderRegistry;
-import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandler.Sharable;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.WriteCompletionEvent;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.X509KeyManager;
-import javax.net.ssl.X509TrustManager;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -61,51 +26,86 @@ import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.X509KeyManager;
+import javax.net.ssl.X509TrustManager;
 
-import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.NettyUtils;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.common.X509Exception.SSLContextException;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NettyServerCnxnFactory extends ServerCnxnFactory {
     private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
 
-    ServerBootstrap bootstrap;
-    Channel parentChannel;
-    ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
-    Map<InetAddress, Set<NettyServerCnxn>> ipMap =
-        new HashMap<InetAddress, Set<NettyServerCnxn>>( );
-    InetSocketAddress localAddress;
-    int maxClientCnxns = 60;
-    ClientX509Util x509Util;
+    private final ServerBootstrap bootstrap;
+    private Channel parentChannel;
+    private final ChannelGroup allChannels =
+            new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor());
+    // Access to ipMap or to any Set contained in the map needs to be
+    // protected with synchronized (ipMap) { ... }
+    private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>();
+    private InetSocketAddress localAddress;
+    private int maxClientCnxns = 60;
+    private final ClientX509Util x509Util;
+
+    private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE =
+            AttributeKey.valueOf("NettyServerCnxn");
+
+    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+            new AtomicReference<>(null);
 
     /**
-     * This is an inner class since we need to extend SimpleChannelHandler, but
+     * This is an inner class since we need to extend ChannelDuplexHandler, but
      * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
      * this class gets access to the member variables and methods.
      */
     @Sharable
-    class CnxnChannelHandler extends SimpleChannelHandler {
-
-        @Override
-        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
-            throws Exception
-        {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel closed " + e);
-            }
-            allChannels.remove(ctx.getChannel());
-        }
+    class CnxnChannelHandler extends ChannelDuplexHandler {
 
         @Override
-        public void channelConnected(ChannelHandlerContext ctx,
-                ChannelStateEvent e) throws Exception
-        {
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel connected " + e);
+                LOG.trace("Channel active {}", ctx.channel());
             }
 
-            Channel channel = ctx.getChannel();
-            InetAddress addr = ((InetSocketAddress) channel.getRemoteAddress())
+            final Channel channel = ctx.channel();
+            InetAddress addr = ((InetSocketAddress) channel.remoteAddress())
                     .getAddress();
             if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
                 LOG.warn("Too many connections from {} - max is {}", addr,
@@ -116,170 +116,104 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
 
             NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                     zkServer, NettyServerCnxnFactory.this);
-            ctx.setAttachment(cnxn);
+            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
             if (secure) {
-                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
-                ChannelFuture handshakeFuture = sslHandler.handshake();
+                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                 handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
             } else {
-                allChannels.add(ctx.getChannel());
+                allChannels.add(ctx.channel());
                 addCnxn(cnxn);
             }
         }
 
         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx,
-                ChannelStateEvent e) throws Exception
-        {
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel disconnected " + e);
+                LOG.trace("Channel inactive {}", ctx.channel());
             }
-            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+            allChannels.remove(ctx.channel());
+            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
             if (cnxn != null) {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Channel disconnect caused close " + e);
+                    LOG.trace("Channel inactive caused close {}", cnxn);
                 }
                 cnxn.close();
             }
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-            throws Exception
-        {
-            LOG.warn("Exception caught " + e, e.getCause());
-            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            LOG.warn("Exception caught", cause);
+            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
             if (cnxn != null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Closing " + cnxn);
+                    LOG.debug("Closing {}", cnxn);
                 }
                 cnxn.close();
             }
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-            throws Exception
-        {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("message received called " + e.getMessage());
-            }
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
             try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("New message " + e.toString()
-                            + " from " + ctx.getChannel());
-                }
-                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
-                synchronized(cnxn) {
-                    processMessage(e, cnxn);
+                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+                    LOG.debug("Received AutoReadEvent.ENABLE");
+                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
+                    // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
+                    // after either of those. Check for null just to be safe ...
+                    if (cnxn != null) {
+                        cnxn.processQueuedBuffer();
+                    }
+                    ctx.channel().config().setAutoRead(true);
+                } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
+                    LOG.debug("Received AutoReadEvent.DISABLE");
+                    ctx.channel().config().setAutoRead(false);
                 }
-            } catch(Exception ex) {
-                LOG.error("Unexpected exception in receive", ex);
-                throw ex;
+            } finally {
+                ReferenceCountUtil.release(evt);
             }
         }
 
-        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
-                        + cnxn.queuedBuffer);
-            }
-
-            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
-                LOG.debug("Received ResumeMessageEvent");
-                if (cnxn.queuedBuffer != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("processing queue "
-                                + Long.toHexString(cnxn.sessionId)
-                                + " queuedBuffer 0x"
-                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                    }
-                    cnxn.receiveMessage(cnxn.queuedBuffer);
-                    if (!cnxn.queuedBuffer.readable()) {
-                        LOG.debug("Processed queue - no bytes remaining");
-                        cnxn.queuedBuffer = null;
-                    } else {
-                        LOG.debug("Processed queue - bytes remaining");
-                    }
-                } else {
-                    LOG.debug("queue empty");
-                }
-                cnxn.channel.setReadable(true);
-            } else {
-                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            try {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace(Long.toHexString(cnxn.sessionId)
-                            + " buf 0x"
-                            + ChannelBuffers.hexDump(buf));
+                    LOG.trace("message received called {}", msg);
                 }
-                
-                if (cnxn.throttled) {
-                    LOG.debug("Received message while throttled");
-                    // we are throttled, so we need to queue
-                    if (cnxn.queuedBuffer == null) {
-                        LOG.debug("allocating queue");
-                        cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
-                    }
-                    cnxn.queuedBuffer.writeBytes(buf);
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(Long.toHexString(cnxn.sessionId)
-                                + " queuedBuffer 0x"
-                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+                try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("New message {} from {}", msg, ctx.channel());
                     }
-                } else {
-                    LOG.debug("not throttled");
-                    if (cnxn.queuedBuffer != null) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace(Long.toHexString(cnxn.sessionId)
-                                    + " queuedBuffer 0x"
-                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                        }
-                        cnxn.queuedBuffer.writeBytes(buf);
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace(Long.toHexString(cnxn.sessionId)
-                                    + " queuedBuffer 0x"
-                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                        }
-
-                        cnxn.receiveMessage(cnxn.queuedBuffer);
-                        if (!cnxn.queuedBuffer.readable()) {
-                            LOG.debug("Processed queue - no bytes remaining");
-                            cnxn.queuedBuffer = null;
-                        } else {
-                            LOG.debug("Processed queue - bytes remaining");
-                        }
+                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+                    if (cnxn == null) {
+                        LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                     } else {
-                        cnxn.receiveMessage(buf);
-                        if (buf.readable()) {
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Before copy " + buf);
-                            }
-                            cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes()); 
-                            cnxn.queuedBuffer.writeBytes(buf);
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Copy is " + cnxn.queuedBuffer);
-                                LOG.trace(Long.toHexString(cnxn.sessionId)
-                                        + " queuedBuffer 0x"
-                                        + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                            }
-                        }
+                        cnxn.processMessage((ByteBuf) msg);
                     }
+                } catch (Exception ex) {
+                    LOG.error("Unexpected exception in receive", ex);
+                    throw ex;
                 }
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
 
         @Override
-        public void writeComplete(ChannelHandlerContext ctx,
-                WriteCompletionEvent e) throws Exception
-        {
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("write complete " + e);
+                promise.addListener((future) -> {
+                    LOG.trace("write {}",
+                            future.isSuccess() ? "complete" : "failed");
+                });
             }
+            super.write(ctx, msg, promise);
         }
 
-        private final class CertificateVerifier
-                implements ChannelFutureListener {
+        private final class CertificateVerifier implements GenericFutureListener<Future<Channel>> {
             private final SslHandler sslHandler;
             private final NettyServerCnxn cnxn;
 
@@ -291,12 +225,13 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
             /**
              * Only allow the connection to stay open if certificate passes auth
              */
-            public void operationComplete(ChannelFuture future)
-                    throws SSLPeerUnverifiedException {
+            public void operationComplete(Future<Channel> future) throws SSLPeerUnverifiedException {
                 if (future.isSuccess()) {
-                    LOG.debug("Successful handshake with session 0x{}",
-                            Long.toHexString(cnxn.sessionId));
-                    SSLEngine eng = sslHandler.getEngine();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Successful handshake with session 0x{}",
+                                Long.toHexString(cnxn.getSessionId()));
+                    }
+                    SSLEngine eng = sslHandler.engine();
                     SSLSession session = eng.getSession();
                     cnxn.setClientCertificateChain(session.getPeerCertificates());
 
@@ -316,16 +251,17 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
                     if (KeeperException.Code.OK !=
                             authProvider.handleAuthentication(cnxn, null)) {
                         LOG.error("Authentication failed for session 0x{}",
-                                Long.toHexString(cnxn.sessionId));
+                                Long.toHexString(cnxn.getSessionId()));
                         cnxn.close();
                         return;
                     }
 
-                    allChannels.add(future.getChannel());
+                    final Channel futureChannel = future.getNow();
+                    allChannels.add(Objects.requireNonNull(futureChannel));
                     addCnxn(cnxn);
                 } else {
                     LOG.error("Unsuccessful handshake with session 0x{}",
-                            Long.toHexString(cnxn.sessionId));
+                            Long.toHexString(cnxn.getSessionId()));
                     cnxn.close();
                 }
             }
@@ -334,30 +270,42 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     
     CnxnChannelHandler channelHandler = new CnxnChannelHandler();
 
-    NettyServerCnxnFactory() {
-        bootstrap = new ServerBootstrap(
-                new NioServerSocketChannelFactory(
-                        Executors.newCachedThreadPool(),
-                        Executors.newCachedThreadPool()));
-        // parent channel
-        bootstrap.setOption("reuseAddress", true);
-        // child channels
-        bootstrap.setOption("child.tcpNoDelay", true);
-        /* set socket linger to off, so that socket close does not block */
-        bootstrap.setOption("child.soLinger", -1);
-        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-            @Override
-            public ChannelPipeline getPipeline() throws Exception {
-                ChannelPipeline p = Channels.pipeline();
-                if (secure) {
-                    initSSL(p);
-                }
-                p.addLast("servercnxnfactory", channelHandler);
+    private ServerBootstrap configureBootstrapAllocator(ServerBootstrap bootstrap) {
+        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+        if (testAllocator != null) {
+            return bootstrap
+                    .option(ChannelOption.ALLOCATOR, testAllocator)
+                    .childOption(ChannelOption.ALLOCATOR, testAllocator);
+        } else {
+            return bootstrap;
+        }
+    }
 
-                return p;
-            }
-        });
+    NettyServerCnxnFactory() {
         x509Util = new ClientX509Util();
+
+        EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup();
+        EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
+        ServerBootstrap bootstrap = new ServerBootstrap()
+                .group(bossGroup, workerGroup)
+                .channel(NettyUtils.nioOrEpollServerSocketChannel())
+                // parent channel options
+                .option(ChannelOption.SO_REUSEADDR, true)
+                // child channels options
+                .childOption(ChannelOption.TCP_NODELAY, true)
+                .childOption(ChannelOption.SO_LINGER, -1)
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) throws Exception {
+                        ChannelPipeline pipeline = ch.pipeline();
+                        if (secure) {
+                            initSSL(pipeline);
+                        }
+                        pipeline.addLast("servercnxnfactory", channelHandler);
+                    }
+                });
+        this.bootstrap = configureBootstrapAllocator(bootstrap);
+        this.bootstrap.validate();
     }
 
     private synchronized void initSSL(ChannelPipeline p)
@@ -390,7 +338,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         sslEngine.setNeedClientAuth(true);
 
         p.addLast("ssl", new SslHandler(sslEngine));
-        LOG.info("SSL handler added for channel: {}", p.getChannel());
+        LOG.info("SSL handler added for channel: {}", p.channel());
     }
 
     @Override
@@ -440,7 +388,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         return localAddress.getPort();
     }
 
-    boolean killed;
+    private boolean killed; // use synchronized(this) to access
     @Override
     public void join() throws InterruptedException {
         synchronized(this) {
@@ -452,16 +400,42 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
 
     @Override
     public void shutdown() {
-        LOG.info("shutdown called " + localAddress);
+        synchronized (this) {
+            if (killed) {
+                LOG.info("already shutdown {}", localAddress);
+                return;
+            }
+        }
+        LOG.info("shutdown called {}", localAddress);
+
         if (login != null) {
             login.shutdown();
         }
+
+        final EventLoopGroup bossGroup = bootstrap.config().group();
+        final EventLoopGroup workerGroup = bootstrap.config().childGroup();
         // null if factory never started
         if (parentChannel != null) {
-            parentChannel.close().awaitUninterruptibly();
+            ChannelFuture parentCloseFuture = parentChannel.close();
+            if (bossGroup != null) {
+                parentCloseFuture.addListener(future -> {
+                    bossGroup.shutdownGracefully();
+                });
+            }
             closeAll();
-            allChannels.close().awaitUninterruptibly();
-            bootstrap.releaseExternalResources();
+            ChannelGroupFuture allChannelsCloseFuture = allChannels.close();
+            if (workerGroup != null) {
+                allChannelsCloseFuture.addListener(future -> {
+                    workerGroup.shutdownGracefully();
+                });
+            }
+        } else {
+            if (bossGroup != null) {
+                bossGroup.shutdownGracefully();
+            }
+            if (workerGroup != null) {
+                workerGroup.shutdownGracefully();
+            }
         }
 
         if (zkServer != null) {
@@ -475,16 +449,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     
     @Override
     public void start() {
-        LOG.info("binding to port " + localAddress);
-        parentChannel = bootstrap.bind(localAddress);
+        LOG.info("binding to port {}", localAddress);
+        parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
+        // Port changes after bind() if the original port was 0, update
+        // localAddress to get the real port.
+        localAddress = (InetSocketAddress) parentChannel.localAddress();
+        LOG.info("bound to port {}", getLocalPort());
     }
     
     public void reconfigure(InetSocketAddress addr) {
        Channel oldChannel = parentChannel;
        try {
            LOG.info("binding to port {}", addr);
-           parentChannel = bootstrap.bind(addr);
-           localAddress = addr;
+           parentChannel = bootstrap.bind(addr).syncUninterruptibly().channel();
+           // Port changes after bind() if the original port was 0, update
+           // localAddress to get the real port.
+           localAddress = (InetSocketAddress) parentChannel.localAddress();
+           LOG.info("bound to port " + getLocalPort());
        } catch (Exception e) {
            LOG.error("Error while reconfiguring", e);
        } finally {
@@ -517,21 +498,39 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         cnxns.add(cnxn);
         synchronized (ipMap){
             InetAddress addr =
-                ((InetSocketAddress)cnxn.channel.getRemoteAddress())
-                    .getAddress();
+                ((InetSocketAddress)cnxn.getChannel().remoteAddress()).getAddress();
             Set<NettyServerCnxn> s = ipMap.get(addr);
             if (s == null) {
-                s = new HashSet<NettyServerCnxn>();
+                s = new HashSet<>();
+                ipMap.put(addr, s);
             }
             s.add(cnxn);
-            ipMap.put(addr,s);
+        }
+    }
+
+    void removeCnxnFromIpMap(NettyServerCnxn cnxn, InetAddress remoteAddress) {
+        synchronized (ipMap) {
+            Set<NettyServerCnxn> s = ipMap.get(remoteAddress);
+            if (s != null) {
+                s.remove(cnxn);
+                if (s.isEmpty()) {
+                    ipMap.remove(remoteAddress);
+                }
+            } else {
+                LOG.error(
+                        "Unexpected null set for remote address {} when removing cnxn {}",
+                        remoteAddress,
+                        cnxn);
+            }
         }
     }
 
     private int getClientCnxnCount(InetAddress addr) {
-        Set<NettyServerCnxn> s = ipMap.get(addr);
-        if (s == null) return 0;
-        return s.size();
+        synchronized (ipMap) {
+            Set<NettyServerCnxn> s = ipMap.get(addr);
+            if (s == null) return 0;
+            return s.size();
+        }
     }
 
     @Override
@@ -552,4 +551,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         return info;
     }
 
+    /**
+     * Sets the test ByteBufAllocator. This allocator will be used by all
+     * future instances of this class.
+     * It is not recommended to use this method outside of testing.
+     * @param allocator the ByteBufAllocator to use for all netty buffer
+     *                  allocations.
+     */
+    static void setTestAllocator(ByteBufAllocator allocator) {
+        TEST_ALLOCATOR.set(allocator);
+    }
+
+    /**
+     * Clears the test ByteBufAllocator. The default allocator will be used
+     * by all future instances of this class.
+     * It is not recommended to use this method outside of testing.
+     */
+    static void clearTestAllocator() {
+        TEST_ALLOCATOR.set(null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java
index 4802ecf..d1e3ba5 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java
@@ -18,10 +18,10 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import io.netty.buffer.Unpooled;
+import io.netty.handler.ssl.SslHandler;
 import org.apache.zookeeper.common.X509Exception;
 import org.apache.zookeeper.common.X509Util;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +61,7 @@ public class UnifiedServerSocket extends ServerSocket {
         int bytesRead = prependableSocket.getInputStream().read(litmus, 0, 5);
         prependableSocket.prependToInputStream(litmus);
 
-        if (bytesRead == 5 && SslHandler.isEncrypted(ChannelBuffers.wrappedBuffer(litmus))) {
+        if (bytesRead == 5 && SslHandler.isEncrypted(Unpooled.wrappedBuffer(litmus))) {
             LOG.info(getInetAddress() + " attempting to connect over ssl");
             SSLSocket sslSocket;
             try {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java
index 054e1ed..0550bcf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java
@@ -23,10 +23,23 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 
 import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.test.TestByteBufAllocator;
 import org.apache.zookeeper.common.ZKConfig;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class ClientCnxnSocketTest {
+    @Before
+    public void setUp() {
+        ClientCnxnSocketNetty.setTestAllocator(TestByteBufAllocator.getInstance());
+    }
+
+    @After
+    public void tearDown() {
+        ClientCnxnSocketNetty.clearTestAllocator();
+        TestByteBufAllocator.checkForLeaks();
+    }
 
     @Test
     public void testWhenInvalidJuteMaxBufferIsConfiguredIOExceptionIsThrown() {