You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/12/20 15:29:08 UTC
svn commit: r1646985 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/client/
src/java/test/org/apache/zookeeper/test/
Author: fpj
Date: Sat Dec 20 14:29:07 2014
New Revision: 1646985
URL: http://svn.apache.org/r1646985
Log:
ZOOKEEPER-2069 Netty Support for ClientCnxnSocket (Hongchao via fpj)
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sat Dec 20 14:29:07 2014
@@ -44,6 +44,8 @@ IMPROVEMENTS:
ZOOKEEPER-1963 Make JDK 7 the minimum requirement for Zookeeper
(Hongchao via fpj)
+ ZOOKEEPER-2069 Netty Support for ClientCnxnSocket (Hongchao via fpj)
+
Release 3.5.0 - 8/4/2014
NEW FEATURES:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Sat Dec 20 14:29:07 2014
@@ -22,13 +22,13 @@ import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -36,6 +36,7 @@ import java.util.Random;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import javax.security.auth.login.LoginException;
@@ -134,7 +135,7 @@ public class ClientCnxn {
/**
* These are the packets that need to be sent.
*/
- private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
+ private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
private int connectTimeout;
@@ -883,7 +884,7 @@ public class ClientCnxn {
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
- if (clientTunneledAuthenticationInProgress()) {
+ if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
@@ -959,6 +960,9 @@ public class ClientCnxn {
return clientCnxnSocket;
}
+ /**
+ * Setup session, previous watches, authentication.
+ */
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
@@ -967,38 +971,36 @@ public class ClientCnxn {
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
- synchronized (outgoingQueue) {
- // We add backwards since we are pushing into the front
- // Only send if there's a pending watch
- // TODO: here we have the only remaining use of zooKeeper in
- // this class. It's to be eliminated!
- if (!disableAutoWatchReset) {
- List<String> dataWatches = zooKeeper.getDataWatches();
- List<String> existWatches = zooKeeper.getExistWatches();
- List<String> childWatches = zooKeeper.getChildWatches();
- if (!dataWatches.isEmpty()
- || !existWatches.isEmpty() || !childWatches.isEmpty()) {
- SetWatches sw = new SetWatches(lastZxid,
- prependChroot(dataWatches),
- prependChroot(existWatches),
- prependChroot(childWatches));
- RequestHeader h = new RequestHeader();
- h.setType(ZooDefs.OpCode.setWatches);
- h.setXid(-8);
- Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
- outgoingQueue.addFirst(packet);
- }
+ // We add backwards since we are pushing into the front
+ // Only send if there's a pending watch
+ // TODO: here we have the only remaining use of zooKeeper in
+ // this class. It's to be eliminated!
+ if (!disableAutoWatchReset) {
+ List<String> dataWatches = zooKeeper.getDataWatches();
+ List<String> existWatches = zooKeeper.getExistWatches();
+ List<String> childWatches = zooKeeper.getChildWatches();
+ if (!dataWatches.isEmpty()
+ || !existWatches.isEmpty() || !childWatches.isEmpty()) {
+ SetWatches sw = new SetWatches(lastZxid,
+ prependChroot(dataWatches),
+ prependChroot(existWatches),
+ prependChroot(childWatches));
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.setWatches);
+ h.setXid(-8);
+ Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
+ outgoingQueue.addFirst(packet);
}
-
- for (AuthData id : authInfo) {
- outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
- OpCode.auth), null, new AuthPacket(0, id.scheme,
- id.data), null, null));
- }
- outgoingQueue.addFirst(new Packet(null, null, conReq,
- null, null, readOnly));
}
- clientCnxnSocket.enableReadWriteOnly();
+
+ for (AuthData id : authInfo) {
+ outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
+ OpCode.auth), null, new AuthPacket(0, id.scheme,
+ id.data), null, null));
+ }
+ outgoingQueue.addFirst(new Packet(null, null, conReq,
+ null, null, readOnly));
+ clientCnxnSocket.connectionPrimed();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
@@ -1095,10 +1097,9 @@ public class ClientCnxn {
private static final String RETRY_CONN_MSG =
", closing socket connection and attempting reconnect";
-
@Override
public void run() {
- clientCnxnSocket.introduce(this,sessionId);
+ clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
@@ -1189,7 +1190,7 @@ public class ClientCnxn {
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
- clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
+ clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
@@ -1218,6 +1219,8 @@ public class ClientCnxn {
+ ", unexpected error"
+ RETRY_CONN_MSG, e);
}
+ // At this point, there might still be new packets appended to outgoingQueue.
+ // they will be handled in next connection or cleared up if closed.
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
@@ -1230,6 +1233,8 @@ public class ClientCnxn {
}
}
}
+ // When it comes to this point, it guarantees that later queued packet to outgoingQueue will be
+ // notified of death.
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
@@ -1300,11 +1305,14 @@ public class ClientCnxn {
}
pendingQueue.clear();
}
- synchronized (outgoingQueue) {
- for (Packet p : outgoingQueue) {
- conLossPacket(p);
- }
- outgoingQueue.clear();
+ // We can't call outgoingQueue.clear() here because
+ // between iterating and clear up there might be new
+ // packets added in queuePacket().
+ Iterator<Packet> iter = outgoingQueue.iterator();
+ while (iter.hasNext()) {
+ Packet p = iter.next();
+ conLossPacket(p);
+ iter.remove();
}
}
@@ -1357,14 +1365,14 @@ public class ClientCnxn {
void close() {
state = States.CLOSED;
- clientCnxnSocket.wakeupCnxn();
+ clientCnxnSocket.onClosing();
}
void testableCloseSocket() throws IOException {
clientCnxnSocket.testableCloseSocket();
}
- public boolean clientTunneledAuthenticationInProgress() {
+ public boolean tunnelAuthInProgress() {
// 1. SASL client is disabled.
if (!ZooKeeperSaslClient.isEnabled()) {
return false;
@@ -1464,8 +1472,8 @@ public class ClientCnxn {
return r;
}
- public void enableWrite() {
- sendThread.getClientCnxnSocket().enableWrite();
+ public void saslCompleted() {
+ sendThread.getClientCnxnSocket().saslCompleted();
}
public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode)
@@ -1501,25 +1509,23 @@ public class ClientCnxn {
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
- synchronized (outgoingQueue) {
- packet = new Packet(h, r, request, response, watchRegistration);
- packet.cb = cb;
- packet.ctx = ctx;
- packet.clientPath = clientPath;
- packet.serverPath = serverPath;
- packet.watchDeregistration = watchDeregistration;
- if (!state.isAlive() || closing) {
- conLossPacket(packet);
- } else {
- // If the client is asking to close the session then
- // mark as closing
- if (h.getType() == OpCode.closeSession) {
- closing = true;
- }
- outgoingQueue.add(packet);
+ packet = new Packet(h, r, request, response, watchRegistration);
+ packet.cb = cb;
+ packet.ctx = ctx;
+ packet.clientPath = clientPath;
+ packet.serverPath = serverPath;
+ packet.watchDeregistration = watchDeregistration;
+ if (!state.isAlive() || closing) {
+ conLossPacket(packet);
+ } else {
+ // If the client is asking to close the session then
+ // mark as closing
+ if (h.getType() == OpCode.closeSession) {
+ closing = true;
}
+ outgoingQueue.add(packet);
}
- sendThread.getClientCnxnSocket().wakeupCnxn();
+ sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java Sat Dec 20 14:29:07 2014
@@ -22,8 +22,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.ClientCnxn.Packet;
@@ -61,6 +61,7 @@ abstract class ClientCnxnSocket {
protected long lastSend;
protected long now;
protected ClientCnxn.SendThread sendThread;
+ protected LinkedBlockingDeque<Packet> outgoingQueue;
/**
* The sessionId is only available here for Log and Exception messages.
@@ -68,9 +69,11 @@ abstract class ClientCnxnSocket {
*/
protected long sessionId;
- void introduce(ClientCnxn.SendThread sendThread, long sessionId) {
+ void introduce(ClientCnxn.SendThread sendThread, long sessionId,
+ LinkedBlockingDeque<Packet> outgoingQueue) {
this.sendThread = sendThread;
this.sessionId = sessionId;
+ this.outgoingQueue = outgoingQueue;
}
void updateNow() {
@@ -148,27 +151,75 @@ abstract class ClientCnxnSocket {
abstract void connect(InetSocketAddress addr) throws IOException;
+ /**
+ * Returns the address to which the socket is connected.
+ */
abstract SocketAddress getRemoteSocketAddress();
+ /**
+ * Returns the address to which the socket is bound.
+ */
abstract SocketAddress getLocalSocketAddress();
+ /**
+ * Clean up resources for a fresh new socket.
+ * It's called before reconnect or close.
+ */
abstract void cleanup();
- abstract void close();
-
- abstract void wakeupCnxn();
+ /**
+ * new packets are added to outgoingQueue.
+ */
+ abstract void packetAdded();
- abstract void enableWrite();
+ /**
+ * connState is marked CLOSED and notify ClientCnxnSocket to react.
+ */
+ abstract void onClosing();
- abstract void disableWrite();
+ /**
+ * Sasl completes. Allows non-priming packgets to be sent.
+ * Note that this method will only be called if Sasl starts and completes.
+ */
+ abstract void saslCompleted();
- abstract void enableReadWriteOnly();
+ /**
+ * being called after ClientCnxn finish PrimeConnection
+ */
+ abstract void connectionPrimed();
+ /**
+ * Do transportation work:
+ * - read packets into incomingBuffer.
+ * - write outgoing queue packets.
+ * - update relevant timestamp.
+ *
+ * @param waitTimeOut timeout in blocking wait. Unit in MilliSecond.
+ * @param pendingQueue These are the packets that have been sent and
+ * are waiting for a response.
+ * @param cnxn
+ * @throws IOException
+ * @throws InterruptedException
+ */
abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
- LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
+ ClientCnxn cnxn)
throws IOException, InterruptedException;
+ /**
+ * Close the socket.
+ */
abstract void testableCloseSocket() throws IOException;
+ /**
+ * Close this client.
+ */
+ abstract void close();
+
+ /**
+ * Send Sasl packets directly.
+ * The Sasl process will send the first (requestHeader == null) packet,
+ * and then block the doTransport write,
+ * finally unblock it when finished.
+ */
abstract void sendPacket(Packet p) throws IOException;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Sat Dec 20 14:29:07 2014
@@ -26,10 +26,10 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
+import java.util.Iterator;
import java.util.List;
-import java.util.ListIterator;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
import org.apache.zookeeper.ClientCnxn.Packet;
@@ -63,7 +63,7 @@ public class ClientCnxnSocketNIO extends
* @throws InterruptedException
* @throws IOException
*/
- void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
+ void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
@@ -86,7 +86,7 @@ public class ClientCnxnSocketNIO extends
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
- cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
+ sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
@@ -104,96 +104,87 @@ public class ClientCnxnSocketNIO extends
}
}
if (sockKey.isWritable()) {
- synchronized(outgoingQueue) {
- Packet p = findSendablePacket(outgoingQueue,
- cnxn.sendThread.clientTunneledAuthenticationInProgress());
-
- if (p != null) {
- updateLastSend();
- // If we already started writing p, p.bb will already exist
- if (p.bb == null) {
- if ((p.requestHeader != null) &&
- (p.requestHeader.getType() != OpCode.ping) &&
- (p.requestHeader.getType() != OpCode.auth)) {
- p.requestHeader.setXid(cnxn.getXid());
- }
- p.createBB();
+ Packet p = findSendablePacket(outgoingQueue,
+ sendThread.tunnelAuthInProgress());
+
+ if (p != null) {
+ updateLastSend();
+ // If we already started writing p, p.bb will already exist
+ if (p.bb == null) {
+ if ((p.requestHeader != null) &&
+ (p.requestHeader.getType() != OpCode.ping) &&
+ (p.requestHeader.getType() != OpCode.auth)) {
+ p.requestHeader.setXid(cnxn.getXid());
}
- sock.write(p.bb);
- if (!p.bb.hasRemaining()) {
- sentCount++;
- outgoingQueue.removeFirstOccurrence(p);
- if (p.requestHeader != null
- && p.requestHeader.getType() != OpCode.ping
- && p.requestHeader.getType() != OpCode.auth) {
- synchronized (pendingQueue) {
- pendingQueue.add(p);
- }
+ p.createBB();
+ }
+ sock.write(p.bb);
+ if (!p.bb.hasRemaining()) {
+ sentCount++;
+ outgoingQueue.removeFirstOccurrence(p);
+ if (p.requestHeader != null
+ && p.requestHeader.getType() != OpCode.ping
+ && p.requestHeader.getType() != OpCode.auth) {
+ synchronized (pendingQueue) {
+ pendingQueue.add(p);
}
}
}
- if (outgoingQueue.isEmpty()) {
- // No more packets to send: turn off write interest flag.
- // Will be turned on later by a later call to enableWrite(),
- // from within ZooKeeperSaslClient (if client is configured
- // to attempt SASL authentication), or in either doIO() or
- // in doTransport() if not.
- disableWrite();
- } else if (!initialized && p != null && !p.bb.hasRemaining()) {
- // On initial connection, write the complete connect request
- // packet, but then disable further writes until after
- // receiving a successful connection response. If the
- // session is expired, then the server sends the expiration
- // response and immediately closes its end of the socket. If
- // the client is simultaneously writing on its end, then the
- // TCP stack may choose to abort with RST, in which case the
- // client would never receive the session expired event. See
- // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
- disableWrite();
- } else {
- // Just in case
- enableWrite();
- }
+ }
+ if (outgoingQueue.isEmpty()) {
+ // No more packets to send: turn off write interest flag.
+ // Will be turned on later by a later call to enableWrite(),
+ // from within ZooKeeperSaslClient (if client is configured
+ // to attempt SASL authentication), or in either doIO() or
+ // in doTransport() if not.
+ disableWrite();
+ } else if (!initialized && p != null && !p.bb.hasRemaining()) {
+ // On initial connection, write the complete connect request
+ // packet, but then disable further writes until after
+ // receiving a successful connection response. If the
+ // session is expired, then the server sends the expiration
+ // response and immediately closes its end of the socket. If
+ // the client is simultaneously writing on its end, then the
+ // TCP stack may choose to abort with RST, in which case the
+ // client would never receive the session expired event. See
+ // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
+ disableWrite();
+ } else {
+ // Just in case
+ enableWrite();
}
}
}
- private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
- boolean clientTunneledAuthenticationInProgress) {
- synchronized (outgoingQueue) {
- if (outgoingQueue.isEmpty()) {
- return null;
- }
- if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish
- || !clientTunneledAuthenticationInProgress) {
- return outgoingQueue.getFirst();
- }
-
- // Since client's authentication with server is in progress,
- // send only the null-header packet queued by primeConnection().
- // This packet must be sent so that the SASL authentication process
- // can proceed, but all other packets should wait until
- // SASL authentication completes.
- ListIterator<Packet> iter = outgoingQueue.listIterator();
- while (iter.hasNext()) {
- Packet p = iter.next();
- if (p.requestHeader == null) {
- // We've found the priming-packet. Move it to the beginning of the queue.
- iter.remove();
- outgoingQueue.add(0, p);
- return p;
- } else {
- // Non-priming packet: defer it until later, leaving it in the queue
- // until authentication completes.
- if (LOG.isDebugEnabled()) {
- LOG.debug("deferring non-priming packet: " + p +
- "until SASL authentication completes.");
- }
- }
- }
- // no sendable packet found.
+ private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue,
+ boolean tunneledAuthInProgres) {
+ if (outgoingQueue.isEmpty()) {
return null;
}
+ // If we've already starting sending the first packet, we better finish
+ if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {
+ return outgoingQueue.getFirst();
+ }
+ // Since client's authentication with server is in progress,
+ // send only the null-header packet queued by primeConnection().
+ // This packet must be sent so that the SASL authentication process
+ // can proceed, but all other packets should wait until
+ // SASL authentication completes.
+ Iterator<Packet> iter = outgoingQueue.iterator();
+ while (iter.hasNext()) {
+ Packet p = iter.next();
+ if (p.requestHeader == null) {
+ // We've found the priming-packet. Move it to the beginning of the queue.
+ iter.remove();
+ outgoingQueue.addFirst(p);
+ return p;
+ } else {
+ // Non-priming packet: defer it until later, leaving it in the queue
+ // until authentication completes.
+ LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p);
+ }
+ }
+ return null;
}
@Override
@@ -333,13 +324,21 @@ public class ClientCnxnSocketNIO extends
}
@Override
- synchronized void wakeupCnxn() {
+ void packetAdded() {
+ wakeupCnxn();
+ }
+
+ @Override
+ void onClosing() {
+ wakeupCnxn();
+ }
+
+ private synchronized void wakeupCnxn() {
selector.wakeup();
}
@Override
- void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
- ClientCnxn cnxn)
+ void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
@@ -359,15 +358,13 @@ public class ClientCnxnSocketNIO extends
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- doIO(pendingQueue, outgoingQueue, cnxn);
+ doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
- synchronized(outgoingQueue) {
- if (findSendablePacket(outgoingQueue,
- cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
- enableWrite();
- }
+ if (findSendablePacket(outgoingQueue,
+ sendThread.tunnelAuthInProgress()) != null) {
+ enableWrite();
}
}
selected.clear();
@@ -386,6 +383,10 @@ public class ClientCnxnSocketNIO extends
}
@Override
+ void saslCompleted() {
+ enableWrite();
+ }
+
synchronized void enableWrite() {
int i = sockKey.interestOps();
if ((i & SelectionKey.OP_WRITE) == 0) {
@@ -393,8 +394,7 @@ public class ClientCnxnSocketNIO extends
}
}
- @Override
- public synchronized void disableWrite() {
+ private synchronized void disableWrite() {
int i = sockKey.interestOps();
if ((i & SelectionKey.OP_WRITE) != 0) {
sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
@@ -409,7 +409,7 @@ public class ClientCnxnSocketNIO extends
}
@Override
- synchronized void enableReadWriteOnly() {
+ void connectionPrimed() {
sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
@@ -427,6 +427,4 @@ public class ClientCnxnSocketNIO extends
ByteBuffer pbb = p.bb;
sock.write(pbb);
}
-
-
}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java?rev=1646985&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java Sat Dec 20 14:29:07 2014
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ClientCnxnSocketNetty implements ClientCnxnSocket abstract methods.
+ * It's responsible for connecting to server, reading/writing network traffic and
+ * being a layer between network data and higher level packets.
+ */
+public class ClientCnxnSocketNetty extends ClientCnxnSocket {
+ private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
+
+ ChannelFactory channelFactory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ Channel channel;
+ CountDownLatch firstConnect;
+ ChannelFuture connectFuture;
+ Lock connectLock = new ReentrantLock();
+ AtomicBoolean disconnected = new AtomicBoolean();
+ AtomicBoolean needSasl = new AtomicBoolean();
+ Semaphore waitSasl = new Semaphore(0);
+
+ /**
+ * lifecycles diagram:
+ * <p/>
+ * loop:
+ * - try:
+ * - - !isConnected()
+ * - - - connect()
+ * - - doTransport()
+ * - catch:
+ * - - cleanup()
+ * close()
+ * <p/>
+ * Other none lifecycle methods are in jeopardy getting a null channel
+ * when calling in concurrency. We must handle it.
+ */
+
+ @Override
+ boolean isConnected() {
+ // Assuming that isConnected() is only used to initiate connection,
+ // not used by some other connection status judgement.
+ return channel != null;
+ }
+
+ @Override
+ void connect(InetSocketAddress addr) throws IOException {
+ firstConnect = new CountDownLatch(1);
+
+ ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+
+ bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
+ bootstrap.setOption("soLinger", -1);
+ bootstrap.setOption("tcpNoDelay", true);
+
+ connectFuture = bootstrap.connect(addr);
+ connectFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ // this lock guarantees that channel won't be assgined after cleanup().
+ connectLock.lock();
+ try {
+ if (!channelFuture.isSuccess() || connectFuture == null) {
+ LOG.info("future isn't success, cause: {}", channelFuture.getCause());
+ return;
+ }
+ // setup channel, variables, connection, etc.
+ channel = channelFuture.getChannel();
+
+ disconnected.set(false);
+ initialized = false;
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+
+ sendThread.primeConnection();
+ updateNow();
+ updateLastSendAndHeard();
+
+ if (sendThread.tunnelAuthInProgress()) {
+ waitSasl.drainPermits();
+ needSasl.set(true);
+ sendPrimePacket();
+ } else {
+ needSasl.set(false);
+ }
+
+ // we need to wake up on first connect to avoid timeout.
+ wakeupCnxn();
+ firstConnect.countDown();
+ LOG.info("channel is connected: {}", channelFuture.getChannel());
+ } finally {
+ connectLock.unlock();
+ }
+ }
+ });
+ }
+
+ @Override
+ void cleanup() {
+ connectLock.lock();
+ try {
+ if (connectFuture != null) {
+ connectFuture.cancel();
+ connectFuture = null;
+ }
+ if (channel != null) {
+ channel.close().awaitUninterruptibly();
+ channel = null;
+ }
+ } finally {
+ connectLock.unlock();
+ }
+ Iterator<Packet> iter = outgoingQueue.iterator();
+ while (iter.hasNext()) {
+ Packet p = iter.next();
+ if (p == WakeupPacket.getInstance()) {
+ iter.remove();
+ }
+ }
+ }
+
+ @Override
+ void close() {
+ channelFactory.releaseExternalResources();
+ }
+
+ @Override
+ void saslCompleted() {
+ needSasl.set(false);
+ waitSasl.release();
+ }
+
+ @Override
+ void connectionPrimed() {
+ }
+
+ @Override
+ void packetAdded() {
+ }
+
+ @Override
+ void onClosing() {
+ firstConnect.countDown();
+ wakeupCnxn();
+ LOG.info("channel is told closing");
+ }
+
+ private void wakeupCnxn() {
+ if (needSasl.get()) {
+ waitSasl.release();
+ }
+ outgoingQueue.add(WakeupPacket.getInstance());
+ }
+
+ @Override
+ void doTransport(int waitTimeOut,
+ List<Packet> pendingQueue,
+ ClientCnxn cnxn)
+ throws IOException, InterruptedException {
+ try {
+ if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ Packet head = null;
+ if (needSasl.get()) {
+ if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ } else {
+ if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
+ return;
+ }
+ }
+ // check if being waken up on closing.
+ if (!sendThread.getZkState().isAlive()) {
+ // adding back the patck to notify of failure in conLossPacket().
+ addBack(head);
+ return;
+ }
+ // channel disconnection happened
+ if (disconnected.get()) {
+ addBack(head);
+ throw new EndOfStreamException("channel for sessionid 0x"
+ + Long.toHexString(sessionId)
+ + " is lost");
+ }
+ if (head != null) {
+ doWrite(pendingQueue, head, cnxn);
+ }
+ } finally {
+ updateNow();
+ }
+ }
+
+ private void addBack(Packet head) {
+ if (head != null && head != WakeupPacket.getInstance()) {
+ outgoingQueue.addFirst(head);
+ }
+ }
+
+ private void sendPkt(Packet p) {
+ // Assuming the packet will be sent out successfully. Because if it fails,
+ // the channel will close and clean up queues.
+ p.createBB();
+ updateLastSend();
+ sentCount++;
+ channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+ }
+
+ private void sendPrimePacket() {
+ // assuming the first packet is the priming packet.
+ sendPkt(outgoingQueue.remove());
+ }
+
+ /**
+ * doWrite handles writing the packets from outgoingQueue via network to server.
+ */
+ private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
+ updateNow();
+ while (true) {
+ if (p != WakeupPacket.getInstance()) {
+ if ((p.requestHeader != null) &&
+ (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
+ (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
+ p.requestHeader.setXid(cnxn.getXid());
+ synchronized (pendingQueue) {
+ pendingQueue.add(p);
+ }
+ }
+ sendPkt(p);
+ }
+ if (outgoingQueue.isEmpty()) {
+ break;
+ }
+ p = outgoingQueue.remove();
+ }
+ }
+
+ @Override
+ void sendPacket(ClientCnxn.Packet p) throws IOException {
+ if (channel == null) {
+ throw new IOException("channel has been closed");
+ }
+ sendPkt(p);
+ }
+
+ @Override
+ SocketAddress getRemoteSocketAddress() {
+ Channel copiedChanRef = channel;
+ return (copiedChanRef == null) ? null : copiedChanRef.getRemoteAddress();
+ }
+
+ @Override
+ SocketAddress getLocalSocketAddress() {
+ Channel copiedChanRef = channel;
+ return (copiedChanRef == null) ? null : copiedChanRef.getLocalAddress();
+ }
+
+ @Override
+ void testableCloseSocket() throws IOException {
+ Channel copiedChanRef = channel;
+ if (copiedChanRef != null) {
+ copiedChanRef.disconnect().awaitUninterruptibly();
+ }
+ }
+
+
+ // *************** <END> CientCnxnSocketNetty </END> ******************
+ private static class WakeupPacket {
+ private static Packet instance = null;
+
+ protected WakeupPacket() {
+ // Exists only to defeat instantiation.
+ }
+
+ public static Packet getInstance() {
+ if (instance == null) {
+ instance = new Packet(null, null, null, null, null);
+ }
+ return instance;
+ }
+ }
+ /**
+ * ZKClientPipelineFactory is the netty pipeline factory for this netty
+ * connection implementation.
+ */
+ private class ZKClientPipelineFactory implements ChannelPipelineFactory {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ // add ssl here
+ pipeline.addLast("handler", new ZKClientHandler());
+ return pipeline;
+ }
+ }
+
+ /**
+ * ZKClientHandler is the netty handler that sits in netty upstream last
+ * place. It mainly handles read traffic and helps synchronize connection state.
+ */
+ private class ZKClientHandler extends SimpleChannelUpstreamHandler {
+ AtomicBoolean channelClosed = new AtomicBoolean(false);
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ LOG.info("channel is disconnected: {}", ctx.getChannel());
+ cleanup();
+ }
+
+ /**
+ * netty handler has encountered problems. We are cleaning it up and tell outside to close
+ * the channel/connection.
+ */
+ private void cleanup() {
+ if (!channelClosed.compareAndSet(false, true)) {
+ return;
+ }
+ disconnected.set(true);
+ onClosing();
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx,
+ MessageEvent e) throws Exception {
+ updateNow();
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ while (buf.readable()) {
+ if (incomingBuffer.remaining() > buf.readableBytes()) {
+ int newLimit = incomingBuffer.position()
+ + buf.readableBytes();
+ incomingBuffer.limit(newLimit);
+ }
+ buf.readBytes(incomingBuffer);
+ incomingBuffer.limit(incomingBuffer.capacity());
+
+ if (!incomingBuffer.hasRemaining()) {
+ incomingBuffer.flip();
+ if (incomingBuffer == lenBuffer) {
+ recvCount++;
+ readLength();
+ } else if (!initialized) {
+ readConnectResult();
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ initialized = true;
+ updateLastHeard();
+ } else {
+ sendThread.readResponse(incomingBuffer);
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+ updateLastHeard();
+ }
+ }
+ }
+ wakeupCnxn();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx,
+ ExceptionEvent e) throws Exception {
+ LOG.warn("Exception caught: {}", e, e.getCause());
+ cleanup();
+ }
+ }
+}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperTestable.java Sat Dec 20 14:29:07 2014
@@ -41,7 +41,7 @@ class ZooKeeperTestable implements Testa
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
clientCnxn.eventThread.queueEventOfDeath();
- clientCnxn.sendThread.getClientCnxnSocket().wakeupCnxn();
clientCnxn.state = ZooKeeper.States.CLOSED;
+ clientCnxn.sendThread.getClientCnxnSocket().onClosing();
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java Sat Dec 20 14:29:07 2014
@@ -339,7 +339,7 @@ public class ZooKeeperSaslClient {
// SASL authentication is completed, successfully or not:
// enable the socket's writable flag so that any packets waiting for authentication to complete in
// the outgoing queue will be sent to the Zookeeper server.
- cnxn.enableWrite();
+ cnxn.saslCompleted();
}
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java?rev=1646985&r1=1646984&r2=1646985&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java Sat Dec 20 14:29:07 2014
@@ -842,11 +842,6 @@ public class ClientTest extends ClientBa
ReplyHeader r = zk.submitRequest(h, request, response, null);
Assert.assertEquals(r.getErr(), Code.UNIMPLEMENTED.intValue());
-
- try {
- zk.exists("/m1", false);
- fail("The connection should have been closed");
- } catch (KeeperException.ConnectionLossException expected) {
- }
+ zk.testableWaitForShutdown(CONNECTION_TIMEOUT);
}
}
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java?rev=1646985&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java Sat Dec 20 14:29:07 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ClientCnxnSocketNetty;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@RunWith(Suite.class)
+public class NettyNettySuiteBase {
+ @BeforeClass
+ public static void setUp() {
+ System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
+ NettyServerCnxnFactory.class.getName());
+ System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+ ClientCnxnSocketNetty.class.getName());
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+ System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+ }
+}
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java?rev=1646985&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteHammerTest.java Sat Dec 20 14:29:07 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@Suite.SuiteClasses({
+ AsyncHammerTest.class
+})
+public class NettyNettySuiteHammerTest extends NettyNettySuiteBase {
+}
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java?rev=1646985&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java Sat Dec 20 14:29:07 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Netty Client against Netty server
+ */
+@Suite.SuiteClasses({
+ ACLTest.class,
+ AsyncOpsTest.class,
+ ChrootClientTest.class,
+ ClientTest.class,
+ FourLetterWordsTest.class,
+ NullDataTest.class,
+ SessionTest.class,
+ WatcherTest.class
+})
+public class NettyNettySuiteTest extends NettyNettySuiteBase {
+}