You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2010/11/10 23:36:35 UTC

svn commit: r1033770 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/

Author: phunt
Date: Wed Nov 10 22:36:34 2010
New Revision: 1033770

URL: http://svn.apache.org/viewvc?rev=1033770&view=rev
Log:
ZOOKEEPER-909. Extract NIO specific code from ClientCnxn

Added:
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/ivy.xml
    hadoop/zookeeper/trunk/ivysettings.xml
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1033770&r1=1033769&r2=1033770&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Nov 10 22:36:34 2010
@@ -187,9 +187,14 @@ IMPROVEMENTS:
 
   ZOOKEEPER-864. Hedwig C++ client improvements (Ivan Kelly via breed)
 
-  ZOOKEEPER-862. Hedwig created ledgers with hardcoded Bookkeeper ensemble and quorum size. Make these a server config parameter instead. (Erwin T via breed)
+  ZOOKEEPER-862. Hedwig created ledgers with hardcoded Bookkeeper ensemble and
+  quorum size. Make these a server config parameter instead. (Erwin T via breed)
 
-  ZOOKEEPER-926. Fork Hadoop common's test-patch.sh and modify for Zookeeper. (nigel)
+  ZOOKEEPER-926. Fork Hadoop common's test-patch.sh and modify for Zookeeper.
+  (nigel)
+
+  ZOOKEEPER-909. Extract NIO specific code from ClientCnxn
+  (Thomas Koch via phunt)
 
 NEW FEATURES:
   ZOOKEEPER-729. Java client API to recursively delete a subtree.

Modified: hadoop/zookeeper/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/ivy.xml?rev=1033770&r1=1033769&r2=1033770&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/ivy.xml (original)
+++ hadoop/zookeeper/trunk/ivy.xml Wed Nov 10 22:36:34 2010
@@ -41,7 +41,7 @@
     <dependency org="log4j" name="log4j" rev="1.2.15" transitive="false" conf="default"/>
     <dependency org="jline" name="jline" rev="0.9.94" transitive="false" conf="default"/>
 
-    <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.1.5.GA">
+    <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.2.2.Final">
       <artifact name="netty" type="jar" conf="default"/>
     </dependency>
 

Modified: hadoop/zookeeper/trunk/ivysettings.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/ivysettings.xml?rev=1033770&r1=1033769&r2=1033770&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/ivysettings.xml (original)
+++ hadoop/zookeeper/trunk/ivysettings.xml Wed Nov 10 22:36:34 2010
@@ -20,7 +20,7 @@
   <property name="repo.maven.org"
     value="http://repo1.maven.org/maven2/" override="false"/>
   <property name="repo.jboss.org"
-    value="http://repository.jboss.com/maven2/" override="false"/>
+    value="http://repository.jboss.org/nexus/content/groups/public/" override="false"/>
   <property name="repo.sun.org"
     value="http://download.java.net/maven/2/" override="false"/>
   <property name="maven2.pattern"

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1033770&r1=1033769&r2=1033770&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Nov 10 22:36:34 2010
@@ -25,14 +25,12 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.BinaryInputArchive;
@@ -55,7 +53,6 @@ import org.apache.zookeeper.ZooKeeper.Wa
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.proto.AuthPacket;
 import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.ConnectResponse;
 import org.apache.zookeeper.proto.CreateResponse;
 import org.apache.zookeeper.proto.ExistsResponse;
 import org.apache.zookeeper.proto.GetACLResponse;
