You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2010/08/18 08:24:09 UTC

svn commit: r986575 [1/4] - in /hadoop/zookeeper/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/libtest/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/auth/ src/ja...

Author: phunt
Date: Wed Aug 18 06:24:08 2010
New Revision: 986575

URL: http://svn.apache.org/viewvc?rev=986575&view=rev
Log:
ZOOKEEPER-733. use netty to handle client connections

Added:
    hadoop/zookeeper/trunk/src/java/libtest/
    hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt
    hadoop/zookeeper/trunk/src/java/libtest/accessive.jar   (with props)
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/build.xml
    hadoop/zookeeper/trunk/ivy.xml
    hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RepeatStartupTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Aug 18 06:24:08 2010
@@ -107,6 +107,8 @@ IMPROVEMENTS:
 
   ZOOKEEPER-809. Improved REST Interface (Andrei Savu via phunt)
 
+  ZOOKEEPER-733. use netty to handle client connections (breed and phunt)
+
 NEW FEATURES:
   ZOOKEEPER-729. Java client API to recursively delete a subtree.
   (Kay Kay via henry)

Modified: hadoop/zookeeper/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/build.xml?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/build.xml (original)
+++ hadoop/zookeeper/trunk/build.xml Wed Aug 18 06:24:08 2010
@@ -62,6 +62,7 @@
 
     <property name="test.java.build.dir" value="${build.dir}/test"/>
     <property name="test.java.classes" value="${test.java.build.dir}/classes"/>
+    <property name="test.lib.dir" value="${src.dir}/java/libtest" />
     <property name="test.src.dir" value="${src.dir}/java/test"/>
     <property name="systest.src.dir" value="${src.dir}/java/systest"/>
     <property name="test.log.dir" value="${test.java.build.dir}/logs" />
@@ -335,6 +336,10 @@
     </target>
 
     <target name="compile-test" depends="ivy-retrieve-test,compile">
+      <copy todir="${ivy.test.lib}">
+        <fileset dir="${test.lib.dir}" includes="*.jar"/>
+      </copy>
+
       <mkdir dir="${test.java.classes}"/>
       <javac srcdir="${test.src.dir}" destdir="${test.java.classes}"
              target="${javac.target}" debug="on">

Modified: hadoop/zookeeper/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/ivy.xml?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/ivy.xml (original)
+++ hadoop/zookeeper/trunk/ivy.xml Wed Aug 18 06:24:08 2010
@@ -32,10 +32,18 @@
     <conf name="releaseaudit" visibility="private" description="Artifacts required for releaseaudit target"/>
   </configurations>
 
+  <publications>
+    <artifact name='org.jboss.netty' type='jar' ext='jar' />
+  </publications>
+
   <dependencies>
     <!-- transitive false turns off dependency checking, log4j deps seem borked -->
-    <dependency org="log4j" name="log4j" rev="1.2.15" transitive="false"/>
-    <dependency org="jline" name="jline" rev="0.9.94" transitive="false"/>
+    <dependency org="log4j" name="log4j" rev="1.2.15" transitive="false" conf="default"/>
+    <dependency org="jline" name="jline" rev="0.9.94" transitive="false" conf="default"/>
+
+    <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.1.5.GA">
+      <artifact name="netty" type="jar" conf="default"/>
+    </dependency>
 
     <dependency org="junit" name="junit" rev="4.8.1" conf="test->default"/>
     <dependency org="checkstyle" name="checkstyle" rev="5.0"

Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original)
+++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Wed Aug 18 06:24:08 2010
@@ -1016,6 +1016,43 @@ server.3=zoo3:2888:3888</programlisting>
           </varlistentry>
         </variablelist>
       </section>
+
+      <section>
+        <title>Communication using the Netty framework</title>
+
+        <para><emphasis role="bold">New in
+            3.4:</emphasis> <ulink url="http://jboss.org/netty">Netty</ulink>
+            is an NIO based client/server communication framework, it
+            simplifies (over NIO being used directly) many of the
+            complexities of network level communication for java
+            applications. Additionally the Netty framework has built
+            in support for encryption (SSL) and authentication
+            (certificates). These are optional features and can be
+            turned on or off individually.
+        </para>
+        <para>Prior to version 3.4 ZooKeeper has always used NIO
+            directly, however in versions 3.4 and later Netty is
+            supported as an option to NIO (replaces). NIO continues to
+            be the default, however Netty based communication can be
+            used in place of NIO by setting the environment variable
+            "zookeeper.serverCnxnFactory" to
+            "org.apache.zookeeper.server.NettyServerCnxnFactory". You
+            have the option of setting this on either the client(s) or
+            server(s), typically you would want to set this on both,
+            however that is at your discretion.
+        </para>
+        <para>
+          TBD - tuning options for netty - currently there are none that are netty specific but we should add some. Esp around max bound on the number of reader worker threads netty creates.
+        </para>
+        <para>
+          TBD - how to manage encryption
+        </para>
+        <para>
+          TBD - how to manage certificates
+        </para>
+
+      </section>
+
     </section>
 
     <section id="sc_zkCommands">

Added: hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt (added)
+++ hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt Wed Aug 18 06:24:08 2010
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright 1999-2005 The Apache Software Foundation
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

Added: hadoop/zookeeper/trunk/src/java/libtest/accessive.jar
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/libtest/accessive.jar?rev=986575&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/zookeeper/trunk/src/java/libtest/accessive.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Aug 18 06:24:08 2010
@@ -688,6 +688,15 @@ public class ClientCnxn {
         }
 
         void readConnectResult() throws IOException {
+            if (LOG.isTraceEnabled()) {
+                StringBuffer buf = new StringBuffer("0x[");
+                for (byte b : incomingBuffer.array()) {
+                    buf.append(Integer.toHexString(b) + ",");
+                }
+                buf.append("]");
+                LOG.trace("readConnectRestult " + incomingBuffer.remaining() 
+                        + " " + buf.toString());
+            }
             ByteBufferInputStream bbis = new ByteBufferInputStream(
                     incomingBuffer);
             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
@@ -1154,7 +1163,13 @@ public class ClientCnxn {
             }
             cleanup();
             try {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Doing client selector close");
+                }
                 selector.close();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Closed client selector");
+                }
             } catch (IOException e) {
                 LOG.warn("Ignoring exception during selector close", e);
             }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Aug 18 06:24:08 2010
