You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/12/20 15:29:08 UTC

svn commit: r1646985 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/client/ src/java/test/org/apache/zookeeper/test/

Author: fpj
Date: Sat Dec 20 14:29:07 2014
New Revision: 1646985

URL: http://svn.apache.org/r1646985
Log:
ZOOKEEPER-2069 Netty Support for ClientCnxnSocket (Hongchao via fpj)


Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sat Dec 20 14:29:07 2014
@@ -44,6 +44,8 @@ IMPROVEMENTS:
   ZOOKEEPER-1963 Make JDK 7 the minimum requirement for Zookeeper 
   (Hongchao via fpj)
 
+  ZOOKEEPER-2069 Netty Support for ClientCnxnSocket (Hongchao via fpj)
+
 Release 3.5.0 - 8/4/2014
 
 NEW FEATURES:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Sat Dec 20 14:29:07 2014
@@ -22,13 +22,13 @@ import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +36,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.security.auth.login.LoginException;
@@ -134,7 +135,7 @@ public class ClientCnxn {
     /**
      * These are the packets that need to be sent.
      */
-    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
+    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
 
     private int connectTimeout;
 
@@ -883,7 +884,7 @@ public class ClientCnxn {
             // If SASL authentication is currently in progress, construct and
             // send a response packet immediately, rather than queuing a
             // response as with other packets.
-            if (clientTunneledAuthenticationInProgress()) {
+            if (tunnelAuthInProgress()) {
                 GetSASLRequest request = new GetSASLRequest();
                 request.deserialize(bbia,"token");
                 zooKeeperSaslClient.respondToServer(request.getToken(),
@@ -959,6 +960,9 @@ public class ClientCnxn {
             return clientCnxnSocket;
         }
 
+        /**
+         * Setup session, previous watches, authentication.
+         */
         void primeConnection() throws IOException {
             LOG.info("Socket connection established to "
                      + clientCnxnSocket.getRemoteSocketAddress()
@@ -967,38 +971,36 @@ public class ClientCnxn {
             long sessId = (seenRwServerBefore) ? sessionId : 0;
             ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                     sessionTimeout, sessId, sessionPasswd);
-            synchronized (outgoingQueue) {
-                // We add backwards since we are pushing into the front
-                // Only send if there's a pending watch
-                // TODO: here we have the only remaining use of zooKeeper in
-                // this class. It's to be eliminated!
-                if (!disableAutoWatchReset) {
-                    List<String> dataWatches = zooKeeper.getDataWatches();
-                    List<String> existWatches = zooKeeper.getExistWatches();
-                    List<String> childWatches = zooKeeper.getChildWatches();
-                    if (!dataWatches.isEmpty()
-                                || !existWatches.isEmpty() || !childWatches.isEmpty()) {
-                        SetWatches sw = new SetWatches(lastZxid,
-                                prependChroot(dataWatches),
-                                prependChroot(existWatches),
-                                prependChroot(childWatches));
-                        RequestHeader h = new RequestHeader();
-                        h.setType(ZooDefs.OpCode.setWatches);
-                        h.setXid(-8);
-                        Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
-                        outgoingQueue.addFirst(packet);
-                    }
+            // We add backwards since we are pushing into the front
+            // Only send if there's a pending watch
+            // TODO: here we have the only remaining use of zooKeeper in
+            // this class. It's to be eliminated!
+            if (!disableAutoWatchReset) {
+                List<String> dataWatches = zooKeeper.getDataWatches();
+                List<String> existWatches = zooKeeper.getExistWatches();
+                List<String> childWatches = zooKeeper.getChildWatches();
+                if (!dataWatches.isEmpty()
+                        || !existWatches.isEmpty() || !childWatches.isEmpty()) {
+                    SetWatches sw = new SetWatches(lastZxid,
+                            prependChroot(dataWatches),
+                            prependChroot(existWatches),
+                            prependChroot(childWatches));
+                    RequestHeader h = new RequestHeader();
+                    h.setType(ZooDefs.OpCode.setWatches);
+                    h.setXid(-8);
+                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
+                    outgoingQueue.addFirst(packet);
                 }
-
-                for (AuthData id : authInfo) {
-                    outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
-                            OpCode.auth), null, new AuthPacket(0, id.scheme,
-                            id.data), null, null));
-                }
-                outgoingQueue.addFirst(new Packet(null, null, conReq,
-                            null, null, readOnly));
             }
-            clientCnxnSocket.enableReadWriteOnly();
+
+            for (AuthData id : authInfo) {
+                outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
+                        OpCode.auth), null, new AuthPacket(0, id.scheme,
+                        id.data), null, null));
+            }
+            outgoingQueue.addFirst(new Packet(null, null, conReq,
+                    null, null, readOnly));
+            clientCnxnSocket.connectionPrimed();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Session establishment request sent on "
                         + clientCnxnSocket.getRemoteSocketAddress());
@@ -1095,10 +1097,9 @@ public class ClientCnxn {
 
         private static final String RETRY_CONN_MSG =
             ", closing socket connection and attempting reconnect";
-        
         @Override
         public void run() {
-            clientCnxnSocket.introduce(this,sessionId);
+            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
             clientCnxnSocket.updateNow();
             clientCnxnSocket.updateLastSendAndHeard();
             int to;
@@ -1189,7 +1190,7 @@ public class ClientCnxn {
                         to = Math.min(to, pingRwTimeout - idlePingRwServer);
                     }
 
-                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
+                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                 } catch (Throwable e) {
                     if (closing) {
                         if (LOG.isDebugEnabled()) {
@@ -1218,6 +1219,8 @@ public class ClientCnxn {
                                             + ", unexpected error"
                                             + RETRY_CONN_MSG, e);
                         }
+                        // At this point, there might still be new packets appended to outgoingQueue.
+                        // they will be handled in next connection or cleared up if closed.
                         cleanup();
                         if (state.isAlive()) {
                             eventThread.queueEvent(new WatchedEvent(
@@ -1230,6 +1233,8 @@ public class ClientCnxn {
                     }
                 }
             }
+            // When it comes to this point, it guarantees that later queued packet to outgoingQueue will be
+            // notified of death.
             cleanup();
             clientCnxnSocket.close();
             if (state.isAlive()) {
@@ -1300,11 +1305,14 @@ public class ClientCnxn {
                 }
                 pendingQueue.clear();
             }
-            synchronized (outgoingQueue) {
-                for (Packet p : outgoingQueue) {
-                    conLossPacket(p);
-                }
-                outgoingQueue.clear();
+            // We can't call outgoingQueue.clear() here because
+            // between iterating and clear up there might be new
+            // packets added in queuePacket().
+            Iterator<Packet> iter = outgoingQueue.iterator();
+            while (iter.hasNext()) {
+                Packet p = iter.next();
+                conLossPacket(p);
+                iter.remove();
             }
         }
 
@@ -1357,14 +1365,14 @@ public class ClientCnxn {
 
         void close() {
             state = States.CLOSED;
-            clientCnxnSocket.wakeupCnxn();
+            clientCnxnSocket.onClosing();
         }
 
         void testableCloseSocket() throws IOException {
             clientCnxnSocket.testableCloseSocket();
         }
 
-        public boolean clientTunneledAuthenticationInProgress() {
+        public boolean tunnelAuthInProgress() {
             // 1. SASL client is disabled.
             if (!ZooKeeperSaslClient.isEnabled()) {
                 return false;
@@ -1464,8 +1472,8 @@ public class ClientCnxn {
         return r;
     }
 
-    public void enableWrite() {
-        sendThread.getClientCnxnSocket().enableWrite();
+    public void saslCompleted() {
+        sendThread.getClientCnxnSocket().saslCompleted();
     }
 
     public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode)
@@ -1501,25 +1509,23 @@ public class ClientCnxn {
         // Note that we do not generate the Xid for the packet yet. It is
         // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
         // where the packet is actually sent.
-        synchronized (outgoingQueue) {
-            packet = new Packet(h, r, request, response, watchRegistration);
-            packet.cb = cb;
-            packet.ctx = ctx;
-            packet.clientPath = clientPath;
-            packet.serverPath = serverPath;
-            packet.watchDeregistration = watchDeregistration;
-            if (!state.isAlive() || closing) {
-                conLossPacket(packet);
-            } else {
-                // If the client is asking to close the session then
-                // mark as closing
-                if (h.getType() == OpCode.closeSession) {
-                    closing = true;
-                }
-                outgoingQueue.add(packet);
+        packet = new Packet(h, r, request, response, watchRegistration);
+        packet.cb = cb;
+        packet.ctx = ctx;
+        packet.clientPath = clientPath;
+        packet.serverPath = serverPath;
+        packet.watchDeregistration = watchDeregistration;
+        if (!state.isAlive() || closing) {
+            conLossPacket(packet);
+        } else {
+            // If the client is asking to close the session then
+            // mark as closing
+            if (h.getType() == OpCode.closeSession) {
+                closing = true;
             }
+            outgoingQueue.add(packet);
         }
-        sendThread.getClientCnxnSocket().wakeupCnxn();
+        sendThread.getClientCnxnSocket().packetAdded();
         return packet;
     }
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java Sat Dec 20 14:29:07 2014
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.zookeeper.ClientCnxn.Packet;
@@ -61,6 +61,7 @@ abstract class ClientCnxnSocket {
     protected long lastSend;
     protected long now;
     protected ClientCnxn.SendThread sendThread;
+    protected LinkedBlockingDeque<Packet> outgoingQueue;
 
     /**
      * The sessionId is only available here for Log and Exception messages.
@@ -68,9 +69,11 @@ abstract class ClientCnxnSocket {
      */
     protected long sessionId;
 
-    void introduce(ClientCnxn.SendThread sendThread, long sessionId) {
+    void introduce(ClientCnxn.SendThread sendThread, long sessionId,
+                   LinkedBlockingDeque<Packet> outgoingQueue) {
         this.sendThread = sendThread;
         this.sessionId = sessionId;
+        this.outgoingQueue = outgoingQueue;
     }
 
     void updateNow() {
@@ -148,27 +151,75 @@ abstract class ClientCnxnSocket {
 
     abstract void connect(InetSocketAddress addr) throws IOException;
 
+    /**
+     * Returns the address to which the socket is connected.
+     */
     abstract SocketAddress getRemoteSocketAddress();
 
+    /**
+     * Returns the address to which the socket is bound.
+     */
     abstract SocketAddress getLocalSocketAddress();
 
+    /**
+     * Clean up resources for a fresh new socket.
+     * It's called before reconnect or close.
+     */
     abstract void cleanup();
 
-    abstract void close();
-
-    abstract void wakeupCnxn();
+    /**
+     * new packets are added to outgoingQueue.
+     */
+    abstract void packetAdded();
 
-    abstract void enableWrite();
+    /**
+     * connState is marked CLOSED and notify ClientCnxnSocket to react.
+     */
+    abstract void onClosing();
 
-    abstract void disableWrite();
+    /**
+     * Sasl completes. Allows non-priming packgets to be sent.
+     * Note that this method will only be called if Sasl starts and completes.
+     */
+    abstract void saslCompleted();
 
-    abstract void enableReadWriteOnly();
+    /**
+     * being called after ClientCnxn finish PrimeConnection
+     */
+    abstract void connectionPrimed();
 
+    /**
+     * Do transportation work:
+     * - read packets into incomingBuffer.
+     * - write outgoing queue packets.
+     * - update relevant timestamp.
+     *
+     * @param waitTimeOut timeout in blocking wait. Unit in MilliSecond.
+     * @param pendingQueue These are the packets that have been sent and
+     *                     are waiting for a response.
+     * @param cnxn
+     * @throws IOException
+     * @throws InterruptedException
+     */
     abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
-            LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
+            ClientCnxn cnxn)
             throws IOException, InterruptedException;
 
+    /**
+     * Close the socket.
+     */
     abstract void testableCloseSocket() throws IOException;
 
+    /**
+     * Close this client.
+     */
+    abstract void close();
+
+    /**
+     * Send Sasl packets directly.
+     * The Sasl process will send the first (requestHeader == null) packet,
+     * and then block the doTransport write,
+     * finally unblock it when finished.
+     */
     abstract void sendPacket(Packet p) throws IOException;
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Sat Dec 20 14:29:07 2014
@@ -26,10 +26,10 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
 import org.apache.zookeeper.ClientCnxn.Packet;
@@ -63,7 +63,7 @@ public class ClientCnxnSocketNIO extends
      * @throws InterruptedException
      * @throws IOException
      */
-    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
+    void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
       throws InterruptedException, IOException {
         SocketChannel sock = (SocketChannel) sockKey.channel();
         if (sock == null) {
@@ -86,7 +86,7 @@ public class ClientCnxnSocketNIO extends
                     readConnectResult();
                     enableRead();
                     if (findSendablePacket(outgoingQueue,
-                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
+                            sendThread.tunnelAuthInProgress()) != null) {
                         // Since SASL authentication has completed (if client is configured to do so),
                         // outgoing packets waiting in the outgoingQueue can now be sent.
                         enableWrite();
@@ -104,96 +104,87 @@ public class ClientCnxnSocketNIO extends
             }
         }
         if (sockKey.isWritable()) {
-            synchronized(outgoingQueue) {
-                Packet p = findSendablePacket(outgoingQueue,
-                        cnxn.sendThread.clientTunneledAuthenticationInProgress());
-
-                if (p != null) {
-                    updateLastSend();
-                    // If we already started writing p, p.bb will already exist
-                    if (p.bb == null) {
-                        if ((p.requestHeader != null) &&
-                                (p.requestHeader.getType() != OpCode.ping) &&
-                                (p.requestHeader.getType() != OpCode.auth)) {
-                            p.requestHeader.setXid(cnxn.getXid());
-                        }
-                        p.createBB();
+            Packet p = findSendablePacket(outgoingQueue,
+                    sendThread.tunnelAuthInProgress());
+
+            if (p != null) {
+                updateLastSend();
+                // If we already started writing p, p.bb will already exist
+                if (p.bb == null) {
+                    if ((p.requestHeader != null) &&
+                            (p.requestHeader.getType() != OpCode.ping) &&
+                            (p.requestHeader.getType() != OpCode.auth)) {
+                        p.requestHeader.setXid(cnxn.getXid());
                     }
-                    sock.write(p.bb);
-                    if (!p.bb.hasRemaining()) {
-                        sentCount++;
-                        outgoingQueue.removeFirstOccurrence(p);
-                        if (p.requestHeader != null
-                                && p.requestHeader.getType() != OpCode.ping
-                                && p.requestHeader.getType() != OpCode.auth) {
-                            synchronized (pendingQueue) {
-                                pendingQueue.add(p);
-                            }
+                    p.createBB();
+                }
+                sock.write(p.bb);
+                if (!p.bb.hasRemaining()) {
+                    sentCount++;
+                    outgoingQueue.removeFirstOccurrence(p);
+                    if (p.requestHeader != null
+                            && p.requestHeader.getType() != OpCode.ping
+                            && p.requestHeader.getType() != OpCode.auth) {
+                        synchronized (pendingQueue) {
+                            pendingQueue.add(p);
                         }
                     }
                 }
-                if (outgoingQueue.isEmpty()) {
-                    // No more packets to send: turn off write interest flag.
-                    // Will be turned on later by a later call to enableWrite(),
-                    // from within ZooKeeperSaslClient (if client is configured
-                    // to attempt SASL authentication), or in either doIO() or
-                    // in doTransport() if not.
-                    disableWrite();
-                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
-                    // On initial connection, write the complete connect request
-                    // packet, but then disable further writes until after
-                    // receiving a successful connection response.  If the
-                    // session is expired, then the server sends the expiration
-                    // response and immediately closes its end of the socket.  If
-                    // the client is simultaneously writing on its end, then the
-                    // TCP stack may choose to abort with RST, in which case the
-                    // client would never receive the session expired event.  See
-                    // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
-                    disableWrite();
-                } else {
-                    // Just in case
-                    enableWrite();
-                }
+            }
+            if (outgoingQueue.isEmpty()) {
+                // No more packets to send: turn off write interest flag.
+                // Will be turned on later by a later call to enableWrite(),
+                // from within ZooKeeperSaslClient (if client is configured
+                // to attempt SASL authentication), or in either doIO() or
+                // in doTransport() if not.
+                disableWrite();
+            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
+                // On initial connection, write the complete connect request
+                // packet, but then disable further writes until after
+                // receiving a successful connection response.  If the
+                // session is expired, then the server sends the expiration
+                // response and immediately closes its end of the socket.  If
+                // the client is simultaneously writing on its end, then the
+                // TCP stack may choose to abort with RST, in which case the
+                // client would never receive the session expired event.  See
+                // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
+                disableWrite();
+            } else {
+                // Just in case
+                enableWrite();
             }
         }
     }
 
-    private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
-                                      boolean clientTunneledAuthenticationInProgress) {
-        synchronized (outgoingQueue) {
-            if (outgoingQueue.isEmpty()) {
-                return null;
-            }
-            if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish
-                || !clientTunneledAuthenticationInProgress) {
-                return outgoingQueue.getFirst();
-            }
-
-            // Since client's authentication with server is in progress,
-            // send only the null-header packet queued by primeConnection().
-            // This packet must be sent so that the SASL authentication process
-            // can proceed, but all other packets should wait until
-            // SASL authentication completes.
-            ListIterator<Packet> iter = outgoingQueue.listIterator();
-            while (iter.hasNext()) {
-                Packet p = iter.next();
-                if (p.requestHeader == null) {
-                    // We've found the priming-packet. Move it to the beginning of the queue.
-                    iter.remove();
-                    outgoingQueue.add(0, p);
-                    return p;
-                } else {
-                    // Non-priming packet: defer it until later, leaving it in the queue
-                    // until authentication completes.
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("deferring non-priming packet: " + p +
-                                "until SASL authentication completes.");
-                    }
-                }
-            }
-            // no sendable packet found.
+    private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue,
+                                      boolean tunneledAuthInProgres) {
+        if (outgoingQueue.isEmpty()) {
             return null;
         }
+        // If we've already starting sending the first packet, we better finish
+        if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {
+            return outgoingQueue.getFirst();
+        }
+        // Since client's authentication with server is in progress,
+        // send only the null-header packet queued by primeConnection().
+        // This packet must be sent so that the SASL authentication process
+        // can proceed, but all other packets should wait until
+        // SASL authentication completes.
+        Iterator<Packet> iter = outgoingQueue.iterator();
+        while (iter.hasNext()) {
+            Packet p = iter.next();
+            if (p.requestHeader == null) {
+                // We've found the priming-packet. Move it to the beginning of the queue.
+                iter.remove();
+                outgoingQueue.addFirst(p);
+                return p;
+            } else {
+                // Non-priming packet: defer it until later, leaving it in the queue
+                // until authentication completes.
+                LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p);
+            }
+        }
+        return null;
     }
 
     @Override
@@ -333,13 +324,21 @@ public class ClientCnxnSocketNIO extends
     }
 
     @Override
-    synchronized void wakeupCnxn() {
+    void packetAdded() {
+        wakeupCnxn();
+    }
+
+    @Override
+    void onClosing() {
+        wakeupCnxn();
+    }
+
+    private synchronized void wakeupCnxn() {
         selector.wakeup();
     }
     
     @Override
-    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
-                     ClientCnxn cnxn)
+    void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
             throws IOException, InterruptedException {
         selector.select(waitTimeOut);
         Set<SelectionKey> selected;
@@ -359,15 +358,13 @@ public class ClientCnxnSocketNIO extends
                     sendThread.primeConnection();
                 }
             } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-                doIO(pendingQueue, outgoingQueue, cnxn);
+                doIO(pendingQueue, cnxn);
             }
         }
         if (sendThread.getZkState().isConnected()) {
-            synchronized(outgoingQueue) {
-                if (findSendablePacket(outgoingQueue,
-                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
-                    enableWrite();
-                }
+            if (findSendablePacket(outgoingQueue,
+                    sendThread.tunnelAuthInProgress()) != null) {
+                enableWrite();
             }
         }
         selected.clear();
@@ -386,6 +383,10 @@ public class ClientCnxnSocketNIO extends
     }
 
     @Override
+    void saslCompleted() {
+        enableWrite();
+    }
+
     synchronized void enableWrite() {
         int i = sockKey.interestOps();
         if ((i & SelectionKey.OP_WRITE) == 0) {
@@ -393,8 +394,7 @@ public class ClientCnxnSocketNIO extends
         }
     }
 
-    @Override
-    public synchronized void disableWrite() {
+    private synchronized void disableWrite() {
         int i = sockKey.interestOps();
         if ((i & SelectionKey.OP_WRITE) != 0) {
             sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
@@ -409,7 +409,7 @@ public class ClientCnxnSocketNIO extends
     }
 
     @Override
-    synchronized void enableReadWriteOnly() {
+    void connectionPrimed() {
         sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
     }
 
@@ -427,6 +427,4 @@ public class ClientCnxnSocketNIO extends
         ByteBuffer pbb = p.bb;
         sock.write(pbb);
     }
-
-
 }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java?rev=1646985&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java Sat Dec 20 14:29:07 2014
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ClientCnxnSocketNetty implements ClientCnxnSocket abstract methods.
+ * It's responsible for connecting to server, reading/writing network traffic and
+ * being a layer between network data and higher level packets.
+ */
+public class ClientCnxnSocketNetty extends ClientCnxnSocket {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
+
+    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
+            Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+    Channel channel;
+    CountDownLatch firstConnect;
+    ChannelFuture connectFuture;
+    Lock connectLock = new ReentrantLock();
+    AtomicBoolean disconnected = new AtomicBoolean();
+    AtomicBoolean needSasl = new AtomicBoolean();
+    Semaphore waitSasl = new Semaphore(0);
+
+    /**
+     * lifecycles diagram:
+     * <p/>
+     * loop:
+     * - try:
+     * - - !isConnected()
+     * - - - connect()
+     * - - doTransport()
+     * - catch:
+     * - - cleanup()
+     * close()
+     * <p/>
+     * Other none lifecycle methods are in jeopardy getting a null channel
+     * when calling in concurrency. We must handle it.
+     */
+
+    @Override
+    boolean isConnected() {
+        // Assuming that isConnected() is only used to initiate connection,
+        // not used by some other connection status judgement.
+        return channel != null;
+    }
+
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        firstConnect = new CountDownLatch(1);
+
+        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+
+        bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
+        bootstrap.setOption("soLinger", -1);
+        bootstrap.setOption("tcpNoDelay", true);
+
+        connectFuture = bootstrap.connect(addr);
+        connectFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                // this lock guarantees that channel won't be assgined after cleanup().
+                connectLock.lock();
+                try {
+                    if (!channelFuture.isSuccess() || connectFuture == null) {
+                        LOG.info("future isn't success, cause: {}", channelFuture.getCause());
+                        return;
+                    }
+                    // setup channel, variables, connection, etc.
+                    channel = channelFuture.getChannel();
+
+                    disconnected.set(false);
+                    initialized = false;
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+
+                    sendThread.primeConnection();
+                    updateNow();
+                    updateLastSendAndHeard();
+
+                    if (sendThread.tunnelAuthInProgress()) {
+                        waitSasl.drainPermits();
+                        needSasl.set(true);
+                        sendPrimePacket();
+                    } else {
+                        needSasl.set(false);
+                    }
+
+                    // we need to wake up on first connect to avoid timeout.
+                    wakeupCnxn();
+                    firstConnect.countDown();
+                    LOG.info("channel is connected: {}", channelFuture.getChannel());
+                } finally {
+                    connectLock.unlock();
+                }
+            }
+        });
+    }
+
+    @Override
+    void cleanup() {
+        connectLock.lock();
+        try {
+            if (connectFuture != null) {
+                connectFuture.cancel();
+                connectFuture = null;
+            }
+            if (channel != null) {
+                channel.close().awaitUninterruptibly();
+                channel = null;
+            }
+        } finally {
+            connectLock.unlock();
+        }
+        Iterator<Packet> iter = outgoingQueue.iterator();
+        while (iter.hasNext()) {
+            Packet p = iter.next();
+            if (p == WakeupPacket.getInstance()) {
+                iter.remove();
+            }
+        }
+    }
+
+    @Override
+    void close() {
+        channelFactory.releaseExternalResources();
+    }
+
+    @Override
+    void saslCompleted() {
+        needSasl.set(false);
+        waitSasl.release();
+    }
+
+    @Override
+    void connectionPrimed() {
+    }
+
+    @Override
+    void packetAdded() {
+    }
+
+    @Override
+    void onClosing() {
+        firstConnect.countDown();
+        wakeupCnxn();
+        LOG.info("channel is told closing");
+    }
+
+    private void wakeupCnxn() {
+        if (needSasl.get()) {
+            waitSasl.release();
+        }
+        outgoingQueue.add(WakeupPacket.getInstance());
+    }
+
+    @Override
+    void doTransport(int waitTimeOut,
+                     List<Packet> pendingQueue,
+                     ClientCnxn cnxn)
+            throws IOException, InterruptedException {
+        try {
+            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
+                return;
+            }
+            Packet head = null;
+            if (needSasl.get()) {
+                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
+                    return;
+                }
+            } else {
+                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
+                    return;
+                }
+            }
+            // check if being waken up on closing.
+            if (!sendThread.getZkState().isAlive()) {
+                // adding back the patck to notify of failure in conLossPacket().
+                addBack(head);
+                return;
+            }
+            // channel disconnection happened
+            if (disconnected.get()) {
+                addBack(head);
+                throw new EndOfStreamException("channel for sessionid 0x"
+                        + Long.toHexString(sessionId)
+                        + " is lost");
+            }
+            if (head != null) {
+                doWrite(pendingQueue, head, cnxn);
+            }
+        } finally {
+            updateNow();
+        }
+    }
+
+    private void addBack(Packet head) {
+        if (head != null && head != WakeupPacket.getInstance()) {
+            outgoingQueue.addFirst(head);
+        }
+    }
+
+    private void sendPkt(Packet p) {
+        // Assuming the packet will be sent out successfully. Because if it fails,
+        // the channel will close and clean up queues.
+        p.createBB();
+        updateLastSend();
+        sentCount++;
+        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+    }
+
+    private void sendPrimePacket() {
+        // assuming the first packet is the priming packet.
+        sendPkt(outgoingQueue.remove());
+    }
+
+    /**
+     * doWrite handles writing the packets from outgoingQueue via network to server.
+     */
+    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
+        updateNow();
+        while (true) {
+            if (p != WakeupPacket.getInstance()) {
+                if ((p.requestHeader != null) &&
+                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
+                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
+                    p.requestHeader.setXid(cnxn.getXid());
+                    synchronized (pendingQueue) {
+                        pendingQueue.add(p);
+                    }
+                }
+                sendPkt(p);
+            }
+            if (outgoingQueue.isEmpty()) {
+                break;
+            }
+            p = outgoingQueue.remove();
+        }
+    }
+
+    @Override
+    void sendPacket(ClientCnxn.Packet p) throws IOException {
+        if (channel == null) {
+            throw new IOException("channel has been closed");
+        }
+        sendPkt(p);
+    }
+
+    @Override
+    SocketAddress getRemoteSocketAddress() {
+        Channel copiedChanRef = channel;
+        return (copiedChanRef == null) ? null : copiedChanRef.getRemoteAddress();
+    }
+
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        Channel copiedChanRef = channel;
+        return (copiedChanRef == null) ? null : copiedChanRef.getLocalAddress();
+    }
+
+    @Override
+    void testableCloseSocket() throws IOException {
+        Channel copiedChanRef = channel;
+        if (copiedChanRef != null) {
+            copiedChanRef.disconnect().awaitUninterruptibly();
+        }
+    }
+
+
+    // *************** <END> CientCnxnSocketNetty </END> ******************
+    private static class WakeupPacket {
+        private static Packet instance = null;
+
+        protected WakeupPacket() {
+            // Exists only to defeat instantiation.
+        }
+
+        public static Packet getInstance() {
+            if (instance == null) {
+                instance = new Packet(null, null, null, null, null);
+            }
+            return instance;
+        }
+    }
+    /**
+     * ZKClientPipelineFactory is the netty pipeline factory for this netty
+     * connection implementation.
+     */
+    private class ZKClientPipelineFactory implements ChannelPipelineFactory {
+        @Override
+        public ChannelPipeline getPipeline() throws Exception {
+            ChannelPipeline pipeline = Channels.pipeline();
+            // add ssl here
+            pipeline.addLast("handler", new ZKClientHandler());
+            return pipeline;
+        }
+    }
+
+    /**
+     * ZKClientHandler is the netty handler that sits in netty upstream last
+     * place. It mainly handles read traffic and helps synchronize connection state.
+     */
+    private class ZKClientHandler extends SimpleChannelUpstreamHandler {
+        AtomicBoolean channelClosed = new AtomicBoolean(false);
+
+        @Override
+        public void channelDisconnected(ChannelHandlerContext ctx,
+                                        ChannelStateEvent e) throws Exception {
+            LOG.info("channel is disconnected: {}", ctx.getChannel());
+            cleanup();
+        }
+
+        /**
+         * netty handler has encountered problems. We are cleaning it up and tell outside to close
+         * the channel/connection.
+         */
+        private void cleanup() {
+            if (!channelClosed.compareAndSet(false, true)) {
+                return;
+            }
+            disconnected.set(true);
+            onClosing();
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx,
+                                    MessageEvent e) throws Exception {
+            updateNow();
+            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+            while (buf.readable()) {
+                if (incomingBuffer.remaining() > buf.readableBytes()) {
+                    int newLimit = incomingBuffer.position()
+                            + buf.readableBytes();
+                    incomingBuffer.limit(newLimit);
+                }
+                buf.readBytes(incomingBuffer);
+                incomingBuffer.limit(incomingBuffer.capacity());
+
+                if (!incomingBuffer.hasRemaining()) {
+                    incomingBuffer.flip();
+                    if (incomingBuffer == lenBuffer) {
+                        recvCount++;
+                        readLength();
+                    } else if (!initialized) {
+                        readConnectResult();
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                        initialized = true;
+                        updateLastHeard();
+                    } else {
+                        sendThread.readResponse(incomingBuffer);
+                        lenBuffer.clear();
+                        incomingBuffer = lenBuffer;
+                        updateLastHeard();
+                    }
+                }
+            }
+            wakeupCnxn();
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx,
+                                    ExceptionEvent e) throws Exception {
+            LOG.warn("Exception caught: {}", e, e.getCause());
+            cleanup();
+        }
+    }
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java Sat Dec 20 14:29:07 2014
@@ -41,7 +41,7 @@ class ZooKeeperTestable implements Testa
                 Watcher.Event.EventType.None,
                 Watcher.Event.KeeperState.Expired, null));
         clientCnxn.eventThread.queueEventOfDeath();
-        clientCnxn.sendThread.getClientCnxnSocket().wakeupCnxn();
         clientCnxn.state = ZooKeeper.States.CLOSED;
+        clientCnxn.sendThread.getClientCnxnSocket().onClosing();
     }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java Sat Dec 20 14:29:07 2014
@@ -339,7 +339,7 @@ public class ZooKeeperSaslClient {
             // SASL authentication is completed, successfully or not:
             // enable the socket's writable flag so that any packets waiting for authentication to complete in
             // the outgoing queue will be sent to the Zookeeper server.
-            cnxn.enableWrite();
+            cnxn.saslCompleted();
         }
     }
 

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java Sat Dec 20 14:29:07 2014
@@ -842,11 +842,6 @@ public class ClientTest extends ClientBa
         ReplyHeader r = zk.submitRequest(h, request, response, null);
 
         Assert.assertEquals(r.getErr(), Code.UNIMPLEMENTED.intValue());
-
-        try {
-            zk.exists("/m1", false);
-            fail("The connection should have been closed");
-        } catch (KeeperException.ConnectionLossException expected) {
-        }
+        zk.testableWaitForShutdown(CONNECTION_TIMEOUT);
     }
 }

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java?rev=1646985&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java Sat Dec 20 14:29:07 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ClientCnxnSocketNetty;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@RunWith(Suite.class)
+public class NettyNettySuiteBase {
+    @BeforeClass
+    public static void setUp() {
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
+                NettyServerCnxnFactory.class.getName());
+        System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+                ClientCnxnSocketNetty.class.getName());
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+        System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+    }
+}

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java?rev=1646985&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java Sat Dec 20 14:29:07 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@Suite.SuiteClasses({
+        AsyncHammerTest.class
+})
+public class NettyNettySuiteHammerTest extends NettyNettySuiteBase {
+}

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java?rev=1646985&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java Sat Dec 20 14:29:07 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@Suite.SuiteClasses({
+        ACLTest.class,
+        AsyncOpsTest.class,
+        ChrootClientTest.class,
+        ClientTest.class,
+        FourLetterWordsTest.class,
+        NullDataTest.class,
+        SessionTest.class,
+        WatcherTest.class
+})
+public class NettyNettySuiteTest extends NettyNettySuiteBase {
+}