@@ -85,8 +82,6 @@ public class ClientCnxn {
      * option allows the client to turn off this behavior by setting
      * the environment variable "zookeeper.disableAutoWatchReset" to "true" */
     private static boolean disableAutoWatchReset;
-
-    public static final int packetLen;
     static {
         // this var should not be public, but otw there is no easy way
         // to test
@@ -96,7 +91,6 @@ public class ClientCnxn {
             LOG.debug("zookeeper.disableAutoWatchReset is "
                     + disableAutoWatchReset);
         }
-        packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
     }
 
     private final ArrayList<InetSocketAddress> serverAddrs =
@@ -113,7 +107,7 @@ public class ClientCnxn {
         byte data[];
     }
 
-    private final ArrayList<AuthData> authInfo = new ArrayList<AuthData>();
+    private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
 
     /**
      * These are the packets that have been sent and are waiting for a response.
@@ -129,10 +123,11 @@ public class ClientCnxn {
 
     private int connectTimeout;
 
-    /** The timeout in ms the client negotiated with the server. This is the 
-     *  "real" timeout, not the timeout request by the client (which may
-     *  have been increased/decreased by the server which applies bounds
-     *  to this value.
+    /**
+     * The timeout in ms the client negotiated with the server. This is the
+     * "real" timeout, not the timeout request by the client (which may have
+     * been increased/decreased by the server which applies bounds to this
+     * value.
      */
     private volatile int negotiatedSessionTimeout;
 
@@ -154,15 +149,13 @@ public class ClientCnxn {
 
     final EventThread eventThread;
 
-    final Selector selector = Selector.open();
-
     /**
      * Set to true when close is called. Latches the connection such that we
      * don't attempt to re-connect to the server if in the middle of closing the
      * connection (client sends session disconnect to server as part of close
      * operation)
      */
-    volatile boolean closing = false;
+    private volatile boolean closing = false;
 
     public long getSessionId() {
         return sessionId;
@@ -180,56 +173,22 @@ public class ClientCnxn {
     public String toString() {
         StringBuilder sb = new StringBuilder();
 
-        SocketAddress local = getLocalSocketAddress();
-        SocketAddress remote = getRemoteSocketAddress();
+        SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
+        SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
         sb
             .append("sessionid:0x").append(Long.toHexString(getSessionId()))
             .append(" local:").append(local)
             .append(" remoteserver:").append(remote)
             .append(" lastZxid:").append(lastZxid)
             .append(" xid:").append(xid)
-            .append(" sent:").append(sendThread.sentCount)
-            .append(" recv:").append(sendThread.recvCount)
+            .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
+            .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
             .append(" queuedpkts:").append(outgoingQueue.size())
             .append(" pendingresp:").append(pendingQueue.size())
             .append(" queuedevents:").append(eventThread.waitingEvents.size());
 
         return sb.toString();
     }
-    
-    /**
-     * Returns the address to which the socket is connected.
-     * @return ip address of the remote side of the connection or null if
-     *         not connected
-     */
-    SocketAddress getRemoteSocketAddress() {
-        // a lot could go wrong here, so rather than put in a bunch of code
-        // to check for nulls all down the chain let's do it the simple
-        // yet bulletproof way
-        try {
-            return ((SocketChannel)sendThread.sockKey.channel())
-                .socket().getRemoteSocketAddress();
-        } catch (NullPointerException e) {
-            return null;
-        }
-    }
-
-    /** 
-     * Returns the local address to which the socket is bound.
-     * @return ip address of the remote side of the connection or null if
-     *         not connected
-     */
-    SocketAddress getLocalSocketAddress() {
-        // a lot could go wrong here, so rather than put in a bunch of code
-        // to check for nulls all down the chain let's do it the simple
-        // yet bulletproof way
-        try {
-            return ((SocketChannel)sendThread.sockKey.channel())
-                .socket().getLocalSocketAddress();
-        } catch (NullPointerException e) {
-            return null;
-        }
-    }
 
     /**
      * This class allows us to pass the headers and the relevant records around.
@@ -319,13 +278,14 @@ public class ClientCnxn {
      * @param zooKeeper
      *                the zookeeper object that this connection is related to.
      * @param watcher watcher for this connection
+     * @param clientCnxnSocket
+     *                the socket implementation used (e.g. NIO/Netty)
      * @throws IOException
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
-            ClientWatchManager watcher)
-        throws IOException
-    {
-        this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]);
+            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket)
+            throws IOException {
+        this(hosts, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16]);
     }
 
     /**
@@ -340,14 +300,15 @@ public class ClientCnxn {
      * @param zooKeeper
      *                the zookeeper object that this connection is related to.
      * @param watcher watcher for this connection
+     * @param clientCnxnSocket
+     *                the socket implementation used (e.g. NIO/Netty)
      * @param sessionId session id if re-establishing session
      * @param sessionPasswd session passwd if re-establishing session
      * @throws IOException
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
-            ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
-        throws IOException
-    {
+            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+            long sessionId, byte[] sessionPasswd) throws IOException {
         this.zooKeeper = zooKeeper;
         this.watcher = watcher;
         this.sessionId = sessionId;
@@ -389,7 +350,7 @@ public class ClientCnxn {
         connectTimeout = sessionTimeout / hostsList.length;
         readTimeout = sessionTimeout * 2 / 3;
         Collections.shuffle(serverAddrs);
-        sendThread = new SendThread();
+        sendThread = new SendThread(clientCnxnSocket);
         eventThread = new EventThread();
     }
 
@@ -412,9 +373,10 @@ public class ClientCnxn {
         eventThread.start();
     }
 
-    Object eventOfDeath = new Object();
+    private Object eventOfDeath = new Object();
 
-    final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+    private final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+        @Override
         public void uncaughtException(Thread t, Throwable e) {
             LOG.error("from " + t.getName(), e);
         }
@@ -640,7 +602,7 @@ public class ClientCnxn {
         if (p.replyHeader == null) {
             return;
         }
-        switch(zooKeeper.state) {
+        switch (state) {
         case AUTH_FAILED:
             p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
             break;
@@ -653,15 +615,16 @@ public class ClientCnxn {
         finishPacket(p);
     }
 
-    volatile long lastZxid;
+    private volatile long lastZxid;
 
-    private static class EndOfStreamException extends IOException {
+    static class EndOfStreamException extends IOException {
         private static final long serialVersionUID = -5438877188796231422L;
 
         public EndOfStreamException(String msg) {
             super(msg);
         }
         
+        @Override
         public String toString() {
             return "EndOfStreamException: " + getMessage();
         }
@@ -683,75 +646,21 @@ public class ClientCnxn {
         }
     }
     
+    public static final int packetLen = Integer.getInteger("jute.maxbuffer",
+            4096 * 1024);
+
     /**
      * This class services the outgoing request queue and generates the heart
      * beats. It also spawns the ReadThread.
      */
     class SendThread extends Thread {
-        SelectionKey sockKey;
-
-        final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
-
-        ByteBuffer incomingBuffer = lenBuffer;
-
-        boolean initialized;
-
         private long lastPingSentNs;
+        private final ClientCnxnSocket clientCnxnSocket;
+        private int lastConnectIndex = -1;
+        private int currentConnectIndex;
+        private Random r = new Random(System.nanoTime());
 
-        long sentCount = 0;
-        long recvCount = 0;
-
-        void readLength() throws IOException {
-            int len = incomingBuffer.getInt();
-            if (len < 0 || len >= packetLen) {
-                throw new IOException("Packet len" + len + " is out of range!");
-            }
-            incomingBuffer = ByteBuffer.allocate(len);
-        }
-
-        void readConnectResult() throws IOException {
-            if (LOG.isTraceEnabled()) {
-                StringBuffer buf = new StringBuffer("0x[");
-                for (byte b : incomingBuffer.array()) {
-                    buf.append(Integer.toHexString(b) + ",");
-                }
-                buf.append("]");
-                LOG.trace("readConnectRestult " + incomingBuffer.remaining() 
-                        + " " + buf.toString());
-            }
-            ByteBufferInputStream bbis = new ByteBufferInputStream(
-                    incomingBuffer);
-            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-            ConnectResponse conRsp = new ConnectResponse();
-            conRsp.deserialize(bbia, "connect");
-            negotiatedSessionTimeout = conRsp.getTimeOut();
-            if (negotiatedSessionTimeout <= 0) {
-                zooKeeper.state = States.CLOSED;
-
-                eventThread.queueEvent(new WatchedEvent(
-                        Watcher.Event.EventType.None,
-                        Watcher.Event.KeeperState.Expired, null));
-                eventThread.queueEventOfDeath();
-                throw new SessionExpiredException(
-                        "Unable to reconnect to ZooKeeper service, session 0x"
-                        + Long.toHexString(sessionId) + " has expired");
-            }
-            readTimeout = negotiatedSessionTimeout * 2 / 3;
-            connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
-            sessionId = conRsp.getSessionId();
-            sessionPasswd = conRsp.getPasswd();
-            zooKeeper.state = States.CONNECTED;
-            LOG.info("Session establishment complete on server "
-                    + ((SocketChannel)sockKey.channel())
-                        .socket().getRemoteSocketAddress()
-                    + ", sessionid = 0x"
-                    + Long.toHexString(sessionId)
-                    + ", negotiated timeout = " + negotiatedSessionTimeout);
-            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
-                    Watcher.Event.KeeperState.SyncConnected, null));
-        }
-
-        void readResponse() throws IOException {
+        void readResponse(ByteBuffer incomingBuffer) throws IOException {
             ByteBufferInputStream bbis = new ByteBufferInputStream(
                     incomingBuffer);
             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
@@ -772,7 +681,7 @@ public class ClientCnxn {
             if (replyHdr.getXid() == -4) {
                 // -4 is the xid for AuthPacket               
                 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
-                    zooKeeper.state = States.AUTH_FAILED;                    
+                    state = States.AUTH_FAILED;                    
                     eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                             Watcher.Event.KeeperState.AuthFailed, null) );            		            		
                 }
@@ -809,12 +718,12 @@ public class ClientCnxn {
                 eventThread.queueEvent( we );
                 return;
             }
-            if (pendingQueue.size() == 0) {
-                throw new IOException("Nothing in the queue, but got "
-                        + replyHdr.getXid());
-            }
-            Packet packet = null;
+            Packet packet;
             synchronized (pendingQueue) {
+                if (pendingQueue.size() == 0) {
+                    throw new IOException("Nothing in the queue, but got "
+                            + replyHdr.getXid());
+                }
                 packet = pendingQueue.remove();
             }
             /*
@@ -825,9 +734,13 @@ public class ClientCnxn {
                 if (packet.header.getXid() != replyHdr.getXid()) {
                     packet.replyHeader.setErr(
                             KeeperException.Code.CONNECTIONLOSS.intValue());
-                    throw new IOException("Xid out of order. Got "
-                            + replyHdr.getXid() + " expected "
-                            + packet.header.getXid());
+                    throw new IOException("Xid out of order. Got Xid "
+                            + replyHdr.getXid() + " with err " +
+                            + replyHdr.getErr() +
+                            " expected Xid "
+                            + packet.header.getXid()
+                            + " for a packet with details: "
+                            + packet );
                 }
 
                 packet.replyHeader.setXid(replyHdr.getXid());
@@ -849,113 +762,34 @@ public class ClientCnxn {
             }
         }
 
-        /**
-         * @return true if a packet was received
-         * @throws InterruptedException
-         * @throws IOException
-         */
-        boolean doIO() throws InterruptedException, IOException {
-            boolean packetReceived = false;
-            SocketChannel sock = (SocketChannel) sockKey.channel();
-            if (sock == null) {
-                throw new IOException("Socket is null!");
-            }
-            if (sockKey.isReadable()) {
-                int rc = sock.read(incomingBuffer);
-                if (rc < 0) {
-                    throw new EndOfStreamException(
-                            "Unable to read additional data from server sessionid 0x"
-                            + Long.toHexString(sessionId)
-                            + ", likely server has closed socket");
-                }
-                if (!incomingBuffer.hasRemaining()) {
-                    incomingBuffer.flip();
-                    if (incomingBuffer == lenBuffer) {
-                        recvCount++;
-                        readLength();
-                    } else if (!initialized) {
-                        readConnectResult();
-                        enableRead();
-                        if (!outgoingQueue.isEmpty()) {
-                            enableWrite();
-                        }
-                        lenBuffer.clear();
-                        incomingBuffer = lenBuffer;
-                        packetReceived = true;
-                        initialized = true;
-                    } else {
-                        readResponse();
-                        lenBuffer.clear();
-                        incomingBuffer = lenBuffer;
-                        packetReceived = true;
-                    }
-                }
-            }
-            if (sockKey.isWritable()) {
-                synchronized (outgoingQueue) {
-                    if (!outgoingQueue.isEmpty()) {
-                        ByteBuffer pbb = outgoingQueue.getFirst().bb;
-                        sock.write(pbb);
-                        if (!pbb.hasRemaining()) {
-                            sentCount++;
-                            Packet p = outgoingQueue.removeFirst();
-                            if (p.header != null
-                                    && p.header.getType() != OpCode.ping
-                                    && p.header.getType() != OpCode.auth) {
-                                pendingQueue.add(p);
-                            }
-                        }
-                    }
-                }
-            }
-            if (outgoingQueue.isEmpty()) {
-                disableWrite();
-            } else {
-                enableWrite();
-            }
-            return packetReceived;
-        }
-
-        synchronized private void enableWrite() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_WRITE) == 0) {
-                sockKey.interestOps(i | SelectionKey.OP_WRITE);
-            }
-        }
-
-        synchronized private void disableWrite() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_WRITE) != 0) {
-                sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
-            }
-        }
-
-        synchronized private void enableRead() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_READ) == 0) {
-                sockKey.interestOps(i | SelectionKey.OP_READ);
-            }
+        SendThread(ClientCnxnSocket clientCnxnSocket) {
+            super(makeThreadName("-SendThread()"));
+            state = States.CONNECTING;
+            this.clientCnxnSocket = clientCnxnSocket;
+            setUncaughtExceptionHandler(uncaughtExceptionHandler);
+            setDaemon(true);
         }
 