@@ -438,7 +438,7 @@ public class ZooKeeper {
         LOG.info("Initiating client connection, connectString=" + connectString
                 + " sessionTimeout=" + sessionTimeout
                 + " watcher=" + watcher
-                + " sessionId=" + sessionId
+                + " sessionId=" + Long.toHexString(sessionId)
                 + " sessionPasswd="
                 + (sessionPasswd == null ? "<null>" : "<hidden>"));
 

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java Wed Aug 18 06:24:08 2010
@@ -29,7 +29,6 @@ import javax.management.ObjectName;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
-import org.apache.zookeeper.server.NIOServerCnxn.CnxnStats;
 
 /**
  * Implementation of connection MBean interface.
@@ -38,7 +37,7 @@ public class ConnectionBean implements C
     private static final Logger LOG = Logger.getLogger(ConnectionBean.class);
 
     private final ServerCnxn connection;
-    private final CnxnStats stats;
+    private final Stats stats;
 
     private final ZooKeeperServer zk;
     
@@ -47,10 +46,10 @@ public class ConnectionBean implements C
 
     public ConnectionBean(ServerCnxn connection,ZooKeeperServer zk){
         this.connection = connection;
-        this.stats = (CnxnStats)connection.getStats();
+        this.stats = connection;
         this.zk = zk;
         
-        InetSocketAddress sockAddr = connection.getRemoteAddress();
+        InetSocketAddress sockAddr = connection.getRemoteSocketAddress();
         if (sockAddr == null) {
             remoteIP = "Unknown";
         } else {
@@ -69,7 +68,7 @@ public class ConnectionBean implements C
     }
 
     public String getSourceIP() {
-        InetSocketAddress sockAddr = connection.getRemoteAddress();
+        InetSocketAddress sockAddr = connection.getRemoteSocketAddress();
         if (sockAddr == null) {
             return null;
         }
@@ -88,7 +87,7 @@ public class ConnectionBean implements C
     
     public String[] getEphemeralNodes() {
         if(zk.getZKDatabase()  !=null){
-            String[] res= zk.getZKDatabase().getEphemerals(sessionId)
+            String[] res = zk.getZKDatabase().getEphemerals(sessionId)
                 .toArray(new String[0]);
             Arrays.sort(res);
             return res;
@@ -114,7 +113,7 @@ public class ConnectionBean implements C
     }
 
     public void resetCounters() {
-        stats.reset();
+        stats.resetStats();
     }
 
     @Override

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -49,8 +49,6 @@ import org.apache.zookeeper.proto.SetWat
 import org.apache.zookeeper.proto.SyncRequest;
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
-import org.apache.zookeeper.server.NIOServerCnxn.CnxnStats;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
@@ -122,7 +120,7 @@ public class FinalRequestProcessor imple
         }
 
         if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
-            Factory scxn = zks.getServerCnxnFactory();
+            ServerCnxnFactory scxn = zks.getServerCnxnFactory();
             // this might be possible since
             // we might just be playing diffs from the leader
             if (scxn != null && request.cnxn == null) {
@@ -164,8 +162,7 @@ public class FinalRequestProcessor imple
                 zks.serverStats().updateLatency(request.createTime);
 
                 lastOp = "PING";
-                ((CnxnStats)cnxn.getStats())
-                .updateForResponse(request.cxid, request.zxid, lastOp,
+                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                         request.createTime, System.currentTimeMillis());
 
                 cnxn.sendResponse(new ReplyHeader(-2,
@@ -176,11 +173,10 @@ public class FinalRequestProcessor imple
                 zks.serverStats().updateLatency(request.createTime);
 
                 lastOp = "SESS";
-                ((CnxnStats)cnxn.getStats())
-                .updateForResponse(request.cxid, request.zxid, lastOp,
+                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                         request.createTime, System.currentTimeMillis());
 
-                cnxn.finishSessionInit(true);
+                zks.finishSessionInit(request.cnxn, true);
                 return;
             }
             case OpCode.create: {
@@ -359,8 +355,7 @@ public class FinalRequestProcessor imple
             new ReplyHeader(request.cxid, request.zxid, err.intValue());
 
         zks.serverStats().updateLatency(request.createTime);
-        ((CnxnStats)cnxn.getStats())
-            .updateForResponse(request.cxid, request.zxid, lastOp,
+        cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                     request.createTime, System.currentTimeMillis());
 
         try {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Wed Aug 18 06:24:08 2010
@@ -21,9 +21,7 @@ package org.apache.zookeeper.server;
 import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.io.Writer;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
@@ -31,41 +29,24 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channel;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.Environment;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Version;
 import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.proto.AuthPacket;
-import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.ConnectResponse;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.WatcherEvent;
-import org.apache.zookeeper.server.auth.AuthenticationProvider;
-import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
 
@@ -75,321 +56,63 @@ import com.sun.management.UnixOperatingS
  * This class handles communication with clients using NIO. There is one per
  * client, but only one thread doing the communication.
  */
-public class NIOServerCnxn implements Watcher, ServerCnxn {
-    private static final Logger LOG = Logger.getLogger(NIOServerCnxn.class);
+public class NIOServerCnxn extends ServerCnxn {
+    static final Logger LOG = Logger.getLogger(NIOServerCnxn.class);
 
-    private ConnectionBean jmxConnectionBean;
+    NIOServerCnxnFactory factory;
 
-    static public class Factory extends Thread {
-        static {
-            Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-                public void uncaughtException(Thread t, Throwable e) {
-                    LOG.error("Thread " + t + " died", e);
-                }
-            });
-            /**
-             * this is to avoid the jvm bug:
-             * NullPointerException in Selector.open()
-             * http://bugs.sun.com/view_bug.do?bug_id=6427854
-             */
-            try {
-                Selector.open().close();
-            } catch(IOException ie) {
-                LOG.error("Selector failed to open", ie);
-            }
-        }
-
-        ZooKeeperServer zks;
-
-        final ServerSocketChannel ss;
-
-        final Selector selector = Selector.open();
-
-        /**
-         * We use this buffer to do efficient socket I/O. Since there is a single
-         * sender thread per NIOServerCnxn instance, we can use a member variable to
-         * only allocate it once.
-        */
-        final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
-
-        final HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
-        final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
-            new HashMap<InetAddress, Set<NIOServerCnxn>>( );
-
-        int outstandingLimit = 1;
-
-        int maxClientCnxns = 10;
-
-        /**
-         * Construct a new server connection factory which will accept an unlimited number
-         * of concurrent connections from each client (up to the file descriptor
-         * limits of the operating system). startup(zks) must be called subsequently.
-         * @param port
-         * @throws IOException
-         */
-        public Factory(InetSocketAddress addr) throws IOException {
-            this(addr, 0);
-        }
-
-
-        /**
-         * Constructs a new server connection factory where the number of concurrent connections
-         * from a single IP address is limited to maxcc (or unlimited if 0).
-         * startup(zks) must be called subsequently.
-         * @param port - the port to listen on for connections.
-         * @param maxcc - the number of concurrent connections allowed from a single client.
-         * @throws IOException
-         */
-        public Factory(InetSocketAddress addr, int maxcc) throws IOException {
-            super("NIOServerCxn.Factory:" + addr);
-            setDaemon(true);
-            maxClientCnxns = maxcc;
-            this.ss = ServerSocketChannel.open();
-            ss.socket().setReuseAddress(true);
-            LOG.info("binding to port " + addr);
-            ss.socket().bind(addr);
-            ss.configureBlocking(false);
-            ss.register(selector, SelectionKey.OP_ACCEPT);
-        }
+    SocketChannel sock;
 
-        @Override
-        public void start() {
-            // ensure thread is started once and only once
-            if (getState() == Thread.State.NEW) {
-                super.start();
-            }
-        }
-
-        public void startup(ZooKeeperServer zks) throws IOException,
-                InterruptedException {
-            start();
-            zks.startdata();
-            zks.startup();
-            setZooKeeperServer(zks);
-        }
-
-        public void setZooKeeperServer(ZooKeeperServer zks) {
-            this.zks = zks;
-            if (zks != null) {
-                this.outstandingLimit = zks.getGlobalOutstandingLimit();
-                zks.setServerCnxnFactory(this);
-            } else {
-                this.outstandingLimit = 1;
-            }
-        }
-
-        public ZooKeeperServer getZooKeeperServer() {
-            return this.zks;
-        }
-
-        public InetSocketAddress getLocalAddress(){
-            return (InetSocketAddress)ss.socket().getLocalSocketAddress();
-        }
-
-        public int getLocalPort(){
-            return ss.socket().getLocalPort();
-        }
-
-        public int getMaxClientCnxns() {
-            return maxClientCnxns;
-        }
-
-        private void addCnxn(NIOServerCnxn cnxn) {
-            synchronized (cnxns) {
-                cnxns.add(cnxn);
-                synchronized (ipMap){
-                    InetAddress addr = cnxn.sock.socket().getInetAddress();
-                    Set<NIOServerCnxn> s = ipMap.get(addr);
-                    if (s == null) {
-                        // in general we will see 1 connection from each
-                        // host, setting the initial cap to 2 allows us
-                        // to minimize mem usage in the common case
-                        // of 1 entry --  we need to set the initial cap
-                        // to 2 to avoid rehash when the first entry is added
-                        s = new HashSet<NIOServerCnxn>(2);
-                        s.add(cnxn);
-                        ipMap.put(addr,s);
-                    } else {
-                        s.add(cnxn);
-                    }
-                }
-            }
-        }
-
-        protected NIOServerCnxn createConnection(SocketChannel sock,
-                SelectionKey sk) throws IOException {
-            return new NIOServerCnxn(zks, sock, sk, this);
-        }
-
-        private int getClientCnxnCount(InetAddress cl) {
-            // The ipMap lock covers both the map, and its contents
-            // (that is, the cnxn sets shouldn't be modified outside of
-            // this lock)
-            synchronized (ipMap) {
-                Set<NIOServerCnxn> s = ipMap.get(cl);
-                if (s == null) return 0;
-                return s.size();
-            }
-        }
+    private SelectionKey sk;
 
-        public void run() {
-            while (!ss.socket().isClosed()) {
-                try {
-                    selector.select(1000);
-                    Set<SelectionKey> selected;
-                    synchronized (this) {
-                        selected = selector.selectedKeys();
-                    }
-                    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
-                            selected);
-                    Collections.shuffle(selectedList);
-                    for (SelectionKey k : selectedList) {
-                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
-                            SocketChannel sc = ((ServerSocketChannel) k
-                                    .channel()).accept();
-                            InetAddress ia = sc.socket().getInetAddress();
-                            int cnxncount = getClientCnxnCount(ia);
-                            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
-                                LOG.warn("Too many connections from " + ia
-                                         + " - max is " + maxClientCnxns );
-                                sc.close();
-                            } else {
-                                LOG.info("Accepted socket connection from "
-                                        + sc.socket().getRemoteSocketAddress());
-                                sc.configureBlocking(false);
-                                SelectionKey sk = sc.register(selector,
-                                        SelectionKey.OP_READ);
-                                NIOServerCnxn cnxn = createConnection(sc, sk);
-                                sk.attach(cnxn);
-                                addCnxn(cnxn);
-                            }
-                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
-                            c.doIO(k);
-                        } else {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Unexpected ops in select "
-                                          + k.readyOps());
-                            }
-                        }
-                    }
-                    selected.clear();
-                } catch (RuntimeException e) {
-                    LOG.warn("Ignoring unexpected runtime exception", e);
-                } catch (Exception e) {
-                    LOG.warn("Ignoring exception", e);
-                }
-            }
-            clear();
-            LOG.info("NIOServerCnxn factory exited run method");
-        }
+    boolean initialized;
 
