You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2010/08/18 08:24:09 UTC

svn commit: r986575 [3/4] - in /hadoop/zookeeper/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/libtest/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/auth/ src/ja...

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Wed Aug 18 06:24:08 2010
@@ -18,8 +18,10 @@
 
 package org.apache.zookeeper.server;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -31,18 +33,27 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.Environment;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.StatPersisted;
 import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.proto.AuthPacket;
+import org.apache.zookeeper.proto.ConnectRequest;
+import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
 import org.apache.zookeeper.server.SessionTracker.Session;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.auth.AuthenticationProvider;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
 /**
@@ -62,16 +73,7 @@ public class ZooKeeperServer implements 
     protected ZooKeeperServerBean jmxServerBean;
     protected DataTreeBean jmxDataTreeBean;
 
-    /**
-     * Create an instance of ZooKeeper server
-     */
-    static public interface Factory {
-        public ZooKeeperServer createServer() throws IOException;
-
-        public NIOServerCnxn.Factory createConnectionFactory()
-                throws IOException;
-    }
-
+ 
     /**
      * The server delegates loading of the tree to an instance of the interface
      */
@@ -112,7 +114,7 @@ public class ZooKeeperServer implements 
     final HashMap<String, ChangeRecord> outstandingChangesForPath =
         new HashMap<String, ChangeRecord>();
     
-    private NIOServerCnxn.Factory serverCnxnFactory;
+    private ServerCnxnFactory serverCnxnFactory;
 
     private final ServerStats serverStats;
 
@@ -182,7 +184,7 @@ public class ZooKeeperServer implements 
         pwriter.print("tickTime=");
         pwriter.println(getTickTime());
         pwriter.print("maxClientCnxns=");
-        pwriter.println(serverCnxnFactory.getMaxClientCnxns());
+        pwriter.println(serverCnxnFactory.getMaxClientCnxnsPerHost());
         pwriter.print("minSessionTimeout=");
         pwriter.println(getMinSessionTimeout());
         pwriter.print("maxSessionTimeout=");
@@ -240,7 +242,7 @@ public class ZooKeeperServer implements 
         setZxid(zkDb.loadDataBase());
         // Clean up dead sessions
         LinkedList<Long> deadSessions = new LinkedList<Long>();
-        for (long session : zkDb.getSessions()) {
+        for (Long session : zkDb.getSessions()) {
             sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
             if (sessionsWithTimeouts.get(session) == null) {
                 deadSessions.add(session);
@@ -400,6 +402,8 @@ public class ZooKeeperServer implements 
     }
 
     public void shutdown() {
+        LOG.info("shutting down");
+
         // new RuntimeException("Calling shutdown").printStackTrace();
         this.running = false;
         // Since sessionTracker and syncThreads poll we just have to
@@ -497,8 +501,7 @@ public class ZooKeeperServer implements 
                 && Arrays.equals(passwd, generatePasswd(sessionId));
     }
 
-    long createSession(ServerCnxn cnxn, byte passwd[], int timeout)
-            throws InterruptedException {
+    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
         long sessionId = sessionTracker.createSession(timeout);
         Random r = new Random(sessionId ^ superSecret);
         r.nextBytes(passwd);
@@ -520,25 +523,71 @@ public class ZooKeeperServer implements 
     }
 
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
-            int sessionTimeout) throws IOException, InterruptedException {
+            int sessionTimeout) throws IOException {
         boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                                      "Session 0x" + Long.toHexString(sessionId) +
                     " is valid: " + rc);
         }
-        cnxn.finishSessionInit(rc);
+        finishSessionInit(cnxn, rc);
     }
 
     public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
-            int sessionTimeout) throws IOException, InterruptedException {
+            int sessionTimeout) throws IOException {
         if (!checkPasswd(sessionId, passwd)) {
-            cnxn.finishSessionInit(false);
+            finishSessionInit(cnxn, false);
         } else {
             revalidateSession(cnxn, sessionId, sessionTimeout);
         }
     }
 
+    public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
+        // register with JMX
+        try {
+            if (valid) {
+                serverCnxnFactory.registerConnection(cnxn);
+            }
+        } catch (Exception e) {
+                LOG.warn("Failed to register with JMX", e);
+        }
+
+        try {
+            ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
+                    : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
+                            // longer valid
+                            valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
+            bos.writeInt(-1, "len");
+            rsp.serialize(bos, "connect");
+            baos.close();
+            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+            bb.putInt(bb.remaining() - 4).rewind();
+            cnxn.sendBuffer(bb);    
+
+            if (!valid) {
+                LOG.info("Invalid session 0x"
+                        + Long.toHexString(cnxn.getSessionId())
+                        + " for client "
+                        + cnxn.getRemoteSocketAddress()
+                        + ", probably expired");
+                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
+            } else {
+                LOG.info("Established session 0x"
+                        + Long.toHexString(cnxn.getSessionId())
+                        + " with negotiated timeout " + cnxn.getSessionTimeout()
+                        + " for client "
+                        + cnxn.getRemoteSocketAddress());
+            }
+                
+            cnxn.enableRecv();
+        } catch (Exception e) {
+            LOG.warn("Exception while establishing session, closing", e);
+            cnxn.close();
+        }
+    }
+    
     public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
         closeSession(cnxn.getSessionId());
     }
@@ -620,11 +669,11 @@ public class ZooKeeperServer implements 
         return limit;
     }
 