-        synchronized private void disableRead() {
-            int i = sockKey.interestOps();
-            if ((i & SelectionKey.OP_READ) != 0) {
-                sockKey.interestOps(i & (~SelectionKey.OP_READ));
-            }
+        // TODO: can not name this method getState since Thread.getState()
+        // already exists
+        // It would be cleaner to make class SendThread an implementation of
+        // Runnable
+        /**
+         * Used by ClientCnxnSocket
+         * 
+         * @return
+         */
+        ZooKeeper.States getZkState() {
+            return state;
         }
 
-        SendThread() {
-            super(makeThreadName("-SendThread()"));
-            zooKeeper.state = States.CONNECTING;
-            setUncaughtExceptionHandler(uncaughtExceptionHandler);
-            setDaemon(true);
+        ClientCnxnSocket getClientCnxnSocket() {
+            return clientCnxnSocket;
         }
 
-        private void primeConnection(SelectionKey k) throws IOException {
+        void primeConnection() throws IOException {
             LOG.info("Socket connection established to "
-                    + ((SocketChannel)sockKey.channel())
-                        .socket().getRemoteSocketAddress()
-                    + ", initiating session");
+                    + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session");
             lastConnectIndex = currentConnectIndex;
             ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                     sessionTimeout, sessionId, sessionPasswd);
@@ -970,11 +804,12 @@ public class ClientCnxn {
             synchronized (outgoingQueue) {
                 // We add backwards since we are pushing into the front
                 // Only send if there's a pending watch
-                if (!disableAutoWatchReset &&
-                        (!zooKeeper.getDataWatches().isEmpty()
-                         || !zooKeeper.getExistWatches().isEmpty()
-                         || !zooKeeper.getChildWatches().isEmpty()))
-                {
+                // TODO: here we have the only remaining use of zooKeeper in
+                // this class. It's to be eliminated!
+                if (!disableAutoWatchReset
+                        && (!zooKeeper.getDataWatches().isEmpty()
+                                || !zooKeeper.getExistWatches().isEmpty() || !zooKeeper
+                                .getChildWatches().isEmpty())) {
                     SetWatches sw = new SetWatches(lastZxid,
                             zooKeeper.getDataWatches(),
                             zooKeeper.getExistWatches(),
@@ -995,13 +830,10 @@ public class ClientCnxn {
                 outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
                         null)));
             }
-            synchronized (this) {
-                k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
-            }
+            clientCnxnSocket.enableReadWriteOnly();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Session establishment request sent on "
-                        + ((SocketChannel)sockKey.channel())
-                            .socket().getRemoteSocketAddress());
+                        + clientCnxnSocket.getRemoteSocketAddress());
             }
         }
 
@@ -1011,12 +843,6 @@ public class ClientCnxn {
             queuePacket(h, null, null, null, null, null, null, null, null);
         }
 
-        int lastConnectIndex = -1;
-
-        int currentConnectIndex;
-
-        Random r = new Random(System.nanoTime());
-
         private void startConnect() throws IOException {
             if (lastConnectIndex == -1) {
                 // We don't want to delay the first try at a connect, so we
@@ -1037,7 +863,7 @@ public class ClientCnxn {
                     }
                 }
             }
-            zooKeeper.state = States.CONNECTING;
+            state = States.CONNECTING;
             currentConnectIndex = nextAddrToTry;
             InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
             nextAddrToTry++;
@@ -1045,24 +871,11 @@ public class ClientCnxn {
                 nextAddrToTry = 0;
             }
             LOG.info("Opening socket connection to server " + addr);
-            SocketChannel sock;
-            sock = SocketChannel.open();
-            sock.configureBlocking(false);
-            sock.socket().setSoLinger(false, -1);
-            sock.socket().setTcpNoDelay(true);
+
             setName(getName().replaceAll("\\(.*\\)",
                     "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
-            sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
-            if (sock.connect(addr)) {
-                primeConnection(sockKey);
-            }
-            initialized = false;
 
-            /*
-             * Reset incomingBuffer
-             */
-            lenBuffer.clear();
-            incomingBuffer = lenBuffer;
+            clientCnxnSocket.connect(addr);
         }
 
         private static final String RETRY_CONN_MSG =
@@ -1070,39 +883,38 @@ public class ClientCnxn {
         
         @Override
         public void run() {
-            long now = System.currentTimeMillis();
-            long lastHeard = now;
-            long lastSend = now;
-            while (zooKeeper.state.isAlive()) {
+            clientCnxnSocket.introduce(this,sessionId);
+            clientCnxnSocket.updateNow();
+            clientCnxnSocket.updateLastSendAndHeard();
+            while (state.isAlive()) {
                 try {
-                    if (sockKey == null) {
+                    if (!clientCnxnSocket.isConnected()) {
                         // don't re-establish connection if we are closing
                         if (closing) {
                             break;
                         }
                         startConnect();
-                        lastSend = now;
-                        lastHeard = now;
+                        clientCnxnSocket.updateLastSendAndHeard();
                     }
-                    int idleRecv = (int) (now - lastHeard);
-                    int idleSend = (int) (now - lastSend);
-                    int to = readTimeout - idleRecv;
-                    if (zooKeeper.state != States.CONNECTED) {
-                        to = connectTimeout - idleRecv;
+
+                    int to = readTimeout - clientCnxnSocket.getIdleRecv();
+                    if (state != States.CONNECTED) {
+                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                     }
                     if (to <= 0) {
                         throw new SessionTimeoutException(
                                 "Client session timed out, have not heard from server in "
-                                + idleRecv + "ms"
-                                + " for sessionid 0x"
-                                + Long.toHexString(sessionId));
+                                        + clientCnxnSocket.getIdleRecv() + "ms"
+                                        + " for sessionid 0x"
+                                        + Long.toHexString(sessionId));
                     }
-                    if (zooKeeper.state == States.CONNECTED) {
-                        int timeToNextPing = readTimeout/2 - idleSend;
+                    if (state == States.CONNECTED) {
+                        int timeToNextPing = readTimeout / 2
+                                - clientCnxnSocket.getIdleSend();
                         if (timeToNextPing <= 0) {
                             sendPing();
-                            lastSend = now;
-                            enableWrite();
+                            clientCnxnSocket.updateLastSend();
+                            clientCnxnSocket.enableWrite();
                         } else {
                             if (timeToNextPing < to) {
                                 to = timeToNextPing;
@@ -1110,42 +922,8 @@ public class ClientCnxn {
                         }
                     }
 
-                    selector.select(to);
-                    Set<SelectionKey> selected;
-                    synchronized (this) {
-                        selected = selector.selectedKeys();
-                    }
-                    // Everything below and until we get back to the select is
-                    // non blocking, so time is effectively a constant. That is
-                    // Why we just have to do this once, here
-                    now = System.currentTimeMillis();
-                    for (SelectionKey k : selected) {
-                        SocketChannel sc = ((SocketChannel) k.channel());
-                        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
-                            if (sc.finishConnect()) {
-                                lastHeard = now;
-                                lastSend = now;
-                                primeConnection(k);
-                            }
-                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-                            if (outgoingQueue.size() > 0) {
-                                // We have something to send so it's the same
-                                // as if we do the send now.
-                                lastSend = now;
-                            }
-                            if (doIO()) {
-                                lastHeard = now;
-                            }
-                        }
-                    }
-                    if (zooKeeper.state == States.CONNECTED) {
-                        if (outgoingQueue.size() > 0) {
-                            enableWrite();
-                        } else {
-                            disableWrite();
-                        }
-                    }
-                    selected.clear();
+                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue);
+
                 } catch (Exception e) {
                     if (closing) {
                         if (LOG.isDebugEnabled()) {
@@ -1164,92 +942,38 @@ public class ClientCnxn {
                         } else if (e instanceof EndOfStreamException) {
                             LOG.info(e.getMessage() + RETRY_CONN_MSG);
                         } else {
-                            LOG.warn("Session 0x"
-                                    + Long.toHexString(getSessionId())
-                                    + " for server "
-                                    + ((SocketChannel)sockKey.channel())
-                                        .socket().getRemoteSocketAddress()
-                                    + ", unexpected error"
-                                    + RETRY_CONN_MSG,
-                                    e);
+                            LOG.warn(
+                                    "Session 0x"
+                                            + Long.toHexString(getSessionId())
+                                            + " for server "
+                                            + clientCnxnSocket.getRemoteSocketAddress()
+                                            + ", unexpected error"
+                                            + RETRY_CONN_MSG, e);
                         }
                         cleanup();
-                        if (zooKeeper.state.isAlive()) {
+                        if (state.isAlive()) {
                             eventThread.queueEvent(new WatchedEvent(
                                     Event.EventType.None,
                                     Event.KeeperState.Disconnected,
                                     null));
                         }
-
-                        now = System.currentTimeMillis();
-                        lastHeard = now;
-                        lastSend = now;
+                        clientCnxnSocket.updateNow();
+                        clientCnxnSocket.updateLastSendAndHeard();
                     }
                 }
             }
             cleanup();
-            try {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Doing client selector close");
-                }
-                selector.close();
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Closed client selector");
-                }
-            } catch (IOException e) {
-                LOG.warn("Ignoring exception during selector close", e);
-            }
-            if (zooKeeper.state.isAlive()) {
-                eventThread.queueEvent(new WatchedEvent(
-                        Event.EventType.None,
-                        Event.KeeperState.Disconnected,
-                        null));
+            clientCnxnSocket.close();
+            if (state.isAlive()) {
+                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
+                        Event.KeeperState.Disconnected, null));
             }
             ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                                      "SendThread exitedloop.");
         }
 
         private void cleanup() {
-            if (sockKey != null) {
-                SocketChannel sock = (SocketChannel) sockKey.channel();
-                sockKey.cancel();
-                try {
-                    sock.socket().shutdownInput();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during shutdown input", e);
-                    }
-                }
-                try {
-                    sock.socket().shutdownOutput();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during shutdown output", e);
-                    }
-                }
-                try {
-                    sock.socket().close();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during socket close", e);
-                    }
-                }
-                try {
-                    sock.close();
-                } catch (IOException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ignoring exception during channel close", e);
-                    }
-                }
-            }
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("SendThread interrupted during sleep, ignoring");
-                }
-            }
-            sockKey = null;
+            clientCnxnSocket.cleanup();
             synchronized (pendingQueue) {
                 for (Packet p : pendingQueue) {
                     conLossPacket(p);
@@ -1264,11 +988,50 @@ public class ClientCnxn {
             }
         }
 
-        public void close() {
-            synchronized (this) {
-               zooKeeper.state = States.CLOSED;
-               selector.wakeup();
+        /**
+         * Callback invoked by the ClientCnxnSocket once a connection has been
+         * established.
+         * 
+         * @param _negotiatedSessionTimeout
+         * @param _sessionId
+         * @param _sessionPasswd
+         * @throws IOException
+         */
+        void onConnected(int _negotiatedSessionTimeout, long _sessionId,
+                byte[] _sessionPasswd) throws IOException {
+            negotiatedSessionTimeout = _negotiatedSessionTimeout;
+            if (negotiatedSessionTimeout <= 0) {
+                state = States.CLOSED;
+
+                eventThread.queueEvent(new WatchedEvent(
+                        Watcher.Event.EventType.None,
+                        Watcher.Event.KeeperState.Expired, null));
+                eventThread.queueEventOfDeath();
+                throw new SessionExpiredException(
+                        "Unable to reconnect to ZooKeeper service, session 0x"
+                                + Long.toHexString(sessionId) + " has expired");
             }
+            readTimeout = negotiatedSessionTimeout * 2 / 3;
+            connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
+            sessionId = _sessionId;
+            sessionPasswd = _sessionPasswd;
+            state = States.CONNECTED;
+            LOG.info("Session establishment complete on server "
+                    + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x"
+                    + Long.toHexString(sessionId) + ", negotiated timeout = "
+                    + negotiatedSessionTimeout);
+            eventThread.queueEvent(new WatchedEvent(
+                    Watcher.Event.EventType.None,
+                    Watcher.Event.KeeperState.SyncConnected, null));
+        }
+
+        void close() {
+            state = States.CLOSED;
+            clientCnxnSocket.wakeupCnxn();
+        }
+
+        void testableCloseSocket() throws IOException {
+            clientCnxnSocket.testableCloseSocket();
         }
     }
 
@@ -1314,6 +1077,8 @@ public class ClientCnxn {
 
     private int xid = 1;
 
+    private volatile States state;
+
     synchronized private int getXid() {
         return xid++;
     }
@@ -1347,7 +1112,7 @@ public class ClientCnxn {
             packet.ctx = ctx;
             packet.clientPath = clientPath;
             packet.serverPath = serverPath;
-            if (!zooKeeper.state.isAlive() || closing) {
+            if (!state.isAlive() || closing) {
                 conLossPacket(packet);
             } else {
                 // If the client is asking to close the session then
@@ -1358,14 +1123,12 @@ public class ClientCnxn {
                 outgoingQueue.add(packet);
             }
         }
-        synchronized (sendThread) {
-            selector.wakeup();
-        }
+        sendThread.getClientCnxnSocket().wakeupCnxn();
         return packet;
     }
 
     public void addAuthInfo(String scheme, byte auth[]) {
-        if (!zooKeeper.state.isAlive()) {
+        if (!state.isAlive()) {
             return;
         }
         authInfo.add(new AuthData(scheme, auth));
@@ -1373,4 +1136,8 @@ public class ClientCnxn {
                 new AuthPacket(0, scheme, auth), null, null, null, null,
                 null, null);
     }
+
+    States getState() {
+        return state;
+    }
 }

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java?rev=1033770&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java Wed Nov 10 22:36:34 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+
+/**
+ * A ClientCnxnSocket does the lower level communication with a socket
+ * implementation.
+ * 
+ * This code has been moved out of ClientCnxn so that a Netty implementation can
+ * be provided as an alternative to the NIO socket code.
+ * 
+ */
+abstract class ClientCnxnSocket {
+    private static final Logger LOG = Logger.getLogger(ClientCnxnSocket.class);
+
+    protected boolean initialized;
+
+    /**
+     * This buffer is only used to read the length of the incoming message.
+     */
+    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
+
+    /**
+     * After the length is read, a new incomingBuffer is allocated in
+     * readLength() to receive the full message.
+     */
+    protected ByteBuffer incomingBuffer = lenBuffer;
+    protected long sentCount = 0;
+    protected long recvCount = 0;
+    protected long lastHeard;
+    protected long lastSend;
+    protected long now;
+    protected ClientCnxn.SendThread sendThread;
+
+    protected long sessionId;
+
+    void introduce(ClientCnxn.SendThread sendThread, long sessionId) {
+        this.sendThread = sendThread;
+        this.sessionId = sessionId;
+    }
+
+    void updateNow() {
+        now = System.currentTimeMillis();
+    }
+
+    int getIdleRecv() {
+        return (int) (now - lastHeard);
+    }
+
+    int getIdleSend() {
+        return (int) (now - lastSend);
+    }
+
+    long getSentCount() {
+        return sentCount;
+    }
+
+    long getRecvCount() {
+        return recvCount;
+    }
+
+    void updateLastHeard() {
+        this.lastHeard = now;
+    }
+
+    void updateLastSend() {
+        this.lastSend = now;
+    }
+
+    void updateLastSendAndHeard() {
+        this.lastSend = now;
+        this.lastHeard = now;
+    }
+
+    protected void readLength() throws IOException {
+        int len = incomingBuffer.getInt();
+        if (len < 0 || len >= ClientCnxn.packetLen) {
+            throw new IOException("Packet len" + len + " is out of range!");
+        }
+        incomingBuffer = ByteBuffer.allocate(len);
+    }
+
+    void readConnectResult() throws IOException {
+        if (LOG.isTraceEnabled()) {
+            StringBuffer buf = new StringBuffer("0x[");
+            for (byte b : incomingBuffer.array()) {
+                buf.append(Integer.toHexString(b) + ",");
+            }
+            buf.append("]");
+            LOG.trace("readConnectRestult " + incomingBuffer.remaining() + " "
+                    + buf.toString());
+        }
+        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
+        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
+        ConnectResponse conRsp = new ConnectResponse();
+        conRsp.deserialize(bbia, "connect");
+        this.sessionId = conRsp.getSessionId();
+        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
+                conRsp.getPasswd());
+    }
+
+    abstract boolean isConnected();
+
+    abstract void connect(InetSocketAddress addr) throws IOException;
+
+    abstract SocketAddress getRemoteSocketAddress();
+
+    abstract SocketAddress getLocalSocketAddress();
+
+    abstract void cleanup();
+
+    abstract void close();
+
+    abstract void wakeupCnxn();
+
+    abstract void enableWrite();
+
+    abstract void enableReadWriteOnly();
+
+    abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
+            LinkedList<Packet> outgoingQueue) throws IOException,
+            InterruptedException;
+
+    abstract void testableCloseSocket() throws IOException;
+}

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1033770&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Wed Nov 10 22:36:34 2010
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public class ClientCnxnSocketNIO extends ClientCnxnSocket {
+    private static final Logger LOG = Logger
+            .getLogger(ClientCnxnSocketNIO.class);
+
+    private final Selector selector = Selector.open();
+
+    private SelectionKey sockKey;
+
+    ClientCnxnSocketNIO() throws IOException {
+        super();
+    }
+
+    @Override
+    boolean isConnected() {
+        return sockKey != null;
+    }
+    
+    /**
+     * @return true if a packet was received
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    boolean doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
+        boolean packetReceived = false;
+        SocketChannel sock = (SocketChannel) sockKey.channel();
+        if (sock == null) {
+            throw new IOException("Socket is null!");
+        }
+        if (sockKey.isReadable()) {
+            int rc = sock.read(incomingBuffer);
+            if (rc < 0) {
+                throw new EndOfStreamException(
+                        "Unable to read additional data from server sessionid 0x"
+                                + Long.toHexString(sessionId)
+                                + ", likely server has closed socket");
+            }
+            if (!incomingBuffer.hasRemaining()) {
+                incomingBuffer.flip();
+                if (incomingBuffer == lenBuffer) {
+                    recvCount++;
+                    readLength();
+                } else if (!initialized) {
+                    readConnectResult();
+                    enableRead();
+                    if (!outgoingQueue.isEmpty()) {
+                        enableWrite();
+                    }
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+                    packetReceived = true;
+                    initialized = true;
+                } else {
+                    sendThread.readResponse(incomingBuffer);
+                    lenBuffer.clear();
+                    incomingBuffer = lenBuffer;
+                    packetReceived = true;
+                }
+            }
+        }
+        if (sockKey.isWritable()) {
+            synchronized (outgoingQueue) {
+                if (!outgoingQueue.isEmpty()) {
+                    ByteBuffer pbb = outgoingQueue.getFirst().bb;
+                    sock.write(pbb);
+                    if (!pbb.hasRemaining()) {
+                        sentCount++;
+                        Packet p = outgoingQueue.removeFirst();
+                        if (p.header != null
+                                && p.header.getType() != OpCode.ping
+                                && p.header.getType() != OpCode.auth) {
+                            pendingQueue.add(p);
+                        }
+                    }
+                }
+            }
+        }
+        if (outgoingQueue.isEmpty()) {
+            disableWrite();
+        } else {
+            enableWrite();
+        }
+        return packetReceived;
+    }
+
+    @Override
+    void cleanup() {
+        if (sockKey != null) {
+            SocketChannel sock = (SocketChannel) sockKey.channel();
+            sockKey.cancel();
+            try {
+                sock.socket().shutdownInput();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during shutdown input", e);
+                }
+            }
+            try {
+                sock.socket().shutdownOutput();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during shutdown output",
+                            e);
+                }
+            }
+            try {
+                sock.socket().close();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during socket close", e);
+                }
+            }
+            try {
+                sock.close();
+            } catch (IOException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignoring exception during channel close", e);
+                }
+            }
+        }
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("SendThread interrupted during sleep, ignoring");
+            }
+        }
+        sockKey = null;
+    }
+ 
+    @Override
+    void close() {
+        try {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Doing client selector close");
+            }
+            selector.close();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Closed client selector");
+            }
+        } catch (IOException e) {
+            LOG.warn("Ignoring exception during selector close", e);
+        }
+    }
+
+    @Override
+    void connect(InetSocketAddress addr) throws IOException {
+        SocketChannel sock;
+        sock = SocketChannel.open();
+        sock.configureBlocking(false);
+        sock.socket().setSoLinger(false, -1);
+        sock.socket().setTcpNoDelay(true);
+        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
+        if (sock.connect(addr)) {
+            sendThread.primeConnection();
+        }
+        initialized = false;
+
+        /*
+         * Reset incomingBuffer
+         */
+        lenBuffer.clear();
+        incomingBuffer = lenBuffer;
+    }
+
+    /**
+     * Returns the address to which the socket is connected.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getRemoteSocketAddress() {
+        // a lot could go wrong here, so rather than put in a bunch of code
+        // to check for nulls all down the chain let's do it the simple
+        // yet bulletproof way
+        try {
+            return ((SocketChannel) sockKey.channel()).socket()
+                    .getRemoteSocketAddress();
+        } catch (NullPointerException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the local address to which the socket is bound.
+     * 
+     * @return ip address of the remote side of the connection or null if not
+     *         connected
+     */
+    @Override
+    SocketAddress getLocalSocketAddress() {
+        // a lot could go wrong here, so rather than put in a bunch of code
+        // to check for nulls all down the chain let's do it the simple
+        // yet bulletproof way
+        try {
+            return ((SocketChannel) sockKey.channel()).socket()
+                    .getLocalSocketAddress();
+        } catch (NullPointerException e) {
+            return null;
+        }
+    }
+
+    @Override
+    synchronized void wakeupCnxn() {
+        selector.wakeup();
+    }
+    
+    @Override
+    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue )
+            throws IOException, InterruptedException {
+        selector.select(waitTimeOut);
+        Set<SelectionKey> selected;
+        synchronized (this) {
+            selected = selector.selectedKeys();
+        }
+        // Everything below and until we get back to the select is
+        // non blocking, so time is effectively a constant. That is
+        // Why we just have to do this once, here
+        updateNow();
+        for (SelectionKey k : selected) {
+            SocketChannel sc = ((SocketChannel) k.channel());
+            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
+                if (sc.finishConnect()) {
+                    updateLastSendAndHeard();
+                    sendThread.primeConnection();
+                }
+            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+                if (outgoingQueue.size() > 0) {
+                    // We have something to send so it's the same
+                    // as if we do the send now.
+                    updateLastSend();
+                }
+                if (doIO(pendingQueue, outgoingQueue)) {
+                    updateLastHeard();
+                }
+            }
+        }
+        if (sendThread.getZkState() == States.CONNECTED) {
+            if (outgoingQueue.size() > 0) {
+                enableWrite();
+            } else {
+                disableWrite();
+            }
+        }
+        selected.clear();
+    }
+
+    //TODO should this be synchronized?
+    @Override
+    void testableCloseSocket() throws IOException {
+        LOG.info("testableCloseSocket() called");
+        ((SocketChannel) sockKey.channel()).socket().close();
+    }
+
+    @Override
+    synchronized void enableWrite() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_WRITE) == 0) {
+            sockKey.interestOps(i | SelectionKey.OP_WRITE);
+        }
+    }
+
+    private synchronized void disableWrite() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_WRITE) != 0) {
+            sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
+        }
+    }
+
+    synchronized private void enableRead() {
+        int i = sockKey.interestOps();
+        if ((i & SelectionKey.OP_READ) == 0) {
+            sockKey.interestOps(i | SelectionKey.OP_READ);
+        }
+    }
+
+    @Override
+    synchronized void enableReadWriteOnly() {
+        sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+    }
+}
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1033770&r1=1033769&r2=1033770&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Nov 10 22:36:34 2010
@@ -105,6 +105,7 @@ import org.apache.zookeeper.server.DataT
  */
 public class ZooKeeper {
     private static final Logger LOG;
+    public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
 
     static {
         LOG = Logger.getLogger(ZooKeeper.class);
@@ -154,6 +155,7 @@ public class ZooKeeper {
         /* (non-Javadoc)
          * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
          */
+        @Override
         public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                         Watcher.Event.EventType type,
                                         String clientPath)
@@ -322,8 +324,6 @@ public class ZooKeeper {
         }
     }
 
-    volatile States state;
-
     protected final ClientCnxn cnxn;
 
     /**
@@ -376,7 +376,8 @@ public class ZooKeeper {
                 + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
 
         watchManager.defaultWatcher = watcher;
-        cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager);
+        cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+                watchManager, getClientCnxnSocket());
         cnxn.start();
     }
 
@@ -443,8 +444,8 @@ public class ZooKeeper {
                 + (sessionPasswd == null ? "<null>" : "<hidden>"));
 
         watchManager.defaultWatcher = watcher;
-        cnxn = new ClientCnxn(connectString, sessionTimeout, this, watchManager,
-                sessionId, sessionPasswd);
+        cnxn = new ClientCnxn(connectString, sessionTimeout, this,
+                watchManager, getClientCnxnSocket(), sessionId, sessionPasswd);
         cnxn.start();
     }
 
@@ -518,7 +519,7 @@ public class ZooKeeper {
      * @throws InterruptedException
      */
     public synchronized void close() throws InterruptedException {
-        if (!state.isAlive()) {
+        if (!cnxn.getState().isAlive()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Close called on already closed client");
             }
@@ -1557,7 +1558,7 @@ public class ZooKeeper {
     }
 
     public States getState() {
-        return state;
+        return cnxn.getState();
     }
 
     /**
@@ -1617,7 +1618,7 @@ public class ZooKeeper {
      *         not connected
      */
     protected SocketAddress testableRemoteSocketAddress() {
-        return cnxn.getRemoteSocketAddress();
+        return cnxn.sendThread.getClientCnxnSocket().getRemoteSocketAddress();
     }
 
     /** 
@@ -1630,6 +1631,23 @@ public class ZooKeeper {
      *         not connected
      */
     protected SocketAddress testableLocalSocketAddress() {
-        return cnxn.getLocalSocketAddress();
+        return cnxn.sendThread.getClientCnxnSocket().getLocalSocketAddress();
+    }
+
+    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
+        String clientCnxnSocketName = System
+                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        if (clientCnxnSocketName == null) {
+            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
+        }
+        try {
+            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
+                    .newInstance();
+        } catch (Exception e) {
+            IOException ioe = new IOException("Couldn't instantiate "
+                    + clientCnxnSocketName);
+            ioe.initCause(e);
+            throw ioe;
+        }
     }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=1033770&r1=1033769&r2=1033770&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Wed Nov 10 22:36:34 2010
@@ -653,7 +653,7 @@ public class ZooKeeperMain {
         } 
         
         // Below commands all need a live connection
-        if (zk == null || !zk.state.isAlive()) {
+        if (zk == null || !zk.getState().isAlive()) {
             System.out.println("Not connected");
             return false;
         }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=1033770&r1=1033769&r2=1033770&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Wed Nov 10 22:36:34 2010
@@ -33,8 +33,8 @@ import org.jboss.netty.bootstrap.ServerB
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandler.Sharable;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
@@ -61,7 +61,7 @@ public class NettyServerCnxnFactory exte
      * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
      * this class gets access to the member variables and methods.
      */
-    @ChannelPipelineCoverage("all")
+    @Sharable
     class CnxnChannelHandler extends SimpleChannelHandler {
 
         @Override

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java?rev=1033770&r1=1033769&r2=1033770&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java Wed Nov 10 22:36:34 2010
@@ -59,7 +59,7 @@ public class TestableZooKeeper extends Z
                 synchronized(cnxn) {
                     try {
                         try {
-                            ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+                            cnxn.sendThread.testableCloseSocket();
                         } catch (IOException e) {
                             e.printStackTrace();
                         }