-        /**
-         * Clear all the connections in the selector.
-         * 
-         * You must first close ss (the serversocketchannel) if you wish
-         * to block any new connections from being established.
-         *
-         */
-        @SuppressWarnings("unchecked")
-        synchronized public void clear() {
-            selector.wakeup();
-            HashSet<NIOServerCnxn> cnxns;
-            synchronized (this.cnxns) {
-                cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
-            }
-            // got to clear all the connections that we have in the selector
-            for (NIOServerCnxn cnxn: cnxns) {
-                try {
-                    // don't hold this.cnxns lock as deadlock may occur
-                    cnxn.close();
-                } catch (Exception e) {
-                    LOG.warn("Ignoring exception closing cnxn sessionid 0x"
-                            + Long.toHexString(cnxn.sessionId), e);
-                }
-            }
-        }
+    ByteBuffer lenBuffer = ByteBuffer.allocate(4);
 
-        public void shutdown() {
-            try {
-                ss.close();
-                clear();
-                this.interrupt();
-                this.join();
-            } catch (InterruptedException e) {
-                LOG.warn("Ignoring interrupted exception during shutdown", e);
-            } catch (Exception e) {
-                LOG.warn("Ignoring unexpected exception during shutdown", e);
-            }
-            try {
-                selector.close();
-            } catch (IOException e) {
-                LOG.warn("Selector closing", e);
-            }
-            if (zks != null) {
-                zks.shutdown();
-            }
-        }
+    ByteBuffer incomingBuffer = lenBuffer;
 