-    public void setServerCnxnFactory(NIOServerCnxn.Factory factory) {
+    public void setServerCnxnFactory(ServerCnxnFactory factory) {
         serverCnxnFactory = factory;
     }
 
-    public NIOServerCnxn.Factory getServerCnxnFactory() {
+    public ServerCnxnFactory getServerCnxnFactory() {
         return serverCnxnFactory;
     }
 
@@ -684,7 +733,7 @@ public class ZooKeeperServer implements 
     }
 
     public int getClientPort() {
-        return serverCnxnFactory != null ? serverCnxnFactory.ss.socket().getLocalPort() : -1;
+        return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1;
     }
 
     public void setTxnLogFactory(FileTxnSnapLog txnLog) {
@@ -703,4 +752,115 @@ public class ZooKeeperServer implements 
     	zkDb.dumpEphemerals(pwriter);
     }
     
+    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
+        ConnectRequest connReq = new ConnectRequest();
+        connReq.deserialize(bia, "connect");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Session establishment request from client "
+                    + cnxn.getRemoteSocketAddress()
+                    + " client's lastZxid is 0x"
+                    + Long.toHexString(connReq.getLastZxidSeen()));
+        }
+        if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
+            String msg = "Refusing session request for client "
+                + cnxn.getRemoteSocketAddress()
+                + " as it has seen zxid 0x"
+                + Long.toHexString(connReq.getLastZxidSeen())
+                + " our last zxid is 0x"
+                + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+                + " client must try another server";
+
+            LOG.info(msg);
+            throw new CloseRequestException(msg);
+        }
+        int sessionTimeout = connReq.getTimeOut();
+        byte passwd[] = connReq.getPasswd();
+        int minSessionTimeout = getMinSessionTimeout();
+        if (sessionTimeout < minSessionTimeout) {
+            sessionTimeout = minSessionTimeout;
+        }
+        int maxSessionTimeout = getMaxSessionTimeout();
+        if (sessionTimeout > maxSessionTimeout) {
+            sessionTimeout = maxSessionTimeout;
+        }
+        cnxn.setSessionTimeout(sessionTimeout);
+        // We don't want to receive any packets until we are sure that the
+        // session is setup
+        cnxn.disableRecv();
+        long sessionId = connReq.getSessionId();
+        if (sessionId != 0) {
+            long clientSessionId = connReq.getSessionId();
+            LOG.info("Client attempting to renew session 0x"
+                    + Long.toHexString(clientSessionId)
+                    + " at " + cnxn.getRemoteSocketAddress());
+            serverCnxnFactory.closeSession(sessionId);
+            cnxn.setSessionId(sessionId);
+            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
+        } else {
+            LOG.info("Client attempting to establish new session at "
+                    + cnxn.getRemoteSocketAddress());
+            createSession(cnxn, passwd, sessionTimeout);
+        }
+    }
+
+    public boolean shouldThrottle(long outStandingCount) {
+        if (getGlobalOutstandingLimit() < getInProcess()) {
+            return outStandingCount > 0;
+        }
+        return false; 
+    }
+
+    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
+        // We have the request, now process and setup for next
+        InputStream bais = new ByteBufferInputStream(incomingBuffer);
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+        RequestHeader h = new RequestHeader();
+        h.deserialize(bia, "header");
+        // Through the magic of byte buffers, txn will not be
+        // pointing
+        // to the start of the txn
+        incomingBuffer = incomingBuffer.slice();
+        if (h.getType() == OpCode.auth) {
+            LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
+            AuthPacket authPacket = new AuthPacket();
+            ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
+            String scheme = authPacket.getScheme();
+            AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
+            if (ap == null
+                    || (ap.handleAuthentication(cnxn, authPacket.getAuth())
+                            != KeeperException.Code.OK)) {
+                if (ap == null) {
+                    LOG.warn("No authentication provider for scheme: "
+                            + scheme + " has "
+                            + ProviderRegistry.listProviders());
+                } else {
+                    LOG.warn("Authentication failed for scheme: " + scheme);
+                }
+                // send a response...
+                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
+                        KeeperException.Code.AUTHFAILED.intValue());
+                cnxn.sendResponse(rh, null, null);
+                // ... and close connection
+                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
+                cnxn.disableRecv();
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Authentication succeeded for scheme: "
+                              + scheme);
+                }
+                LOG.info("auth success " + cnxn.getRemoteSocketAddress());
+                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
+                        KeeperException.Code.OK.intValue());
+                cnxn.sendResponse(rh, null, null);
+            }
+            return;
+        } else {
+            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
+                    h.getType(), incomingBuffer, cnxn.getAuthInfo());
+            si.setOwner(ServerCnxn.me);
+            submitRequest(si);
+        }
+        cnxn.incrOutstandingRequests(h);
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java Wed Aug 18 06:24:08 2010
@@ -90,16 +90,16 @@ public class ZooKeeperServerBean impleme
     }
 
     public int getMaxClientCnxnsPerHost() {
-        NIOServerCnxn.Factory fac = zks.getServerCnxnFactory();
+        ServerCnxnFactory fac = zks.getServerCnxnFactory();
         if (fac == null) {
             return -1;
         }
-        return fac.getMaxClientCnxns();
+        return fac.getMaxClientCnxnsPerHost();
     }
 
     public void setMaxClientCnxnsPerHost(int max) {
         // if fac is null the exception will be propagated to the client
-        zks.getServerCnxnFactory().maxClientCnxns = max;
+        zks.getServerCnxnFactory().setMaxClientCnxnsPerHost(max);
     }
 
     public int getMinSessionTimeout() {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Wed Aug 18 06:24:08 2010
@@ -38,7 +38,7 @@ public class ZooKeeperServerMain {
     private static final String USAGE =
         "Usage: ZooKeeperServerMain configfile | port datadir [ticktime] [maxcnxns]";
 
-    private NIOServerCnxn.Factory cnxnFactory;
+    private ServerCnxnFactory cnxnFactory;
 
     /*
      * Start up the ZooKeeper server.
@@ -105,7 +105,8 @@ public class ZooKeeperServerMain {
             zkServer.setTickTime(config.tickTime);
             zkServer.setMinSessionTimeout(config.minSessionTimeout);
             zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
-            cnxnFactory = new NIOServerCnxn.Factory(config.getClientPortAddress(),
+            cnxnFactory = ServerCnxnFactory.createFactory();
+            cnxnFactory.configure(config.getClientPortAddress(),
                     config.getMaxClientCnxns());
             cnxnFactory.startup(zkServer);
             cnxnFactory.join();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java Wed Aug 18 06:24:08 2010
@@ -102,9 +102,9 @@ public class DigestAuthenticationProvide
         try {
             String digest = generateDigest(id);
             if (digest.equals(superDigest)) {
-                cnxn.getAuthInfo().add(new Id("super", ""));
+                cnxn.addAuthInfo(new Id("super", ""));
             }
-            cnxn.getAuthInfo().add(new Id(getScheme(), digest));
+            cnxn.addAuthInfo(new Id(getScheme(), digest));
             return KeeperException.Code.OK;
         } catch (NoSuchAlgorithmException e) {
             LOG.error("Missing algorithm",e);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java Wed Aug 18 06:24:08 2010
@@ -18,9 +18,9 @@
 
 package org.apache.zookeeper.server.auth;
 
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.KeeperException;
 
 public class IPAuthenticationProvider implements AuthenticationProvider {
 
@@ -31,8 +31,8 @@ public class IPAuthenticationProvider im
     public KeeperException.Code
         handleAuthentication(ServerCnxn cnxn, byte[] authData)
     {
-        String id = cnxn.getRemoteAddress().getAddress().getHostAddress();
-        cnxn.getAuthInfo().add(new Id(getScheme(), id));
+        String id = cnxn.getRemoteSocketAddress().getAddress().getHostAddress();
+        cnxn.addAuthInfo(new Id(getScheme(), id));
         return KeeperException.Code.OK;
     }
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Wed Aug 18 06:24:08 2010
@@ -176,6 +176,7 @@ public class CommitProcessor extends Thr
     }
 
     public void shutdown() {
+        LOG.info("Shutting down");
         synchronized (this) {
             finished = true;
             queuedRequests.clear();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Wed Aug 18 06:24:08 2010
@@ -383,14 +383,14 @@ public class FastLeaderElection implemen
             this.ws = new WorkerSender(manager);
 
             Thread t = new Thread(this.ws,
-                    "WorkerSender Thread");
+                    "WorkerSender(" + Thread.currentThread().getName() + ")");
             t.setDaemon(true);
             t.start();
 
             this.wr = new WorkerReceiver(manager);
 
             t = new Thread(this.wr,
-                                    "WorkerReceiver Thread");
+                    "WorkerReceiver(" + Thread.currentThread().getName() + ")");
             t.setDaemon(true);
             t.start();
         }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Wed Aug 18 06:24:08 2010
@@ -59,8 +59,8 @@ public class Follower extends Learner{
      */
     void followLeader() throws InterruptedException {
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
-        try {            
-            InetSocketAddress addr = findLeader();            
+        try {
+            InetSocketAddress addr = findLeader();
             try {
                 connectToLeader(addr);
                 long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO);
@@ -76,8 +76,8 @@ public class Follower extends Learner{
                 QuorumPacket qp = new QuorumPacket();
                 while (self.isRunning()) {
                     readPacket(qp);
-                    processPacket(qp);                   
-                }                              
+                    processPacket(qp);
+                }
             } catch (IOException e) {
                 LOG.warn("Exception when following the leader", e);
                 try {
@@ -137,7 +137,6 @@ public class Follower extends Learner{
         }
     }
 
-
     /**
      * The zxid of the last operation seen
      * @return zxid

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -100,6 +100,7 @@ public class FollowerRequestProcessor ex
     }
 
     public void shutdown() {
+        LOG.info("Shutting down");
         finished = true;
         queuedRequests.clear();
         queuedRequests.add(Request.requestOfDeath);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Wed Aug 18 06:24:08 2010
@@ -80,7 +80,7 @@ public class FollowerZooKeeperServer ext
                 new SendAckRequestProcessor((Learner)getFollower()));
         syncProcessor.start();
     }
-    
+
     LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
 
     public void logRequest(TxnHeader hdr, Record txn) {
@@ -135,6 +135,7 @@ public class FollowerZooKeeperServer ext
     
     @Override
     public void shutdown() {
+        LOG.info("Shutting down");
         try {
             super.shutdown();
         } catch (Exception e) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Wed Aug 18 06:24:08 2010
@@ -383,6 +383,8 @@ public class Leader {
      * Close down all the LearnerHandlers
      */
     void shutdown(String reason) {
+        LOG.info("Shutting down");
+
         if (isShutdown) {
             return;
         }
@@ -402,7 +404,7 @@ public class Leader {
             LOG.warn("Ignoring unexpected exception during close",e);
         }
         // clear all the connections
-        self.cnxnFactory.clear();
+        self.cnxnFactory.closeAll();
         // shutdown the previous zk
         if (zk != null) {
             zk.shutdown();
@@ -551,6 +553,7 @@ public class Leader {
          * @see org.apache.zookeeper.server.RequestProcessor#shutdown()
          */
         public void shutdown() {
+            LOG.info("Shutting down");
             next.shutdown();
         }
     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Wed Aug 18 06:24:08 2010
@@ -163,7 +163,7 @@ public class LeaderZooKeeperServer exten
     
     @Override
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
-        int sessionTimeout) throws IOException, InterruptedException {
+        int sessionTimeout) throws IOException {
         super.revalidateSession(cnxn, sessionId, sessionTimeout);
         try {
             // setowner as the leader itself, unless updated

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Wed Aug 18 06:24:08 2010
@@ -88,10 +88,9 @@ public class Learner {       
      *                the timeout for which the session is valid
      * @return
      * @throws IOException
-     * @throws InterruptedException
      */
     void validateSession(ServerCnxn cnxn, long clientId, int timeout)
-            throws IOException, InterruptedException {
+            throws IOException {
         LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(baos);
@@ -336,7 +335,7 @@ public class Learner {       
                         + Long.toHexString(sessionId)
                         + " for validation");
             } else {
-                cnxn.finishSessionInit(valid);
+                zk.finishSessionInit(cnxn, valid);
             }
         }
         if (LOG.isTraceEnabled()) {
@@ -369,7 +368,7 @@ public class Learner {       
         // set the zookeeper server to null
         self.cnxnFactory.setZooKeeperServer(null);
         // clear all the connections
-        self.cnxnFactory.clear();
+        self.cnxnFactory.closeAll();
         // shutdown previous zookeeper
         if (zk != null) {
             zk.shutdown();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Wed Aug 18 06:24:08 2010
@@ -77,7 +77,7 @@ public abstract class LearnerZooKeeperSe
     
     @Override
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
-            int sessionTimeout) throws IOException, InterruptedException {
+            int sessionTimeout) throws IOException {
         getLearner().validateSession(cnxn, sessionId, sessionTimeout);
     }
     

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java Wed Aug 18 06:24:08 2010
@@ -18,8 +18,6 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
 
 /**
  * Implementation of the local peer MBean interface.
@@ -44,11 +42,7 @@ public class LocalPeerBean extends Serve
     }
     
     public int getMaxClientCnxnsPerHost() {
-        NIOServerCnxn.Factory fac = peer.getCnxnFactory();
-        if (fac == null) {
-            return -1;
-        }
-        return fac.getMaxClientCnxns();
+        return peer.getMaxClientCnxnsPerHost();
     }
 
     public int getMinSessionTimeout() {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -114,6 +114,7 @@ public class ObserverRequestProcessor ex
      * Shutdown the processor.
      */
     public void shutdown() {
+        LOG.info("Shutting down");
         finished = true;
         queuedRequests.clear();
         queuedRequests.add(Request.requestOfDeath);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -18,15 +18,20 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
 
 /**
  * This RequestProcessor simply forwards requests to an AckRequestProcessor and
  * SyncRequestProcessor.
  */
 public class ProposalRequestProcessor implements RequestProcessor {
+    private static final Logger LOG =
+        Logger.getLogger(ProposalRequestProcessor.class);
+
     LeaderZooKeeperServer zks;
     
     RequestProcessor nextProcessor;
@@ -75,6 +80,7 @@ public class ProposalRequestProcessor im
     }
 
     public void shutdown() {
+        LOG.info("Shutting down");
         nextProcessor.shutdown();
         syncProcessor.shutdown();
     }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Aug 18 06:24:08 2010
@@ -433,9 +433,11 @@ public class QuorumCnxManager {
                 try {
                     ss = ServerSocketChannel.open();
                     int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
-                    LOG.info("My election bind port: " + port);
-                    ss.socket().setReuseAddress(true); 
-                    ss.socket().bind(new InetSocketAddress(port));
+                    ss.socket().setReuseAddress(true);
+                    InetSocketAddress addr = new InetSocketAddress(port);
+                    LOG.info("My election bind port: " + addr.toString());
+                    setName(addr.toString());
+                    ss.socket().bind(addr);
 
                     while (!shutdown) {
                         SocketChannel client = ss.accept();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Aug 18 06:24:08 2010
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -33,7 +34,7 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -73,7 +74,7 @@ public class QuorumPeer extends Thread i
     QuorumBean jmxQuorumBean;
     LocalPeerBean jmxLocalPeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
-    
+
     /* ZKDatabase is a top level member of quorumpeer 
      * which will be used in all the zookeeperservers
      * instantiated later. Also, it is created once on 
@@ -81,14 +82,6 @@ public class QuorumPeer extends Thread i
      * message from the leader
      */
     private ZKDatabase zkDb;
-    
-    /**
-     * Create an instance of a quorum peer
-     */
-    public interface Factory{
-        public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) throws IOException;
-        public NIOServerCnxn.Factory createConnectionFactory() throws IOException;
-    }
 
     public static class QuorumServer {
         public QuorumServer(long id, InetSocketAddress addr,
@@ -343,7 +336,7 @@ public class QuorumPeer extends Thread i
 
     Election electionAlg;
 
-    NIOServerCnxn.Factory cnxnFactory;
+    ServerCnxnFactory cnxnFactory;
     private FileTxnSnapLog logFactory = null;
 
     private final QuorumStats quorumStats;
@@ -361,7 +354,7 @@ public class QuorumPeer extends Thread i
     public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
             File dataLogDir, int electionType,
             long myid, int tickTime, int initLimit, int syncLimit,
-            NIOServerCnxn.Factory cnxnFactory) throws IOException {
+            ServerCnxnFactory cnxnFactory) throws IOException {
         this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, 
         		initLimit, syncLimit, cnxnFactory, 
         		new QuorumMaj(countParticipants(quorumPeers)));
@@ -370,7 +363,7 @@ public class QuorumPeer extends Thread i
     public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
             File dataLogDir, int electionType,
             long myid, int tickTime, int initLimit, int syncLimit,
-            NIOServerCnxn.Factory cnxnFactory, 
+            ServerCnxnFactory cnxnFactory, 
             QuorumVerifier quorumConfig) throws IOException {
         this();
         this.cnxnFactory = cnxnFactory;
@@ -459,8 +452,7 @@ public class QuorumPeer extends Thread i
     {
         this(quorumPeers, snapDir, logDir, electionAlg,
                 myid,tickTime, initLimit,syncLimit,
-                new NIOServerCnxn.Factory(
-                        new InetSocketAddress(clientPort)),
+                ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
                 new QuorumMaj(countParticipants(quorumPeers)));
     }
     
@@ -476,8 +468,8 @@ public class QuorumPeer extends Thread i
     {
         this(quorumPeers, snapDir, logDir, electionAlg,
                 myid,tickTime, initLimit,syncLimit,
-                new NIOServerCnxn.Factory(new InetSocketAddress(clientPort)),
-                    quorumConfig);
+                ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
+                quorumConfig);
     }
     
     /**
@@ -759,7 +751,9 @@ public class QuorumPeer extends Thread i
         synchronized (this) {
             if (leader != null) {
                 synchronized (leader.learners) {
-                    for (LearnerHandler fh : leader.learners) {
+                    for (LearnerHandler fh :
+                        (Collection<LearnerHandler>)leader.learners)
+                    {
                         if (fh.getSocket() == null)
                             continue;
                         String s = fh.getSocket().getRemoteSocketAddress().toString();
@@ -819,6 +813,15 @@ public class QuorumPeer extends Thread i
         this.tickTime = tickTime;
     }
 
+    /** Maximum number of connections allowed from particular host (ip) */
+    public int getMaxClientCnxnsPerHost() {
+        ServerCnxnFactory fac = getCnxnFactory();
+        if (fac == null) {
+            return -1;
+        }
+        return fac.getMaxClientCnxnsPerHost();
+    }
+    
     /** minimum session timeout in milliseconds */
     public int getMinSessionTimeout() {
         return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
@@ -912,11 +915,11 @@ public class QuorumPeer extends Thread i
         this.electionType = electionType;
     }
 
-    public NIOServerCnxn.Factory getCnxnFactory() {
+    public ServerCnxnFactory getCnxnFactory() {
         return cnxnFactory;
     }
 
-    public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
+    public void setCnxnFactory(ServerCnxnFactory cnxnFactory) {
         this.cnxnFactory = cnxnFactory;
     }
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Wed Aug 18 06:24:08 2010
@@ -24,7 +24,7 @@ import javax.management.JMException;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.ManagedUtil;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -118,9 +118,9 @@ public class QuorumPeerMain {
   
       LOG.info("Starting quorum peer");
       try {
-          NIOServerCnxn.Factory cnxnFactory =
-              new NIOServerCnxn.Factory(config.getClientPortAddress(),
-                      config.getMaxClientCnxns());
+          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
+          cnxnFactory.configure(config.getClientPortAddress(),
+                                config.getMaxClientCnxns());
   
           quorumPeer = new QuorumPeer();
           quorumPeer.setClientPortAddress(config.getClientPortAddress());

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java Wed Aug 18 06:24:08 2010
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -116,8 +115,7 @@ public class CRCTest extends ZKTestCase 
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         SyncRequestProcessor.setSnapCount(150);
         final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
-        NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
-                new InetSocketAddress(PORT));
+        ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
         f.startup(zks);
         LOG.info("starting up the zookeeper server .. waiting");
         Assert.assertTrue("waiting for server being up",

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java Wed Aug 18 06:24:08 2010
@@ -20,7 +20,6 @@ package org.apache.zookeeper.server;
 
 import java.io.File;
 import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
@@ -60,8 +59,7 @@ public class InvalidSnapshotTest extends
        ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
        SyncRequestProcessor.setSnapCount(100);
        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
-       NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
-               new InetSocketAddress(PORT));
+       ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
        f.startup(zks);
        Assert.assertTrue("waiting for server being up ",
                ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
@@ -85,7 +83,7 @@ public class InvalidSnapshotTest extends
        // now restart the server and see if it starts
        zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
        SyncRequestProcessor.setSnapCount(100);
-       f = new NIOServerCnxn.Factory(new InetSocketAddress(PORT));
+       f = ServerCnxnFactory.createFactory(PORT, -1);
        f.startup(zks);
        Assert.assertTrue("waiting for server being up ",
                ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java Wed Aug 18 06:24:08 2010
@@ -21,7 +21,6 @@ package org.apache.zookeeper.test;
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 
 import java.io.File;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +36,7 @@ import org.apache.zookeeper.Watcher.Even
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.junit.Assert;
@@ -56,25 +55,26 @@ public class ACLTest extends ZKTestCase 
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         SyncRequestProcessor.setSnapCount(1000);
         final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
-        NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
-                new InetSocketAddress(PORT));
+        ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
         f.startup(zks);
-        LOG.info("starting up the zookeeper server .. waiting");
-        Assert.assertTrue("waiting for server being up",
-                ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
-        ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
         try {
-            zk.addAuthInfo("digest", "pat:test".getBytes());
-            zk.setACL("/", Ids.CREATOR_ALL_ACL, -1);
+            LOG.info("starting up the zookeeper server .. waiting");
+            Assert.assertTrue("waiting for server being up",
+                    ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+            ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
+            try {
+                zk.addAuthInfo("digest", "pat:test".getBytes());
+                zk.setACL("/", Ids.CREATOR_ALL_ACL, -1);
+            } finally {
+                zk.close();
+            }
         } finally {
-            zk.close();
-        }
+            f.shutdown();
 
-        f.shutdown();
-
-        Assert.assertTrue("waiting for server down",
-                   ClientBase.waitForServerDown(HOSTPORT,
-                           ClientBase.CONNECTION_TIMEOUT));
+            Assert.assertTrue("waiting for server down",
+                    ClientBase.waitForServerDown(HOSTPORT,
+                            ClientBase.CONNECTION_TIMEOUT));
+        }
     }
 
     /**
@@ -89,74 +89,78 @@ public class ACLTest extends ZKTestCase 
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         SyncRequestProcessor.setSnapCount(1000);
         final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
-        NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
-                new InetSocketAddress(PORT));
+        ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
         f.startup(zks);
-        LOG.info("starting up the zookeeper server .. waiting");
-        Assert.assertTrue("waiting for server being up",
-                ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
-        ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
+        ZooKeeper zk;
         String path;
-        LOG.info("starting creating acls");
-        for (int i = 0; i < 100; i++) {
-            path = "/" + i;
-            zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        }
-        Assert.assertTrue("size of the acl map ", (1 == zks.getZKDatabase().getAclSize()));
-        for (int j = 100; j < 200; j++) {
-            path = "/" + j;
-            ACL acl = new ACL();
-            acl.setPerms(0);
-            Id id = new Id();
-            id.setId("1.1.1."+j);
-            id.setScheme("ip");
-            acl.setId(id);
-            ArrayList<ACL> list = new ArrayList<ACL>();
-            list.add(acl);
-            zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
+        try {
+            LOG.info("starting up the zookeeper server .. waiting");
+            Assert.assertTrue("waiting for server being up",
+                    ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+            zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
+            LOG.info("starting creating acls");
+            for (int i = 0; i < 100; i++) {
+                path = "/" + i;
+                zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            }
+            Assert.assertTrue("size of the acl map ", (1 == zks.getZKDatabase().getAclSize()));
+            for (int j = 100; j < 200; j++) {
+                path = "/" + j;
+                ACL acl = new ACL();
+                acl.setPerms(0);
+                Id id = new Id();
+                id.setId("1.1.1."+j);
+                id.setScheme("ip");
+                acl.setId(id);
+                ArrayList<ACL> list = new ArrayList<ACL>();
+                list.add(acl);
+                zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
+            }
+            Assert.assertTrue("size of the acl map ", (101 == zks.getZKDatabase().getAclSize()));
+        } finally {
+            // now shutdown the server and restart it
+            f.shutdown();
+            Assert.assertTrue("waiting for server down",
+                    ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
         }
-        Assert.assertTrue("size of the acl map ", (101 == zks.getZKDatabase().getAclSize()));
-        // now shutdown the server and restart it
-        f.shutdown();
-        Assert.assertTrue("waiting for server down",
-                ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
         startSignal = new CountDownLatch(1);
 
         zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
-        f = new NIOServerCnxn.Factory(new InetSocketAddress(PORT));
+        f = ServerCnxnFactory.createFactory(PORT, -1);
 
         f.startup(zks);
-
-        Assert.assertTrue("waiting for server up",
-                   ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
-
-        startSignal.await(CONNECTION_TIMEOUT,
-                TimeUnit.MILLISECONDS);
-        Assert.assertTrue("count == 0", startSignal.getCount() == 0);
-
-        Assert.assertTrue("acl map ", (101 == zks.getZKDatabase().getAclSize()));
-        for (int j = 200; j < 205; j++) {
-            path = "/" + j;
-            ACL acl = new ACL();
-            acl.setPerms(0);
-            Id id = new Id();
-            id.setId("1.1.1."+j);
-            id.setScheme("ip");
-            acl.setId(id);
-            ArrayList<ACL> list = new ArrayList<ACL>();
-            list.add(acl);
-            zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
+        try {
+            Assert.assertTrue("waiting for server up",
+                       ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+    
+            startSignal.await(CONNECTION_TIMEOUT,
+                    TimeUnit.MILLISECONDS);
+            Assert.assertTrue("count == 0", startSignal.getCount() == 0);
+    
+            Assert.assertTrue("acl map ", (101 == zks.getZKDatabase().getAclSize()));
+            for (int j = 200; j < 205; j++) {
+                path = "/" + j;
+                ACL acl = new ACL();
+                acl.setPerms(0);
+                Id id = new Id();
+                id.setId("1.1.1."+j);
+                id.setScheme("ip");
+                acl.setId(id);
+                ArrayList<ACL> list = new ArrayList<ACL>();
+                list.add(acl);
+                zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
+            }
+            Assert.assertTrue("acl map ", (106 == zks.getZKDatabase().getAclSize()));
+    
+            zk.close();
+        } finally {
+            f.shutdown();
+    
+            Assert.assertTrue("waiting for server down",
+                       ClientBase.waitForServerDown(HOSTPORT,
+                               ClientBase.CONNECTION_TIMEOUT));
         }
-        Assert.assertTrue("acl map ", (106 == zks.getZKDatabase().getAclSize()));
-
-        zk.close();
-
-        f.shutdown();
-
-        Assert.assertTrue("waiting for server down",
-                   ClientBase.waitForServerDown(HOSTPORT,
-                           ClientBase.CONNECTION_TIMEOUT));
 
     }
 

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java Wed Aug 18 06:24:08 2010
@@ -35,9 +35,7 @@ import org.apache.zookeeper.AsyncCallbac
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AsyncHammerTest extends ZKTestCase

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Wed Aug 18 06:24:08 2010
@@ -25,7 +25,6 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
-import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,10 +44,10 @@ import org.apache.zookeeper.PortAssignme
 import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
@@ -56,6 +55,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 
+import com.j2speed.accessor.FieldAccessor;
 import com.sun.management.UnixOperatingSystemMXBean;
 
 public abstract class ClientBase extends ZKTestCase {
@@ -67,8 +67,11 @@ public abstract class ClientBase extends
 
     protected String hostPort = "127.0.0.1:" + PortAssignment.unique();
     protected int maxCnxns = 0;
-    protected NIOServerCnxn.Factory serverFactory = null;
+    protected ServerCnxnFactory serverFactory = null;
     protected File tmpDir = null;
+    
+    long initialFdCount;
+    
     public ClientBase() {
         super();
     }
@@ -330,14 +333,14 @@ public abstract class ClientBase extends
         return Integer.parseInt(portstr);
     }
 
-    static NIOServerCnxn.Factory createNewServerInstance(File dataDir,
-            NIOServerCnxn.Factory factory, String hostPort, int maxCnxns)
+    static ServerCnxnFactory createNewServerInstance(File dataDir,
+            ServerCnxnFactory factory, String hostPort, int maxCnxns)
         throws IOException, InterruptedException
     {
         ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
         final int PORT = getPort(hostPort);
         if (factory == null) {
-            factory = new NIOServerCnxn.Factory(new InetSocketAddress(PORT),maxCnxns);
+            factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
         }
         factory.startup(zks);
         Assert.assertTrue("waiting for server up",
@@ -347,11 +350,16 @@ public abstract class ClientBase extends
         return factory;
     }
 
-    static void shutdownServerInstance(NIOServerCnxn.Factory factory,
+    static void shutdownServerInstance(ServerCnxnFactory factory,
             String hostPort)
     {
         if (factory != null) {
-            ZKDatabase zkDb = factory.getZooKeeperServer().getZKDatabase();
+            ZKDatabase zkDb;
+            {
+                ZooKeeperServer zs = getServer(factory);
+        
+                zkDb = zs.getZKDatabase();
+            }
             factory.shutdown();
             try {
                 zkDb.close();
@@ -385,16 +393,6 @@ public abstract class ClientBase extends
 
     @Before
     public void setUp() throws Exception {
-        setupTestEnv();
-
-        JMXEnv.setUp();
-
-        setUpAll();
-
-        tmpDir = createTmpDir(BASETEST);
-
-        startServer();
-
         /* some useful information - log the number of fds used before
          * and after a test is run. Helps to verify we are freeing resources
          * correctly. Unfortunately this only works on unix systems (the
@@ -405,10 +403,21 @@ public abstract class ClientBase extends
         if (osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) {
             UnixOperatingSystemMXBean unixos =
                 (UnixOperatingSystemMXBean)osMbean;
+            initialFdCount = unixos.getOpenFileDescriptorCount();
             LOG.info("Initial fdcount is: "
-                    + unixos.getOpenFileDescriptorCount());
+                    + initialFdCount);
         }
 
+        setupTestEnv();
+
+        JMXEnv.setUp();
+
+        setUpAll();
+
+        tmpDir = createTmpDir(BASETEST);
+
+        startServer();
+
         LOG.info("Client test setup finished");
     }
 
@@ -427,8 +436,14 @@ public abstract class ClientBase extends
         JMXEnv.ensureOnly();
     }
 
-    protected ZooKeeperServer getServer() {
-        return serverFactory.getZooKeeperServer();
+    protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
+        // access the private field - test only
+        FieldAccessor<ServerCnxnFactory,ZooKeeperServer> zkServerAcc =
+            new FieldAccessor<ServerCnxnFactory,ZooKeeperServer>
+                    ("zkServer", ServerCnxnFactory.class);
+        ZooKeeperServer zs = zkServerAcc.get(fac);
+
+        return zs;
     }
 
     protected void tearDownAll() throws Exception {
@@ -449,20 +464,6 @@ public abstract class ClientBase extends
     public void tearDown() throws Exception {
         LOG.info("tearDown starting");
 
-        /* some useful information - log the number of fds used before
-         * and after a test is run. Helps to verify we are freeing resources
-         * correctly. Unfortunately this only works on unix systems (the
-         * only place sun has implemented as part of the mgmt bean api.
-         */
-        OperatingSystemMXBean osMbean =
-            ManagementFactory.getOperatingSystemMXBean();
-        if (osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) {
-            UnixOperatingSystemMXBean unixos =
-                (UnixOperatingSystemMXBean)osMbean;
-            LOG.info("fdcount after test is: "
-                    + unixos.getOpenFileDescriptorCount());
-        }
-
         tearDownAll();
 
         stopServer();
@@ -475,6 +476,27 @@ public abstract class ClientBase extends
         serverFactory = null;
 
         JMXEnv.tearDown();
+
+        /* some useful information - log the number of fds used before
+         * and after a test is run. Helps to verify we are freeing resources
+         * correctly. Unfortunately this only works on unix systems (the
+         * only place sun has implemented as part of the mgmt bean api.
+         */
+        OperatingSystemMXBean osMbean =
+            ManagementFactory.getOperatingSystemMXBean();
+        if (osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) {
+            UnixOperatingSystemMXBean unixos =
+                (UnixOperatingSystemMXBean)osMbean;
+            long fdCount = unixos.getOpenFileDescriptorCount();
+            String message = "fdcount after test is: "
+                    + fdCount + " at start it was " + initialFdCount;
+            LOG.info(message);
+            if (fdCount > initialFdCount) {
+                LOG.info("sleeping for 20 secs");
+                //Thread.sleep(60000);
+                //assertTrue(message, fdCount <= initialFdCount);
+            }
+        }
     }
 
     public static MBeanServerConnection jmxConn() throws IOException {

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java Wed Aug 18 06:24:08 2010
@@ -35,7 +35,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -82,8 +82,8 @@ public class ClientPortBindTest extends 
         ClientBase.setupTestEnv();
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
 
-        NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
-                new InetSocketAddress(bindAddress, PORT));
+        ServerCnxnFactory f = ServerCnxnFactory.createFactory(
+                new InetSocketAddress(bindAddress, PORT), -1);
         f.startup(zks);
         LOG.info("starting up the the server, waiting");
 

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java Wed Aug 18 06:24:08 2010
@@ -46,4 +46,13 @@ public class DisconnectableZooKeeper ext
         cnxn.disconnect();
     }
 
+    /**
+     * Prevent the client from automatically reconnecting if the connection to the
+     * server is lost
+     */
+    public void dontReconnect() throws Exception {
+        java.lang.reflect.Field f = cnxn.getClass().getDeclaredField("closing");
+        f.setAccessible(true);
+        f.setBoolean(cnxn, true);
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java Wed Aug 18 06:24:08 2010
@@ -21,7 +21,6 @@ package org.apache.zookeeper.test;
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 
 import java.io.File;
-import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.log4j.Logger;
@@ -32,7 +31,7 @@ import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.server.LogFormatter;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.junit.Assert;
@@ -68,8 +67,7 @@ public class InvalidSnapshotTest extends
         ZooKeeperServer zks = new ZooKeeperServer(snapDir, snapDir, 3000);
         SyncRequestProcessor.setSnapCount(1000);
         final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
-        NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
-                new InetSocketAddress(PORT));
+        ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
         f.startup(zks);
         LOG.info("starting up the zookeeper server .. waiting");
         Assert.assertTrue("waiting for server being up",

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java Wed Aug 18 06:24:08 2010
@@ -25,6 +25,7 @@ import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -34,7 +35,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.quorum.Election;
 import org.apache.zookeeper.server.quorum.LeaderElection;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -93,7 +94,9 @@ public class LENonTerminateTest extends 
                 requestBuffer.putInt(xid);
                 requestPacket.setLength(4);
                 HashSet<Long> heardFrom = new HashSet<Long>();
-                for (QuorumServer server : self.getVotingView().values()) {
+                for (QuorumServer server :
+                    (Collection<QuorumServer>)self.getVotingView().values())
+                {
                     LOG.info("Server address: " + server.addr);
                     try {
                         requestPacket.setSocketAddress(server.addr);
@@ -213,7 +216,7 @@ public class LENonTerminateTest extends 
         {
             super(quorumPeers, snapDir, logDir, electionAlg,
                     myid,tickTime, initLimit,syncLimit,
-                    new NIOServerCnxn.Factory(new InetSocketAddress(clientPort)),
+                    ServerCnxnFactory.createFactory(clientPort, -1),
                     new QuorumMaj(countParticipants(quorumPeers)));
         }
         

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Nio Client against Netty server
+ */
+@RunWith(Suite.class)
+public class NioNettySuiteBase {
+    @BeforeClass
+    public static void setUp() {
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
+                NettyServerCnxnFactory.class.getName());
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+    }
+}

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Nio Client against Netty server
+ */
+@Suite.SuiteClasses({
+        AsyncHammerTest.class
+        })
+public class NioNettySuiteHammerTest extends NioNettySuiteBase {
+}

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Nio Client against Netty server
+ */
+@Suite.SuiteClasses({
+        ACLTest.class,
+        AsyncOpsTest.class,
+        ChrootClientTest.class,
+        ClientTest.class,
+        FourLetterWordsTest.class,
+        NullDataTest.class,
+        SessionTest.class,
+        WatcherTest.class
+        })
+public class NioNettySuiteTest extends NioNettySuiteBase {
+}

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java Wed Aug 18 06:24:08 2010
@@ -22,7 +22,6 @@ import static org.apache.zookeeper.test.
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 
 import org.apache.zookeeper.CreateMode;
@@ -34,7 +33,7 @@ import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -42,7 +41,7 @@ import org.junit.Test;
 public class OOMTest extends ZKTestCase implements Watcher {
     @Test
     public void testOOM() throws IOException, InterruptedException, KeeperException {
-        // This test takes too long to run!
+        // This test takes too long tos run!
         if (true)
             return;
         File tmpDir = ClientBase.createTmpDir();
@@ -61,8 +60,7 @@ public class OOMTest extends ZKTestCase 
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
 
         final int PORT = PortAssignment.unique();
-        NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
-                new InetSocketAddress(PORT));
+        ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
         f.startup(zks);
         Assert.assertTrue("waiting for server up",
                    ClientBase.waitForServerUp("127.0.0.1:" + PORT,

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java Wed Aug 18 06:24:08 2010
@@ -111,18 +111,23 @@ public class ObserverTest extends Quorum
         
         Assert.assertEquals(zk.getState(), States.CONNECTED);
         
+        LOG.info("Shutting down server 2");
         // Now kill one of the other real servers        
         q2.shutdown();
                 
         Assert.assertTrue("Waiting for server 2 to shut down",
                     ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP2, 
                                     ClientBase.CONNECTION_TIMEOUT));
-        
+
+        LOG.info("Server 2 down");
+
         // Now the resulting ensemble shouldn't be quorate         
         latch.await();        
         Assert.assertNotSame("Client is still connected to non-quorate cluster", 
                 KeeperState.SyncConnected,lastEvent.getState());
 
+        LOG.info("Latch returned");
+
         try {
             Assert.assertFalse("Shouldn't get a response when cluster not quorate!",
                     new String(zk.getData("/obstest", null, null)).equals("test"));
@@ -133,14 +138,19 @@ public class ObserverTest extends Quorum
         
         latch = new CountDownLatch(1);
 
+        LOG.info("Restarting server 2");
+
         // Bring it back
         q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
         q2.start();
+        
         LOG.info("Waiting for server 2 to come up");
         Assert.assertTrue("waiting for server 2 being up",
                 ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
                         CONNECTION_TIMEOUT));
         
+        LOG.info("Server 2 started, waiting for latch");
+
         latch.await();
         // It's possible our session expired - but this is ok, shows we 
         // were able to talk to the ensemble
@@ -149,10 +159,14 @@ public class ObserverTest extends Quorum
                 (KeeperState.SyncConnected==lastEvent.getState() ||
                 KeeperState.Expired==lastEvent.getState())); 
 
+        LOG.info("Shutting down all servers");
+
         q1.shutdown();
         q2.shutdown();
         q3.shutdown();
         
+        LOG.info("Closing zk client");
+
         zk.close();        
         Assert.assertTrue("Waiting for server 1 to shut down",
                 ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP1, 

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java Wed Aug 18 06:24:08 2010
@@ -19,7 +19,6 @@
 package org.apache.zookeeper.test;
 
 import java.io.File;
-import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.zookeeper.CreateMode;
@@ -29,8 +28,8 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.PurgeTxnLog;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -52,8 +51,7 @@ public class PurgeTxnTest extends ZKTest
         ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         SyncRequestProcessor.setSnapCount(100);
         final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
-        NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
-                new InetSocketAddress(PORT));
+        ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
         f.startup(zks);
         Assert.assertTrue("waiting for server being up ",
                 ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Wed Aug 18 06:24:08 2010
@@ -35,6 +35,7 @@ import org.apache.zookeeper.server.quoru
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.junit.Assert;
+import org.junit.Test;
 
 import com.sun.management.UnixOperatingSystemMXBean;
 
@@ -55,6 +56,11 @@ public class QuorumBase extends ClientBa
     private int portLE4;
     private int portLE5;
 
+    @Test
+    // This just avoids complaints by junit
+    public void testNull() {
+    }
+    
     @Override
     public void setUp() throws Exception {
         setUp(false);

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Aug 18 06:24:08 2010
@@ -29,10 +29,10 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.LearnerHandler;
@@ -40,6 +40,7 @@ import org.apache.zookeeper.server.quoru
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class QuorumTest extends QuorumBase {
@@ -187,16 +188,21 @@ public class QuorumTest extends QuorumBa
      * @throws KeeperException
      */
     @Test
-    public void testSessionMoved() throws IOException, InterruptedException, KeeperException {
+    @Ignore
+    public void testSessionMoved() throws Exception {
         String hostPorts[] = qb.hostPort.split(",");
-        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
+                ClientBase.CONNECTION_TIMEOUT, new Watcher() {
             public void process(WatchedEvent event) {
             }});
         zk.create("/sessionMoveTest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         // we want to loop through the list twice
         for(int i = 0; i < hostPorts.length*2; i++) {
+            zk.dontReconnect();
             // This should stomp the zk handle
-            DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length], ClientBase.CONNECTION_TIMEOUT,
+            DisconnectableZooKeeper zknew =
+                new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length],
+                    ClientBase.CONNECTION_TIMEOUT,
                     new Watcher() {public void process(WatchedEvent event) {
                     }},
                     zk.getSessionId(),
@@ -207,7 +213,6 @@ public class QuorumTest extends QuorumBa
                 Assert.fail("Should have lost the connection");
             } catch(KeeperException.ConnectionLossException e) {
             }
-            zk.disconnect(); // close w/o closing session
             zk = zknew;
         }
         zk.close();
@@ -227,15 +232,17 @@ public class QuorumTest extends QuorumBa
      * make sure we cannot do any changes.
      */
     @Test
-    public void testSessionMove() throws IOException, InterruptedException, KeeperException {
+    @Ignore
+    public void testSessionMove() throws Exception {
         String hps[] = qb.hostPort.split(",");
         DiscoWatcher oldWatcher = new DiscoWatcher();
-        ZooKeeper zk = new DisconnectableZooKeeper(hps[0],
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0],
                 ClientBase.CONNECTION_TIMEOUT, oldWatcher);
         zk.create("/t1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        zk.dontReconnect();
         // This should stomp the zk handle
         DiscoWatcher watcher = new DiscoWatcher();
-        ZooKeeper zknew = new DisconnectableZooKeeper(hps[1],
+        DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1],
                 ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(),
                 zk.getSessionPasswd());
         zknew.create("/t2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
@@ -257,6 +264,7 @@ public class QuorumTest extends QuorumBa
         toClose.add(zknew);
         // Let's just make sure it can still move
         for(int i = 0; i < 10; i++) {
+            zknew.dontReconnect();
             zknew = new DisconnectableZooKeeper(hps[1],
                     ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(),
                     zk.getSessionId(), zk.getSessionPasswd());