-        synchronized void closeSession(long sessionId) {
-            selector.wakeup();
-            closeSessionWithoutWakeup(sessionId);
-        }
+    LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
 
-        @SuppressWarnings("unchecked")
-        private void closeSessionWithoutWakeup(long sessionId) {
-            HashSet<NIOServerCnxn> cnxns;
-            synchronized (this.cnxns) {
-                cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
-            }
+    int sessionTimeout;
 
-            for (NIOServerCnxn cnxn : cnxns) {
-                if (cnxn.sessionId == sessionId) {
-                    try {
-                        cnxn.close();
-                    } catch (Exception e) {
-                        LOG.warn("exception during session close", e);
-                    }
-                    break;
-                }
-            }
-        }
-    }
+    private final ZooKeeperServer zkServer;
 
     /**
-     * The buffer will cause the connection to be close when we do a send.
+     * The number of requests that have been submitted but not yet responded to.
      */
-    static final ByteBuffer closeConn = ByteBuffer.allocate(0);
-
-    final Factory factory;
+    int outstandingRequests;
 
-    /** The ZooKeeperServer for this connection. May be null if the server
-     * is not currently serving requests (for example if the server is not
-     * an active quorum participant.
+    /**
+     * This is the id that uniquely identifies the session of a client. Once
+     * this session is no longer active, the ephemeral nodes will go away.
      */
-    private final ZooKeeperServer zk;
-
-    private SocketChannel sock;
-
-    private SelectionKey sk;
-
-    boolean initialized;
-
-    ByteBuffer lenBuffer = ByteBuffer.allocate(4);
-
-    ByteBuffer incomingBuffer = lenBuffer;
-
-    LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
+    long sessionId;
 
-    int sessionTimeout;
+    static long nextSessionId = 1;
+    int outstandingLimit = 1;
 
-    ArrayList<Id> authInfo = new ArrayList<Id>();
+    public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
+            SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
+        this.zkServer = zk;
+        this.sock = sock;
+        this.sk = sk;
+        this.factory = factory;
+        if (zk != null) { 
+            outstandingLimit = zk.getGlobalOutstandingLimit();
+        }
+        sock.socket().setTcpNoDelay(true);
+        sock.socket().setSoLinger(true, 2);
+        InetAddress addr = ((InetSocketAddress) sock.socket()
+                .getRemoteSocketAddress()).getAddress();
+        authInfo.add(new Id("ip", addr.getHostAddress()));
+        sk.interestOps(SelectionKey.OP_READ);
+    }
 
     /* Send close connection packet to the client, doIO will eventually
      * close the underlying machinery (like socket, selectorkey, etc...)
      */
     public void sendCloseSession() {
-        sendBuffer(closeConn);
+        sendBuffer(ServerCnxnFactory.closeConn);
     }
 
     /**
@@ -404,7 +127,7 @@ public class NIOServerCnxn implements Wa
             * a tight while loop
             */
            sock.configureBlocking(true);
-           if (bb != closeConn) {
+           if (bb != ServerCnxnFactory.closeConn) {
                if (sock != null) {
                    sock.write(bb);
                }
@@ -415,9 +138,9 @@ public class NIOServerCnxn implements Wa
        }
     }
     
-    void sendBuffer(ByteBuffer bb) {
+    public void sendBuffer(ByteBuffer bb) {
         try {
-            if (bb != closeConn) {
+            if (bb != ServerCnxnFactory.closeConn) {
                 // We check if write interest here because if it is NOT set,
                 // nothing is queued, so we can try to send the buffer right
                 // away without waking up the selector
@@ -452,27 +175,7 @@ public class NIOServerCnxn implements Wa
         }
     }
 
-    private static class CloseRequestException extends IOException {
-        private static final long serialVersionUID = -7854505709816442681L;
-
-        public CloseRequestException(String msg) {
-            super(msg);
-        }
-    }
-
-    private static class EndOfStreamException extends IOException {
-        private static final long serialVersionUID = -8255690282104294178L;
-
-        public EndOfStreamException(String msg) {
-            super(msg);
-        }
-
-        public String toString() {
-            return "EndOfStreamException: " + getMessage();
-        }
-    }
-
-    /** Read the request payload (everything followng the length prefix) */
+    /** Read the request payload (everything following the length prefix) */
     private void readPayload() throws IOException, InterruptedException {
         if (incomingBuffer.remaining() != 0) { // have we read length bytes?
             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
@@ -589,7 +292,7 @@ public class NIOServerCnxn implements Wa
                     // Remove the buffers that we have sent
                     while (outgoingBuffers.size() > 0) {
                         bb = outgoingBuffers.peek();
-                        if (bb == closeConn) {
+                        if (bb == ServerCnxnFactory.closeConn) {
                             throw new CloseRequestException("close requested");
                         }
                         int left = bb.remaining() - sent;
@@ -653,61 +356,19 @@ public class NIOServerCnxn implements Wa
     }
 
     private void readRequest() throws IOException {
-        // We have the request, now process and setup for next
-        InputStream bais = new ByteBufferInputStream(incomingBuffer);
-        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
-        RequestHeader h = new RequestHeader();
-        h.deserialize(bia, "header");
-        // Through the magic of byte buffers, txn will not be
-        // pointing
-        // to the start of the txn
-        incomingBuffer = incomingBuffer.slice();
-        if (h.getType() == OpCode.auth) {
-            AuthPacket authPacket = new AuthPacket();
-            ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
-            String scheme = authPacket.getScheme();
-            AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
-            if (ap == null
-                    || (ap.handleAuthentication(this, authPacket.getAuth())
-                            != KeeperException.Code.OK)) {
-                if (ap == null) {
-                    LOG.warn("No authentication provider for scheme: "
-                            + scheme + " has "
-                            + ProviderRegistry.listProviders());
-                } else {
-                    LOG.warn("Authentication failed for scheme: " + scheme);
-                }
-                // send a response...
-                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
-                        KeeperException.Code.AUTHFAILED.intValue());
-                sendResponse(rh, null, null);
-                // ... and close connection
-                sendCloseSession();
-                disableRecv();
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Authentication succeeded for scheme: "
-                              + scheme);
-                }
-                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
-                        KeeperException.Code.OK.intValue());
-                sendResponse(rh, null, null);
-            }
-            return;
-        } else {
-            Request si = new Request(this, sessionId, h.getXid(), h.getType(), incomingBuffer, authInfo);
-            si.setOwner(ServerCnxn.me);
-            zk.submitRequest(si);
-        }
+        zkServer.processPacket(this, incomingBuffer);
+    }
+    
+    protected void incrOutstandingRequests(RequestHeader h) {
         if (h.getXid() >= 0) {
             synchronized (this) {
                 outstandingRequests++;
             }
             synchronized (this.factory) {        
                 // check throttling
-                if (zk.getInProcess() > factory.outstandingLimit) {
+                if (zkServer.getInProcess() > outstandingLimit) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Throttling recv " + zk.getInProcess());
+                        LOG.debug("Throttling recv " + zkServer.getInProcess());
                     }
                     disableRecv();
                     // following lines should not be needed since we are
@@ -717,6 +378,7 @@ public class NIOServerCnxn implements Wa
                 }
             }
         }
+
     }
 
     public void disableRecv() {
@@ -724,210 +386,25 @@ public class NIOServerCnxn implements Wa
     }
 
     public void enableRecv() {
-        if (sk.isValid()) {
-            int interest = sk.interestOps();
-            if ((interest & SelectionKey.OP_READ) == 0) {
-                sk.interestOps(interest | SelectionKey.OP_READ);
+        synchronized (this.factory) {
+            sk.selector().wakeup();
+            if (sk.isValid()) {
+                int interest = sk.interestOps();
+                if ((interest & SelectionKey.OP_READ) == 0) {
+                    sk.interestOps(interest | SelectionKey.OP_READ);
+                }
             }
         }
     }
 
     private void readConnectRequest() throws IOException, InterruptedException {
-        BinaryInputArchive bia = BinaryInputArchive
-                .getArchive(new ByteBufferInputStream(incomingBuffer));
-        ConnectRequest connReq = new ConnectRequest();
-        connReq.deserialize(bia, "connect");
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Session establishment request from client "
-                    + sock.socket().getRemoteSocketAddress()
-                    + " client's lastZxid is 0x"
-                    + Long.toHexString(connReq.getLastZxidSeen()));
-        }
-        if (zk == null) {
+        if (zkServer == null) {
             throw new IOException("ZooKeeperServer not running");
         }
-        if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
-            String msg = "Refusing session request for client "
-                + sock.socket().getRemoteSocketAddress()
-                + " as it has seen zxid 0x"
-                + Long.toHexString(connReq.getLastZxidSeen())
-                + " our last zxid is 0x"
-                + Long.toHexString(zk.getZKDatabase().getDataTreeLastProcessedZxid())
-                + " client must try another server";
-
-            LOG.info(msg);
-            throw new CloseRequestException(msg);
-        }
-        sessionTimeout = connReq.getTimeOut();
-        byte passwd[] = connReq.getPasswd();
-        int minSessionTimeout = zk.getMinSessionTimeout();
-        if (sessionTimeout < minSessionTimeout) {
-            sessionTimeout = minSessionTimeout;
-        }
-        int maxSessionTimeout = zk.getMaxSessionTimeout();
-        if (sessionTimeout > maxSessionTimeout) {
-            sessionTimeout = maxSessionTimeout;
-        }
-        // We don't want to receive any packets until we are sure that the
-        // session is setup
-        disableRecv();
-        if (connReq.getSessionId() != 0) {
-            long clientSessionId = connReq.getSessionId();
-            LOG.info("Client attempting to renew session 0x"
-                    + Long.toHexString(clientSessionId)
-                    + " at " + sock.socket().getRemoteSocketAddress());
-            factory.closeSessionWithoutWakeup(clientSessionId);
-            setSessionId(clientSessionId);
-            zk.reopenSession(this, sessionId, passwd, sessionTimeout);
-        } else {
-            LOG.info("Client attempting to establish new session at "
-                    + sock.socket().getRemoteSocketAddress());
-            zk.createSession(this, passwd, sessionTimeout);
-        }
+        zkServer.processConnectRequest(this, incomingBuffer);
         initialized = true;
     }
 
-    private void packetReceived() {
-        stats.incrPacketsReceived();
-        if (zk != null) {
-            zk.serverStats().incrementPacketsReceived();
-        }
-    }
-
-    private void packetSent() {
-        stats.incrPacketsSent();
-        if (zk != null) {
-            zk.serverStats().incrementPacketsSent();
-        }
-    }
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int confCmd =
-        ByteBuffer.wrap("conf".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int consCmd =
-        ByteBuffer.wrap("cons".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int crstCmd =
-        ByteBuffer.wrap("crst".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int dumpCmd =
-        ByteBuffer.wrap("dump".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int enviCmd =
-        ByteBuffer.wrap("envi".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int getTraceMaskCmd =
-        ByteBuffer.wrap("gtmk".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int ruokCmd =
-        ByteBuffer.wrap("ruok".getBytes()).getInt();
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int setTraceMaskCmd =
-        ByteBuffer.wrap("stmk".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int srvrCmd =
-        ByteBuffer.wrap("srvr".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int srstCmd =
-        ByteBuffer.wrap("srst".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int statCmd =
-        ByteBuffer.wrap("stat".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int wchcCmd =
-        ByteBuffer.wrap("wchc".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int wchpCmd =
-        ByteBuffer.wrap("wchp".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int wchsCmd =
-        ByteBuffer.wrap("wchs".getBytes()).getInt();
-
-    /*
-     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
-     * Zk Admin</a>. this link is for all the commands.
-     */
-    private final static int mntrCmd = ByteBuffer.wrap("mntr".getBytes())
-            .getInt();
-
-
-    private final static HashMap<Integer, String> cmd2String =
-        new HashMap<Integer, String>();
-
-    // specify all of the commands that are available
-    static {
-        cmd2String.put(confCmd, "conf");
-        cmd2String.put(consCmd, "cons");
-        cmd2String.put(crstCmd, "crst");
-        cmd2String.put(dumpCmd, "dump");
-        cmd2String.put(enviCmd, "envi");
-        cmd2String.put(getTraceMaskCmd, "gtmk");
-        cmd2String.put(ruokCmd, "ruok");
-        cmd2String.put(setTraceMaskCmd, "stmk");
-        cmd2String.put(srstCmd, "srst");
-        cmd2String.put(srvrCmd, "srvr");
-        cmd2String.put(statCmd, "stat");
-        cmd2String.put(wchcCmd, "wchc");
-        cmd2String.put(wchpCmd, "wchp");
-        cmd2String.put(wchsCmd, "wchs");
-        cmd2String.put(mntrCmd, "mntr");
-    }
-
     /**
      * clean up the socket related to a command and also make sure we flush the
      * data before we do that
@@ -1084,10 +561,10 @@ public class NIOServerCnxn implements Wa
             
         @Override
         public void commandRun() {
-            if (zk == null) {
+            if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             } else {
-                zk.dumpConf(pw);
+                zkServer.dumpConf(pw);
             }
         }
     }
@@ -1099,11 +576,11 @@ public class NIOServerCnxn implements Wa
         
         @Override
         public void commandRun() {
-            if (zk == null) {
+            if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             }
             else { 
-                zk.serverStats().reset();
+                zkServer.serverStats().reset();
                 pw.println("Server stats reset.");
             }
         }
@@ -1116,12 +593,12 @@ public class NIOServerCnxn implements Wa
         
         @Override
         public void commandRun() {
-            if (zk == null) {
+            if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             } else {
                 synchronized(factory.cnxns){
-                    for(NIOServerCnxn c : factory.cnxns){
-                        c.getStats().reset();
+                    for(ServerCnxn c : factory.cnxns){
+                        c.resetStats();
                     }
                 }
                 pw.println("Connection stats reset.");
@@ -1136,14 +613,14 @@ public class NIOServerCnxn implements Wa
         
         @Override
         public void commandRun() {
-            if (zk == null) {
+            if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             }
             else {
                 pw.println("SessionTracker dump:");
-                zk.sessionTracker.dumpSessions(pw);
+                zkServer.sessionTracker.dumpSessions(pw);
                 pw.println("ephemeral nodes dump:");
-                zk.dumpEphemerals(pw);
+                zkServer.dumpEphemerals(pw);
             }
         }
     }
@@ -1158,7 +635,7 @@ public class NIOServerCnxn implements Wa
         @SuppressWarnings("unchecked")
         @Override
         public void commandRun() {
-            if (zk == null) {
+            if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             }
             else {   
@@ -1175,14 +652,13 @@ public class NIOServerCnxn implements Wa
                         .cnxns.clone();
                     }
                     for(NIOServerCnxn c : cnxnset){
-                        ((CnxnStats)c.getStats())
-                        .dumpConnectionInfo(pw, true);
+                        c.dumpConnectionInfo(pw, true);
                     }
                     pw.println();
                 }
-                pw.print(zk.serverStats().toString());
+                pw.print(zkServer.serverStats().toString());
                 pw.print("Node count: ");
-                pw.println(zk.getZKDatabase().getNodeCount());
+                pw.println(zkServer.getZKDatabase().getNodeCount());
             }
             
         }
@@ -1196,7 +672,7 @@ public class NIOServerCnxn implements Wa
         @SuppressWarnings("unchecked")
         @Override
         public void commandRun() {
-            if (zk == null) {
+            if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             } else {
                 // clone should be faster than iteration
@@ -1206,7 +682,7 @@ public class NIOServerCnxn implements Wa
                     cnxns = (HashSet<NIOServerCnxn>) factory.cnxns.clone();
                 }
                 for (NIOServerCnxn c : cnxns) {
-                    ((CnxnStats) c.getStats()).dumpConnectionInfo(pw, false);
+                    c.dumpConnectionInfo(pw, false);
                 }
                 pw.println();
             }
@@ -1222,10 +698,10 @@ public class NIOServerCnxn implements Wa
 
         @Override
         public void commandRun() {
-            if (zk == null) {
+            if (zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
             } else {
-                DataTree dt = zk.getZKDatabase().getDataTree();
+                DataTree dt = zkServer.getZKDatabase().getDataTree();
                 if (len == wchsCmd) {
                     dt.dumpWatchesSummary(pw);
                 } else if (len == wchpCmd) {
@@ -1246,12 +722,12 @@ public class NIOServerCnxn implements Wa
 
         @Override
         public void commandRun() {
-            if(zk == null) {
+            if(zkServer == null) {
                 pw.println(ZK_NOT_SERVING);
                 return;
             }
-            ZKDatabase zkdb = zk.getZKDatabase();
-            ServerStats stats = zk.serverStats();
+            ZKDatabase zkdb = zkServer.getZKDatabase();
+            ServerStats stats = zkServer.serverStats();
 
             print("version", Version.getFullVersion());
 
@@ -1280,7 +756,7 @@ public class NIOServerCnxn implements Wa
             }
 
             if(stats.getServerState() == "leader") {
-                Leader leader = ((LeaderZooKeeperServer)zk).getLeader();
+                Leader leader = ((LeaderZooKeeperServer)zkServer).getLeader();
 
                 print("followers", leader.learners.size());
                 print("synced_followers", leader.forwardingFollowers.size());
@@ -1405,23 +881,26 @@ public class NIOServerCnxn implements Wa
     private boolean readLength(SelectionKey k) throws IOException {
         // Read the length, now get the buffer
         int len = lenBuffer.getInt();
-        if (!initialized && checkFourLetterWord(k, len)) {
+        if (!initialized && checkFourLetterWord(sk, len)) {
             return false;
         }
         if (len < 0 || len > BinaryInputArchive.maxBuffer) {
             throw new IOException("Len error " + len);
         }
-        if (zk == null) {
+        if (zkServer == null) {
             throw new IOException("ZooKeeperServer not running");
         }
         incomingBuffer = ByteBuffer.allocate(len);
         return true;
     }
 
-    /**
-     * The number of requests that have been submitted but not yet responded to.
-     */
-    int outstandingRequests;
+    public long getOutstandingRequests() {
+        synchronized (this) {
+            synchronized (this.factory) {
+                return outstandingRequests;
+            }
+        }
+    }
 
     /*
      * (non-Javadoc)
@@ -1432,28 +911,6 @@ public class NIOServerCnxn implements Wa
         return sessionTimeout;
     }
 
-    /**
-     * This is the id that uniquely identifies the session of a client. Once
-     * this session is no longer active, the ephemeral nodes will go away.
-     */
-    long sessionId;
-
-    static long nextSessionId = 1;
-
-    public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
-            SelectionKey sk, Factory factory) throws IOException {
-        this.zk = zk;
-        this.sock = sock;
-        this.sk = sk;
-        this.factory = factory;
-        sock.socket().setTcpNoDelay(true);
-        sock.socket().setSoLinger(true, 2);
-        InetAddress addr = ((InetSocketAddress) sock.socket()
-                .getRemoteSocketAddress()).getAddress();
-        authInfo.add(new Id("ip", addr.getHostAddress()));
-        sk.interestOps(SelectionKey.OP_READ);
-    }
-
     @Override
     public String toString() {
         return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
@@ -1464,6 +921,7 @@ public class NIOServerCnxn implements Wa
      * 
      * This function returns immediately if the cnxn is not on the cnxns list.
      */
+    @Override
     public void close() {
         synchronized(factory.cnxns){
             // if this is not in cnxns then it's already closed
@@ -1477,18 +935,10 @@ public class NIOServerCnxn implements Wa
                 s.remove(this);
             }
 
-            // unregister from JMX
-            try {
-                if(jmxConnectionBean != null){
-                    MBeanRegistry.getInstance().unregister(jmxConnectionBean);
-                }
-            } catch (Exception e) {
-                LOG.warn("Failed to unregister with JMX", e);
-            }
-            jmxConnectionBean = null;
-    
-            if (zk != null) {
-                zk.removeCnxn(this);
+            factory.unregisterConnection(this);
+
+            if (zkServer != null) {
+                zkServer.removeCnxn(this);
             }
     
             closeSock();
@@ -1570,6 +1020,7 @@ public class NIOServerCnxn implements Wa
      * @see org.apache.zookeeper.server.ServerCnxnIface#sendResponse(org.apache.zookeeper.proto.ReplyHeader,
      *      org.apache.jute.Record, java.lang.String)
      */
+    @Override
     synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
         try {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -1595,7 +1046,7 @@ public class NIOServerCnxn implements Wa
                 }
                 // check throttling
                 synchronized (this.factory) {        
-                    if (zk.getInProcess() < factory.outstandingLimit
+                    if (zkServer.getInProcess() < outstandingLimit
                             || outstandingRequests < 1) {
                         sk.selector().wakeup();
                         enableRecv();
@@ -1612,6 +1063,7 @@ public class NIOServerCnxn implements Wa
      *
      * @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent)
      */
+    @Override
     synchronized public void process(WatchedEvent event) {
         ReplyHeader h = new ReplyHeader(-1, -1L, 0);
         if (LOG.isTraceEnabled()) {
@@ -1627,274 +1079,45 @@ public class NIOServerCnxn implements Wa
         sendResponse(h, e, "notification");
     }
 
-    public void finishSessionInit(boolean valid) {
-        // register with JMX
-        try {
-            jmxConnectionBean = new ConnectionBean(this, zk);
-            MBeanRegistry.getInstance().register(jmxConnectionBean, zk.jmxServerBean);
-        } catch (Exception e) {
-            LOG.warn("Failed to register with JMX", e);
-            jmxConnectionBean = null;
-        }
-
-        try {
-            ConnectResponse rsp = new ConnectResponse(0, valid ? sessionTimeout
-                    : 0, valid ? sessionId : 0, // send 0 if session is no
-                    // longer valid
-                    valid ? zk.generatePasswd(sessionId) : new byte[16]);
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
-            bos.writeInt(-1, "len");
-            rsp.serialize(bos, "connect");
-            baos.close();
-            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
-            bb.putInt(bb.remaining() - 4).rewind();
-            sendBuffer(bb);
-
-            if (!valid) {
-                LOG.info("Invalid session 0x"
-                        + Long.toHexString(sessionId)
-                        + " for client "
-                        + sock.socket().getRemoteSocketAddress()
-                        + ", probably expired");
-                sendCloseSession();
-            } else {
-                LOG.info("Established session 0x"
-                        + Long.toHexString(sessionId)
-                        + " with negotiated timeout " + sessionTimeout
-                        + " for client "
-                        + sock.socket().getRemoteSocketAddress());
-            }
-
-            // Now that the session is ready we can start receiving packets
-            synchronized (this.factory) {
-                sk.selector().wakeup();
-                enableRecv();
-            }
-        } catch (Exception e) {
-            LOG.warn("Exception while establishing session, closing", e);
-            close();
-        }
-    }
-
     /*
      * (non-Javadoc)
      *
      * @see org.apache.zookeeper.server.ServerCnxnIface#getSessionId()
      */
+    @Override
     public long getSessionId() {
         return sessionId;
     }
 
+    @Override
     public void setSessionId(long sessionId) {
         this.sessionId = sessionId;
     }
 
-    public ArrayList<Id> getAuthInfo() {
-        return authInfo;
+    @Override
+    public void setSessionTimeout(int sessionTimeout) {
+        this.sessionTimeout = sessionTimeout;
+    }
+
+    @Override
+    public int getInterestOps() {
+        return sk.isValid() ? sk.interestOps() : 0;
     }
 
-    public synchronized InetSocketAddress getRemoteAddress() {
+    @Override
+    public InetSocketAddress getRemoteSocketAddress() {
         if (sock == null) {
             return null;
         }
         return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
     }
 
-    class CnxnStats implements ServerCnxn.Stats {
-        private final Date established = new Date();
-
-        private final AtomicLong packetsReceived = new AtomicLong();
-        private final AtomicLong packetsSent = new AtomicLong();
-
-        private long minLatency;
-        private long maxLatency;
-        private String lastOp;
-        private long lastCxid;
-        private long lastZxid;
-        private long lastResponseTime;
-        private long lastLatency;
-
-        private long count;
-        private long totalLatency;
-
-        CnxnStats() {
-            reset();
-        }
-
-        public synchronized void reset() {
-            packetsReceived.set(0);
-            packetsSent.set(0);
-            minLatency = Long.MAX_VALUE;
-            maxLatency = 0;
-            lastOp = "NA";
-            lastCxid = -1;
-            lastZxid = -1;
-            lastResponseTime = 0;
-            lastLatency = 0;
-
-            count = 0;
-            totalLatency = 0;
-        }
-
-        long incrPacketsReceived() {
-            return packetsReceived.incrementAndGet();
-        }
-
-        long incrPacketsSent() {
-            return packetsSent.incrementAndGet();
-        }
-
-        synchronized void updateForResponse(long cxid, long zxid, String op,
-                long start, long end)
-        {
-            // don't overwrite with "special" xids - we're interested
-            // in the clients last real operation
-            if (cxid >= 0) {
-                lastCxid = cxid;
-            }
-            lastZxid = zxid;
-            lastOp = op;
-            lastResponseTime = end;
-            long elapsed = end - start;
-            lastLatency = elapsed;
-            if (elapsed < minLatency) {
-                minLatency = elapsed;
-            }
-            if (elapsed > maxLatency) {
-                maxLatency = elapsed;
-            }
-            count++;
-            totalLatency += elapsed;
-        }
-
-        public Date getEstablished() {
-            return established;
-        }
-
-        public long getOutstandingRequests() {
-            synchronized (NIOServerCnxn.this) {
-                synchronized (NIOServerCnxn.this.factory) {
-                    return outstandingRequests;
-                }
-            }
-        }
-
-        public long getPacketsReceived() {
-            return packetsReceived.longValue();
-        }
-
-        public long getPacketsSent() {
-            return packetsSent.longValue();
-        }
-
-        public synchronized long getMinLatency() {
-            return minLatency == Long.MAX_VALUE ? 0 : minLatency;
-        }
-
-        public synchronized long getAvgLatency() {
-            return count == 0 ? 0 : totalLatency / count;
-        }
-
-        public synchronized long getMaxLatency() {
-            return maxLatency;
-        }
-
-        public synchronized String getLastOperation() {
-            return lastOp;
-        }
-
-        public synchronized long getLastCxid() {
-            return lastCxid;
-        }
-
-        public synchronized long getLastZxid() {
-            return lastZxid;
-        }
-
-        public synchronized long getLastResponseTime() {
-            return lastResponseTime;
-        }
-
-        public synchronized long getLastLatency() {
-            return lastLatency;
-        }
-
-        /**
-         * Prints detailed stats information for the connection.
-         *
-         * @see dumpConnectionInfo(PrintWriter, boolean) for brief stats
-         */
-        @Override
-        public String toString() {
-            StringWriter sw = new StringWriter();
-            PrintWriter pwriter = new PrintWriter(sw);
-            dumpConnectionInfo(pwriter, false);
-            pwriter.flush();
-            pwriter.close();
-            return sw.toString();
-        }
-
-        /**
-         * Print information about the connection.
-         * @param brief iff true prints brief details, otw full detail
-         * @return information about this connection
-         */
-        public synchronized void
-        dumpConnectionInfo(PrintWriter pwriter, boolean brief)
-        {
-            Channel channel = sk.channel();
-            if (channel instanceof SocketChannel) {
-                pwriter.print(" ");
-                pwriter.print(((SocketChannel)channel).socket()
-                        .getRemoteSocketAddress());
-                pwriter.print("[");
-                pwriter.print(sk.isValid() ? Integer.toHexString(sk.interestOps())
-                        : "0");
-                pwriter.print("](queued=");
-                pwriter.print(getOutstandingRequests());
-                pwriter.print(",recved=");
-                pwriter.print(getPacketsReceived());
-                pwriter.print(",sent=");
-                pwriter.print(getPacketsSent());
-
-                if (!brief) {
-                    long sessionId = getSessionId();
-                    if (sessionId != 0) {
-                        pwriter.print(",sid=0x");
-                        pwriter.print(Long.toHexString(sessionId));
-                        pwriter.print(",lop=");
-                        pwriter.print(getLastOperation());
-                        pwriter.print(",est=");
-                        pwriter.print(getEstablished().getTime());
-                        pwriter.print(",to=");
-                        pwriter.print(getSessionTimeout());
-                        long lastCxid = getLastCxid();
-                        if (lastCxid >= 0) {
-                            pwriter.print(",lcxid=0x");
-                            pwriter.print(Long.toHexString(lastCxid));
-                        }
-                        pwriter.print(",lzxid=0x");
-                        pwriter.print(Long.toHexString(getLastZxid()));
-                        pwriter.print(",lresp=");
-                        pwriter.print(getLastResponseTime());
-                        pwriter.print(",llat=");
-                        pwriter.print(getLastLatency());
-                        pwriter.print(",minlat=");
-                        pwriter.print(getMinLatency());
-                        pwriter.print(",avglat=");
-                        pwriter.print(getAvgLatency());
-                        pwriter.print(",maxlat=");
-                        pwriter.print(getMaxLatency());
-                    }
-                }
-                pwriter.println(")");
-            }
+    @Override
+    protected ServerStats serverStats() {
+        if (zkServer == null) {
+            return null;
         }
+        return zkServer.serverStats();
     }
 
-    private final CnxnStats stats = new CnxnStats();
-    public Stats getStats() {
-        return stats;
-    }
 }