You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2010/08/18 08:24:09 UTC
svn commit: r986575 [1/4] - in /hadoop/zookeeper/trunk: ./
src/docs/src/documentation/content/xdocs/ src/java/libtest/
src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/auth/ src/ja...
Author: phunt
Date: Wed Aug 18 06:24:08 2010
New Revision: 986575
URL: http://svn.apache.org/viewvc?rev=986575&view=rev
Log:
ZOOKEEPER-733. use netty to handle client connections
Added:
hadoop/zookeeper/trunk/src/java/libtest/
hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt
hadoop/zookeeper/trunk/src/java/libtest/accessive.jar (with props)
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/build.xml
hadoop/zookeeper/trunk/ivy.xml
hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RepeatStartupTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/UpgradeTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Aug 18 06:24:08 2010
@@ -107,6 +107,8 @@ IMPROVEMENTS:
ZOOKEEPER-809. Improved REST Interface (Andrei Savu via phunt)
+ ZOOKEEPER-733. use netty to handle client connections (breed and phunt)
+
NEW FEATURES:
ZOOKEEPER-729. Java client API to recursively delete a subtree.
(Kay Kay via henry)
Modified: hadoop/zookeeper/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/build.xml?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/build.xml (original)
+++ hadoop/zookeeper/trunk/build.xml Wed Aug 18 06:24:08 2010
@@ -62,6 +62,7 @@
<property name="test.java.build.dir" value="${build.dir}/test"/>
<property name="test.java.classes" value="${test.java.build.dir}/classes"/>
+ <property name="test.lib.dir" value="${src.dir}/java/libtest" />
<property name="test.src.dir" value="${src.dir}/java/test"/>
<property name="systest.src.dir" value="${src.dir}/java/systest"/>
<property name="test.log.dir" value="${test.java.build.dir}/logs" />
@@ -335,6 +336,10 @@
</target>
<target name="compile-test" depends="ivy-retrieve-test,compile">
+ <copy todir="${ivy.test.lib}">
+ <fileset dir="${test.lib.dir}" includes="*.jar"/>
+ </copy>
+
<mkdir dir="${test.java.classes}"/>
<javac srcdir="${test.src.dir}" destdir="${test.java.classes}"
target="${javac.target}" debug="on">
Modified: hadoop/zookeeper/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/ivy.xml?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/ivy.xml (original)
+++ hadoop/zookeeper/trunk/ivy.xml Wed Aug 18 06:24:08 2010
@@ -32,10 +32,18 @@
<conf name="releaseaudit" visibility="private" description="Artifacts required for releaseaudit target"/>
</configurations>
+ <publications>
+ <artifact name='org.jboss.netty' type='jar' ext='jar' />
+ </publications>
+
<dependencies>
<!-- transitive false turns off dependency checking, log4j deps seem borked -->
- <dependency org="log4j" name="log4j" rev="1.2.15" transitive="false"/>
- <dependency org="jline" name="jline" rev="0.9.94" transitive="false"/>
+ <dependency org="log4j" name="log4j" rev="1.2.15" transitive="false" conf="default"/>
+ <dependency org="jline" name="jline" rev="0.9.94" transitive="false" conf="default"/>
+
+ <dependency org="org.jboss.netty" name="netty" conf="default" rev="3.1.5.GA">
+ <artifact name="netty" type="jar" conf="default"/>
+ </dependency>
<dependency org="junit" name="junit" rev="4.8.1" conf="test->default"/>
<dependency org="checkstyle" name="checkstyle" rev="5.0"
Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original)
+++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Wed Aug 18 06:24:08 2010
@@ -1016,6 +1016,43 @@ server.3=zoo3:2888:3888</programlisting>
</varlistentry>
</variablelist>
</section>
+
+ <section>
+ <title>Communication using the Netty framework</title>
+
+ <para><emphasis role="bold">New in
+ 3.4:</emphasis> <ulink url="http://jboss.org/netty">Netty</ulink>
+ is an NIO-based client/server communication framework; it
+ simplifies (compared with using NIO directly) many of the
+ complexities of network-level communication for Java
+ applications. Additionally, the Netty framework has built-in
+ support for encryption (SSL) and authentication
+ (certificates). These are optional features and can be
+ turned on or off individually.
+ </para>
+ <para>Prior to version 3.4, ZooKeeper always used NIO
+ directly; in versions 3.4 and later, Netty is
+ supported as an alternative that replaces NIO. NIO continues to
+ be the default; however, Netty-based communication can be
+ used in place of NIO by setting the environment variable
+ "zookeeper.serverCnxnFactory" to
+ "org.apache.zookeeper.server.NettyServerCnxnFactory". You
+ have the option of setting this on either the client(s) or
+ the server(s). Typically you would want to set this on both;
+ however, that is at your discretion.
+ </para>
+ <para>
+ TBD - tuning options for Netty. Currently there are none that are Netty-specific, but we should add some, especially a maximum bound on the number of reader worker threads Netty creates.
+ </para>
+ <para>
+ TBD - how to manage encryption
+ </para>
+ <para>
+ TBD - how to manage certificates
+ </para>
+
+ </section>
+
</section>
<section id="sc_zkCommands">
Added: hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt (added)
+++ hadoop/zookeeper/trunk/src/java/libtest/accessive.LICENSE.txt Wed Aug 18 06:24:08 2010
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 1999-2005 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
Added: hadoop/zookeeper/trunk/src/java/libtest/accessive.jar
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/libtest/accessive.jar?rev=986575&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/zookeeper/trunk/src/java/libtest/accessive.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Aug 18 06:24:08 2010
@@ -688,6 +688,15 @@ public class ClientCnxn {
}
void readConnectResult() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ StringBuffer buf = new StringBuffer("0x[");
+ for (byte b : incomingBuffer.array()) {
+ buf.append(Integer.toHexString(b) + ",");
+ }
+ buf.append("]");
+ LOG.trace("readConnectRestult " + incomingBuffer.remaining()
+ + " " + buf.toString());
+ }
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
@@ -1154,7 +1163,13 @@ public class ClientCnxn {
}
cleanup();
try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Doing client selector close");
+ }
selector.close();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closed client selector");
+ }
} catch (IOException e) {
LOG.warn("Ignoring exception during selector close", e);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Aug 18 06:24:08 2010
@@ -438,7 +438,7 @@ public class ZooKeeper {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout
+ " watcher=" + watcher
- + " sessionId=" + sessionId
+ + " sessionId=" + Long.toHexString(sessionId)
+ " sessionPasswd="
+ (sessionPasswd == null ? "<null>" : "<hidden>"));
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java Wed Aug 18 06:24:08 2010
@@ -29,7 +29,6 @@ import javax.management.ObjectName;
import org.apache.log4j.Logger;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
-import org.apache.zookeeper.server.NIOServerCnxn.CnxnStats;
/**
* Implementation of connection MBean interface.
@@ -38,7 +37,7 @@ public class ConnectionBean implements C
private static final Logger LOG = Logger.getLogger(ConnectionBean.class);
private final ServerCnxn connection;
- private final CnxnStats stats;
+ private final Stats stats;
private final ZooKeeperServer zk;
@@ -47,10 +46,10 @@ public class ConnectionBean implements C
public ConnectionBean(ServerCnxn connection,ZooKeeperServer zk){
this.connection = connection;
- this.stats = (CnxnStats)connection.getStats();
+ this.stats = connection;
this.zk = zk;
- InetSocketAddress sockAddr = connection.getRemoteAddress();
+ InetSocketAddress sockAddr = connection.getRemoteSocketAddress();
if (sockAddr == null) {
remoteIP = "Unknown";
} else {
@@ -69,7 +68,7 @@ public class ConnectionBean implements C
}
public String getSourceIP() {
- InetSocketAddress sockAddr = connection.getRemoteAddress();
+ InetSocketAddress sockAddr = connection.getRemoteSocketAddress();
if (sockAddr == null) {
return null;
}
@@ -88,7 +87,7 @@ public class ConnectionBean implements C
public String[] getEphemeralNodes() {
if(zk.getZKDatabase() !=null){
- String[] res= zk.getZKDatabase().getEphemerals(sessionId)
+ String[] res = zk.getZKDatabase().getEphemerals(sessionId)
.toArray(new String[0]);
Arrays.sort(res);
return res;
@@ -114,7 +113,7 @@ public class ConnectionBean implements C
}
public void resetCounters() {
- stats.reset();
+ stats.resetStats();
}
@Override
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -49,8 +49,6 @@ import org.apache.zookeeper.proto.SetWat
import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
-import org.apache.zookeeper.server.NIOServerCnxn.CnxnStats;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.ErrorTxn;
@@ -122,7 +120,7 @@ public class FinalRequestProcessor imple
}
if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
- Factory scxn = zks.getServerCnxnFactory();
+ ServerCnxnFactory scxn = zks.getServerCnxnFactory();
// this might be possible since
// we might just be playing diffs from the leader
if (scxn != null && request.cnxn == null) {
@@ -164,8 +162,7 @@ public class FinalRequestProcessor imple
zks.serverStats().updateLatency(request.createTime);
lastOp = "PING";
- ((CnxnStats)cnxn.getStats())
- .updateForResponse(request.cxid, request.zxid, lastOp,
+ cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, System.currentTimeMillis());
cnxn.sendResponse(new ReplyHeader(-2,
@@ -176,11 +173,10 @@ public class FinalRequestProcessor imple
zks.serverStats().updateLatency(request.createTime);
lastOp = "SESS";
- ((CnxnStats)cnxn.getStats())
- .updateForResponse(request.cxid, request.zxid, lastOp,
+ cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, System.currentTimeMillis());
- cnxn.finishSessionInit(true);
+ zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.create: {
@@ -359,8 +355,7 @@ public class FinalRequestProcessor imple
new ReplyHeader(request.cxid, request.zxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
- ((CnxnStats)cnxn.getStats())
- .updateForResponse(request.cxid, request.zxid, lastOp,
+ cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, System.currentTimeMillis());
try {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Wed Aug 18 06:24:08 2010
@@ -21,9 +21,7 @@ package org.apache.zookeeper.server;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PrintWriter;
-import java.io.StringWriter;
import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
@@ -31,41 +29,24 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.Environment;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Version;
import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.proto.AuthPacket;
-import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.WatcherEvent;
-import org.apache.zookeeper.server.auth.AuthenticationProvider;
-import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
@@ -75,321 +56,63 @@ import com.sun.management.UnixOperatingS
* This class handles communication with clients using NIO. There is one per
* client, but only one thread doing the communication.
*/
-public class NIOServerCnxn implements Watcher, ServerCnxn {
- private static final Logger LOG = Logger.getLogger(NIOServerCnxn.class);
+public class NIOServerCnxn extends ServerCnxn {
+ static final Logger LOG = Logger.getLogger(NIOServerCnxn.class);
- private ConnectionBean jmxConnectionBean;
+ NIOServerCnxnFactory factory;
- static public class Factory extends Thread {
- static {
- Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- public void uncaughtException(Thread t, Throwable e) {
- LOG.error("Thread " + t + " died", e);
- }
- });
- /**
- * this is to avoid the jvm bug:
- * NullPointerException in Selector.open()
- * http://bugs.sun.com/view_bug.do?bug_id=6427854
- */
- try {
- Selector.open().close();
- } catch(IOException ie) {
- LOG.error("Selector failed to open", ie);
- }
- }
-
- ZooKeeperServer zks;
-
- final ServerSocketChannel ss;
-
- final Selector selector = Selector.open();
-
- /**
- * We use this buffer to do efficient socket I/O. Since there is a single
- * sender thread per NIOServerCnxn instance, we can use a member variable to
- * only allocate it once.
- */
- final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
-
- final HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
- final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
- new HashMap<InetAddress, Set<NIOServerCnxn>>( );
-
- int outstandingLimit = 1;
-
- int maxClientCnxns = 10;
-
- /**
- * Construct a new server connection factory which will accept an unlimited number
- * of concurrent connections from each client (up to the file descriptor
- * limits of the operating system). startup(zks) must be called subsequently.
- * @param port
- * @throws IOException
- */
- public Factory(InetSocketAddress addr) throws IOException {
- this(addr, 0);
- }
-
-
- /**
- * Constructs a new server connection factory where the number of concurrent connections
- * from a single IP address is limited to maxcc (or unlimited if 0).
- * startup(zks) must be called subsequently.
- * @param port - the port to listen on for connections.
- * @param maxcc - the number of concurrent connections allowed from a single client.
- * @throws IOException
- */
- public Factory(InetSocketAddress addr, int maxcc) throws IOException {
- super("NIOServerCxn.Factory:" + addr);
- setDaemon(true);
- maxClientCnxns = maxcc;
- this.ss = ServerSocketChannel.open();
- ss.socket().setReuseAddress(true);
- LOG.info("binding to port " + addr);
- ss.socket().bind(addr);
- ss.configureBlocking(false);
- ss.register(selector, SelectionKey.OP_ACCEPT);
- }
+ SocketChannel sock;
- @Override
- public void start() {
- // ensure thread is started once and only once
- if (getState() == Thread.State.NEW) {
- super.start();
- }
- }
-
- public void startup(ZooKeeperServer zks) throws IOException,
- InterruptedException {
- start();
- zks.startdata();
- zks.startup();
- setZooKeeperServer(zks);
- }
-
- public void setZooKeeperServer(ZooKeeperServer zks) {
- this.zks = zks;
- if (zks != null) {
- this.outstandingLimit = zks.getGlobalOutstandingLimit();
- zks.setServerCnxnFactory(this);
- } else {
- this.outstandingLimit = 1;
- }
- }
-
- public ZooKeeperServer getZooKeeperServer() {
- return this.zks;
- }
-
- public InetSocketAddress getLocalAddress(){
- return (InetSocketAddress)ss.socket().getLocalSocketAddress();
- }
-
- public int getLocalPort(){
- return ss.socket().getLocalPort();
- }
-
- public int getMaxClientCnxns() {
- return maxClientCnxns;
- }
-
- private void addCnxn(NIOServerCnxn cnxn) {
- synchronized (cnxns) {
- cnxns.add(cnxn);
- synchronized (ipMap){
- InetAddress addr = cnxn.sock.socket().getInetAddress();
- Set<NIOServerCnxn> s = ipMap.get(addr);
- if (s == null) {
- // in general we will see 1 connection from each
- // host, setting the initial cap to 2 allows us
- // to minimize mem usage in the common case
- // of 1 entry -- we need to set the initial cap
- // to 2 to avoid rehash when the first entry is added
- s = new HashSet<NIOServerCnxn>(2);
- s.add(cnxn);
- ipMap.put(addr,s);
- } else {
- s.add(cnxn);
- }
- }
- }
- }
-
- protected NIOServerCnxn createConnection(SocketChannel sock,
- SelectionKey sk) throws IOException {
- return new NIOServerCnxn(zks, sock, sk, this);
- }
-
- private int getClientCnxnCount(InetAddress cl) {
- // The ipMap lock covers both the map, and its contents
- // (that is, the cnxn sets shouldn't be modified outside of
- // this lock)
- synchronized (ipMap) {
- Set<NIOServerCnxn> s = ipMap.get(cl);
- if (s == null) return 0;
- return s.size();
- }
- }
+ private SelectionKey sk;
- public void run() {
- while (!ss.socket().isClosed()) {
- try {
- selector.select(1000);
- Set<SelectionKey> selected;
- synchronized (this) {
- selected = selector.selectedKeys();
- }
- ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
- selected);
- Collections.shuffle(selectedList);
- for (SelectionKey k : selectedList) {
- if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
- SocketChannel sc = ((ServerSocketChannel) k
- .channel()).accept();
- InetAddress ia = sc.socket().getInetAddress();
- int cnxncount = getClientCnxnCount(ia);
- if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
- LOG.warn("Too many connections from " + ia
- + " - max is " + maxClientCnxns );
- sc.close();
- } else {
- LOG.info("Accepted socket connection from "
- + sc.socket().getRemoteSocketAddress());
- sc.configureBlocking(false);
- SelectionKey sk = sc.register(selector,
- SelectionKey.OP_READ);
- NIOServerCnxn cnxn = createConnection(sc, sk);
- sk.attach(cnxn);
- addCnxn(cnxn);
- }
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- NIOServerCnxn c = (NIOServerCnxn) k.attachment();
- c.doIO(k);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unexpected ops in select "
- + k.readyOps());
- }
- }
- }
- selected.clear();
- } catch (RuntimeException e) {
- LOG.warn("Ignoring unexpected runtime exception", e);
- } catch (Exception e) {
- LOG.warn("Ignoring exception", e);
- }
- }
- clear();
- LOG.info("NIOServerCnxn factory exited run method");
- }
+ boolean initialized;
- /**
- * Clear all the connections in the selector.
- *
- * You must first close ss (the serversocketchannel) if you wish
- * to block any new connections from being established.
- *
- */
- @SuppressWarnings("unchecked")
- synchronized public void clear() {
- selector.wakeup();
- HashSet<NIOServerCnxn> cnxns;
- synchronized (this.cnxns) {
- cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
- }
- // got to clear all the connections that we have in the selector
- for (NIOServerCnxn cnxn: cnxns) {
- try {
- // don't hold this.cnxns lock as deadlock may occur
- cnxn.close();
- } catch (Exception e) {
- LOG.warn("Ignoring exception closing cnxn sessionid 0x"
- + Long.toHexString(cnxn.sessionId), e);
- }
- }
- }
+ ByteBuffer lenBuffer = ByteBuffer.allocate(4);
- public void shutdown() {
- try {
- ss.close();
- clear();
- this.interrupt();
- this.join();
- } catch (InterruptedException e) {
- LOG.warn("Ignoring interrupted exception during shutdown", e);
- } catch (Exception e) {
- LOG.warn("Ignoring unexpected exception during shutdown", e);
- }
- try {
- selector.close();
- } catch (IOException e) {
- LOG.warn("Selector closing", e);
- }
- if (zks != null) {
- zks.shutdown();
- }
- }
+ ByteBuffer incomingBuffer = lenBuffer;
- synchronized void closeSession(long sessionId) {
- selector.wakeup();
- closeSessionWithoutWakeup(sessionId);
- }
+ LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
- @SuppressWarnings("unchecked")
- private void closeSessionWithoutWakeup(long sessionId) {
- HashSet<NIOServerCnxn> cnxns;
- synchronized (this.cnxns) {
- cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
- }
+ int sessionTimeout;
- for (NIOServerCnxn cnxn : cnxns) {
- if (cnxn.sessionId == sessionId) {
- try {
- cnxn.close();
- } catch (Exception e) {
- LOG.warn("exception during session close", e);
- }
- break;
- }
- }
- }
- }
+ private final ZooKeeperServer zkServer;
/**
- * The buffer will cause the connection to be close when we do a send.
+ * The number of requests that have been submitted but not yet responded to.
*/
- static final ByteBuffer closeConn = ByteBuffer.allocate(0);
-
- final Factory factory;
+ int outstandingRequests;
- /** The ZooKeeperServer for this connection. May be null if the server
- * is not currently serving requests (for example if the server is not
- * an active quorum participant.
+ /**
+ * This is the id that uniquely identifies the session of a client. Once
+ * this session is no longer active, the ephemeral nodes will go away.
*/
- private final ZooKeeperServer zk;
-
- private SocketChannel sock;
-
- private SelectionKey sk;
-
- boolean initialized;
-
- ByteBuffer lenBuffer = ByteBuffer.allocate(4);
-
- ByteBuffer incomingBuffer = lenBuffer;
-
- LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
+ long sessionId;
- int sessionTimeout;
+ static long nextSessionId = 1;
+ int outstandingLimit = 1;
- ArrayList<Id> authInfo = new ArrayList<Id>();
+ public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
+ SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
+ this.zkServer = zk;
+ this.sock = sock;
+ this.sk = sk;
+ this.factory = factory;
+ if (zk != null) {
+ outstandingLimit = zk.getGlobalOutstandingLimit();
+ }
+ sock.socket().setTcpNoDelay(true);
+ sock.socket().setSoLinger(true, 2);
+ InetAddress addr = ((InetSocketAddress) sock.socket()
+ .getRemoteSocketAddress()).getAddress();
+ authInfo.add(new Id("ip", addr.getHostAddress()));
+ sk.interestOps(SelectionKey.OP_READ);
+ }
/* Send close connection packet to the client, doIO will eventually
* close the underlying machinery (like socket, selectorkey, etc...)
*/
public void sendCloseSession() {
- sendBuffer(closeConn);
+ sendBuffer(ServerCnxnFactory.closeConn);
}
/**
@@ -404,7 +127,7 @@ public class NIOServerCnxn implements Wa
* a tight while loop
*/
sock.configureBlocking(true);
- if (bb != closeConn) {
+ if (bb != ServerCnxnFactory.closeConn) {
if (sock != null) {
sock.write(bb);
}
@@ -415,9 +138,9 @@ public class NIOServerCnxn implements Wa
}
}
- void sendBuffer(ByteBuffer bb) {
+ public void sendBuffer(ByteBuffer bb) {
try {
- if (bb != closeConn) {
+ if (bb != ServerCnxnFactory.closeConn) {
// We check if write interest here because if it is NOT set,
// nothing is queued, so we can try to send the buffer right
// away without waking up the selector
@@ -452,27 +175,7 @@ public class NIOServerCnxn implements Wa
}
}
- private static class CloseRequestException extends IOException {
- private static final long serialVersionUID = -7854505709816442681L;
-
- public CloseRequestException(String msg) {
- super(msg);
- }
- }
-
- private static class EndOfStreamException extends IOException {
- private static final long serialVersionUID = -8255690282104294178L;
-
- public EndOfStreamException(String msg) {
- super(msg);
- }
-
- public String toString() {
- return "EndOfStreamException: " + getMessage();
- }
- }
-
- /** Read the request payload (everything followng the length prefix) */
+ /** Read the request payload (everything following the length prefix) */
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
@@ -589,7 +292,7 @@ public class NIOServerCnxn implements Wa
// Remove the buffers that we have sent
while (outgoingBuffers.size() > 0) {
bb = outgoingBuffers.peek();
- if (bb == closeConn) {
+ if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
}
int left = bb.remaining() - sent;
@@ -653,61 +356,19 @@ public class NIOServerCnxn implements Wa
}
private void readRequest() throws IOException {
- // We have the request, now process and setup for next
- InputStream bais = new ByteBufferInputStream(incomingBuffer);
- BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
- RequestHeader h = new RequestHeader();
- h.deserialize(bia, "header");
- // Through the magic of byte buffers, txn will not be
- // pointing
- // to the start of the txn
- incomingBuffer = incomingBuffer.slice();
- if (h.getType() == OpCode.auth) {
- AuthPacket authPacket = new AuthPacket();
- ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
- String scheme = authPacket.getScheme();
- AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
- if (ap == null
- || (ap.handleAuthentication(this, authPacket.getAuth())
- != KeeperException.Code.OK)) {
- if (ap == null) {
- LOG.warn("No authentication provider for scheme: "
- + scheme + " has "
- + ProviderRegistry.listProviders());
- } else {
- LOG.warn("Authentication failed for scheme: " + scheme);
- }
- // send a response...
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
- KeeperException.Code.AUTHFAILED.intValue());
- sendResponse(rh, null, null);
- // ... and close connection
- sendCloseSession();
- disableRecv();
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Authentication succeeded for scheme: "
- + scheme);
- }
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
- KeeperException.Code.OK.intValue());
- sendResponse(rh, null, null);
- }
- return;
- } else {
- Request si = new Request(this, sessionId, h.getXid(), h.getType(), incomingBuffer, authInfo);
- si.setOwner(ServerCnxn.me);
- zk.submitRequest(si);
- }
+ zkServer.processPacket(this, incomingBuffer);
+ }
+
+ protected void incrOutstandingRequests(RequestHeader h) {
if (h.getXid() >= 0) {
synchronized (this) {
outstandingRequests++;
}
synchronized (this.factory) {
// check throttling
- if (zk.getInProcess() > factory.outstandingLimit) {
+ if (zkServer.getInProcess() > outstandingLimit) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Throttling recv " + zk.getInProcess());
+ LOG.debug("Throttling recv " + zkServer.getInProcess());
}
disableRecv();
// following lines should not be needed since we are
@@ -717,6 +378,7 @@ public class NIOServerCnxn implements Wa
}
}
}
+
}
public void disableRecv() {
@@ -724,210 +386,25 @@ public class NIOServerCnxn implements Wa
}
public void enableRecv() {
- if (sk.isValid()) {
- int interest = sk.interestOps();
- if ((interest & SelectionKey.OP_READ) == 0) {
- sk.interestOps(interest | SelectionKey.OP_READ);
+ synchronized (this.factory) {
+ sk.selector().wakeup();
+ if (sk.isValid()) {
+ int interest = sk.interestOps();
+ if ((interest & SelectionKey.OP_READ) == 0) {
+ sk.interestOps(interest | SelectionKey.OP_READ);
+ }
}
}
}
private void readConnectRequest() throws IOException, InterruptedException {
- BinaryInputArchive bia = BinaryInputArchive
- .getArchive(new ByteBufferInputStream(incomingBuffer));
- ConnectRequest connReq = new ConnectRequest();
- connReq.deserialize(bia, "connect");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Session establishment request from client "
- + sock.socket().getRemoteSocketAddress()
- + " client's lastZxid is 0x"
- + Long.toHexString(connReq.getLastZxidSeen()));
- }
- if (zk == null) {
+ if (zkServer == null) {
throw new IOException("ZooKeeperServer not running");
}
- if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
- String msg = "Refusing session request for client "
- + sock.socket().getRemoteSocketAddress()
- + " as it has seen zxid 0x"
- + Long.toHexString(connReq.getLastZxidSeen())
- + " our last zxid is 0x"
- + Long.toHexString(zk.getZKDatabase().getDataTreeLastProcessedZxid())
- + " client must try another server";
-
- LOG.info(msg);
- throw new CloseRequestException(msg);
- }
- sessionTimeout = connReq.getTimeOut();
- byte passwd[] = connReq.getPasswd();
- int minSessionTimeout = zk.getMinSessionTimeout();
- if (sessionTimeout < minSessionTimeout) {
- sessionTimeout = minSessionTimeout;
- }
- int maxSessionTimeout = zk.getMaxSessionTimeout();
- if (sessionTimeout > maxSessionTimeout) {
- sessionTimeout = maxSessionTimeout;
- }
- // We don't want to receive any packets until we are sure that the
- // session is setup
- disableRecv();
- if (connReq.getSessionId() != 0) {
- long clientSessionId = connReq.getSessionId();
- LOG.info("Client attempting to renew session 0x"
- + Long.toHexString(clientSessionId)
- + " at " + sock.socket().getRemoteSocketAddress());
- factory.closeSessionWithoutWakeup(clientSessionId);
- setSessionId(clientSessionId);
- zk.reopenSession(this, sessionId, passwd, sessionTimeout);
- } else {
- LOG.info("Client attempting to establish new session at "
- + sock.socket().getRemoteSocketAddress());
- zk.createSession(this, passwd, sessionTimeout);
- }
+ zkServer.processConnectRequest(this, incomingBuffer);
initialized = true;
}
- private void packetReceived() {
- stats.incrPacketsReceived();
- if (zk != null) {
- zk.serverStats().incrementPacketsReceived();
- }
- }
-
- private void packetSent() {
- stats.incrPacketsSent();
- if (zk != null) {
- zk.serverStats().incrementPacketsSent();
- }
- }
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int confCmd =
- ByteBuffer.wrap("conf".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int consCmd =
- ByteBuffer.wrap("cons".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int crstCmd =
- ByteBuffer.wrap("crst".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int dumpCmd =
- ByteBuffer.wrap("dump".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int enviCmd =
- ByteBuffer.wrap("envi".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int getTraceMaskCmd =
- ByteBuffer.wrap("gtmk".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int ruokCmd =
- ByteBuffer.wrap("ruok".getBytes()).getInt();
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int setTraceMaskCmd =
- ByteBuffer.wrap("stmk".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int srvrCmd =
- ByteBuffer.wrap("srvr".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int srstCmd =
- ByteBuffer.wrap("srst".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int statCmd =
- ByteBuffer.wrap("stat".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int wchcCmd =
- ByteBuffer.wrap("wchc".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int wchpCmd =
- ByteBuffer.wrap("wchp".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int wchsCmd =
- ByteBuffer.wrap("wchs".getBytes()).getInt();
-
- /*
- * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
- * Zk Admin</a>. this link is for all the commands.
- */
- private final static int mntrCmd = ByteBuffer.wrap("mntr".getBytes())
- .getInt();
-
-
- private final static HashMap<Integer, String> cmd2String =
- new HashMap<Integer, String>();
-
- // specify all of the commands that are available
- static {
- cmd2String.put(confCmd, "conf");
- cmd2String.put(consCmd, "cons");
- cmd2String.put(crstCmd, "crst");
- cmd2String.put(dumpCmd, "dump");
- cmd2String.put(enviCmd, "envi");
- cmd2String.put(getTraceMaskCmd, "gtmk");
- cmd2String.put(ruokCmd, "ruok");
- cmd2String.put(setTraceMaskCmd, "stmk");
- cmd2String.put(srstCmd, "srst");
- cmd2String.put(srvrCmd, "srvr");
- cmd2String.put(statCmd, "stat");
- cmd2String.put(wchcCmd, "wchc");
- cmd2String.put(wchpCmd, "wchp");
- cmd2String.put(wchsCmd, "wchs");
- cmd2String.put(mntrCmd, "mntr");
- }
-
/**
* clean up the socket related to a command and also make sure we flush the
* data before we do that
@@ -1084,10 +561,10 @@ public class NIOServerCnxn implements Wa
@Override
public void commandRun() {
- if (zk == null) {
+ if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
} else {
- zk.dumpConf(pw);
+ zkServer.dumpConf(pw);
}
}
}
@@ -1099,11 +576,11 @@ public class NIOServerCnxn implements Wa
@Override
public void commandRun() {
- if (zk == null) {
+ if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
}
else {
- zk.serverStats().reset();
+ zkServer.serverStats().reset();
pw.println("Server stats reset.");
}
}
@@ -1116,12 +593,12 @@ public class NIOServerCnxn implements Wa
@Override
public void commandRun() {
- if (zk == null) {
+ if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
} else {
synchronized(factory.cnxns){
- for(NIOServerCnxn c : factory.cnxns){
- c.getStats().reset();
+ for(ServerCnxn c : factory.cnxns){
+ c.resetStats();
}
}
pw.println("Connection stats reset.");
@@ -1136,14 +613,14 @@ public class NIOServerCnxn implements Wa
@Override
public void commandRun() {
- if (zk == null) {
+ if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
}
else {
pw.println("SessionTracker dump:");
- zk.sessionTracker.dumpSessions(pw);
+ zkServer.sessionTracker.dumpSessions(pw);
pw.println("ephemeral nodes dump:");
- zk.dumpEphemerals(pw);
+ zkServer.dumpEphemerals(pw);
}
}
}
@@ -1158,7 +635,7 @@ public class NIOServerCnxn implements Wa
@SuppressWarnings("unchecked")
@Override
public void commandRun() {
- if (zk == null) {
+ if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
}
else {
@@ -1175,14 +652,13 @@ public class NIOServerCnxn implements Wa
.cnxns.clone();
}
for(NIOServerCnxn c : cnxnset){
- ((CnxnStats)c.getStats())
- .dumpConnectionInfo(pw, true);
+ c.dumpConnectionInfo(pw, true);
}
pw.println();
}
- pw.print(zk.serverStats().toString());
+ pw.print(zkServer.serverStats().toString());
pw.print("Node count: ");
- pw.println(zk.getZKDatabase().getNodeCount());
+ pw.println(zkServer.getZKDatabase().getNodeCount());
}
}
@@ -1196,7 +672,7 @@ public class NIOServerCnxn implements Wa
@SuppressWarnings("unchecked")
@Override
public void commandRun() {
- if (zk == null) {
+ if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
} else {
// clone should be faster than iteration
@@ -1206,7 +682,7 @@ public class NIOServerCnxn implements Wa
cnxns = (HashSet<NIOServerCnxn>) factory.cnxns.clone();
}
for (NIOServerCnxn c : cnxns) {
- ((CnxnStats) c.getStats()).dumpConnectionInfo(pw, false);
+ c.dumpConnectionInfo(pw, false);
}
pw.println();
}
@@ -1222,10 +698,10 @@ public class NIOServerCnxn implements Wa
@Override
public void commandRun() {
- if (zk == null) {
+ if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
} else {
- DataTree dt = zk.getZKDatabase().getDataTree();
+ DataTree dt = zkServer.getZKDatabase().getDataTree();
if (len == wchsCmd) {
dt.dumpWatchesSummary(pw);
} else if (len == wchpCmd) {
@@ -1246,12 +722,12 @@ public class NIOServerCnxn implements Wa
@Override
public void commandRun() {
- if(zk == null) {
+ if(zkServer == null) {
pw.println(ZK_NOT_SERVING);
return;
}
- ZKDatabase zkdb = zk.getZKDatabase();
- ServerStats stats = zk.serverStats();
+ ZKDatabase zkdb = zkServer.getZKDatabase();
+ ServerStats stats = zkServer.serverStats();
print("version", Version.getFullVersion());
@@ -1280,7 +756,7 @@ public class NIOServerCnxn implements Wa
}
if(stats.getServerState() == "leader") {
- Leader leader = ((LeaderZooKeeperServer)zk).getLeader();
+ Leader leader = ((LeaderZooKeeperServer)zkServer).getLeader();
print("followers", leader.learners.size());
print("synced_followers", leader.forwardingFollowers.size());
@@ -1405,23 +881,26 @@ public class NIOServerCnxn implements Wa
private boolean readLength(SelectionKey k) throws IOException {
// Read the length, now get the buffer
int len = lenBuffer.getInt();
- if (!initialized && checkFourLetterWord(k, len)) {
+ if (!initialized && checkFourLetterWord(sk, len)) {
return false;
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
- if (zk == null) {
+ if (zkServer == null) {
throw new IOException("ZooKeeperServer not running");
}
incomingBuffer = ByteBuffer.allocate(len);
return true;
}
- /**
- * The number of requests that have been submitted but not yet responded to.
- */
- int outstandingRequests;
+ public long getOutstandingRequests() {
+ synchronized (this) {
+ synchronized (this.factory) {
+ return outstandingRequests;
+ }
+ }
+ }
/*
* (non-Javadoc)
@@ -1432,28 +911,6 @@ public class NIOServerCnxn implements Wa
return sessionTimeout;
}
- /**
- * This is the id that uniquely identifies the session of a client. Once
- * this session is no longer active, the ephemeral nodes will go away.
- */
- long sessionId;
-
- static long nextSessionId = 1;
-
- public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
- SelectionKey sk, Factory factory) throws IOException {
- this.zk = zk;
- this.sock = sock;
- this.sk = sk;
- this.factory = factory;
- sock.socket().setTcpNoDelay(true);
- sock.socket().setSoLinger(true, 2);
- InetAddress addr = ((InetSocketAddress) sock.socket()
- .getRemoteSocketAddress()).getAddress();
- authInfo.add(new Id("ip", addr.getHostAddress()));
- sk.interestOps(SelectionKey.OP_READ);
- }
-
@Override
public String toString() {
return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
@@ -1464,6 +921,7 @@ public class NIOServerCnxn implements Wa
*
* This function returns immediately if the cnxn is not on the cnxns list.
*/
+ @Override
public void close() {
synchronized(factory.cnxns){
// if this is not in cnxns then it's already closed
@@ -1477,18 +935,10 @@ public class NIOServerCnxn implements Wa
s.remove(this);
}
- // unregister from JMX
- try {
- if(jmxConnectionBean != null){
- MBeanRegistry.getInstance().unregister(jmxConnectionBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxConnectionBean = null;
-
- if (zk != null) {
- zk.removeCnxn(this);
+ factory.unregisterConnection(this);
+
+ if (zkServer != null) {
+ zkServer.removeCnxn(this);
}
closeSock();
@@ -1570,6 +1020,7 @@ public class NIOServerCnxn implements Wa
* @see org.apache.zookeeper.server.ServerCnxnIface#sendResponse(org.apache.zookeeper.proto.ReplyHeader,
* org.apache.jute.Record, java.lang.String)
*/
+ @Override
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -1595,7 +1046,7 @@ public class NIOServerCnxn implements Wa
}
// check throttling
synchronized (this.factory) {
- if (zk.getInProcess() < factory.outstandingLimit
+ if (zkServer.getInProcess() < outstandingLimit
|| outstandingRequests < 1) {
sk.selector().wakeup();
enableRecv();
@@ -1612,6 +1063,7 @@ public class NIOServerCnxn implements Wa
*
* @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent)
*/
+ @Override
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
@@ -1627,274 +1079,45 @@ public class NIOServerCnxn implements Wa
sendResponse(h, e, "notification");
}
- public void finishSessionInit(boolean valid) {
- // register with JMX
- try {
- jmxConnectionBean = new ConnectionBean(this, zk);
- MBeanRegistry.getInstance().register(jmxConnectionBean, zk.jmxServerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxConnectionBean = null;
- }
-
- try {
- ConnectResponse rsp = new ConnectResponse(0, valid ? sessionTimeout
- : 0, valid ? sessionId : 0, // send 0 if session is no
- // longer valid
- valid ? zk.generatePasswd(sessionId) : new byte[16]);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
- bos.writeInt(-1, "len");
- rsp.serialize(bos, "connect");
- baos.close();
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- bb.putInt(bb.remaining() - 4).rewind();
- sendBuffer(bb);
-
- if (!valid) {
- LOG.info("Invalid session 0x"
- + Long.toHexString(sessionId)
- + " for client "
- + sock.socket().getRemoteSocketAddress()
- + ", probably expired");
- sendCloseSession();
- } else {
- LOG.info("Established session 0x"
- + Long.toHexString(sessionId)
- + " with negotiated timeout " + sessionTimeout
- + " for client "
- + sock.socket().getRemoteSocketAddress());
- }
-
- // Now that the session is ready we can start receiving packets
- synchronized (this.factory) {
- sk.selector().wakeup();
- enableRecv();
- }
- } catch (Exception e) {
- LOG.warn("Exception while establishing session, closing", e);
- close();
- }
- }
-
/*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.ServerCnxnIface#getSessionId()
*/
+ @Override
public long getSessionId() {
return sessionId;
}
+ @Override
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
}
- public ArrayList<Id> getAuthInfo() {
- return authInfo;
+ @Override
+ public void setSessionTimeout(int sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
+
+ @Override
+ public int getInterestOps() {
+ return sk.isValid() ? sk.interestOps() : 0;
}
- public synchronized InetSocketAddress getRemoteAddress() {
+ @Override
+ public InetSocketAddress getRemoteSocketAddress() {
if (sock == null) {
return null;
}
return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
}
- class CnxnStats implements ServerCnxn.Stats {
- private final Date established = new Date();
-
- private final AtomicLong packetsReceived = new AtomicLong();
- private final AtomicLong packetsSent = new AtomicLong();
-
- private long minLatency;
- private long maxLatency;
- private String lastOp;
- private long lastCxid;
- private long lastZxid;
- private long lastResponseTime;
- private long lastLatency;
-
- private long count;
- private long totalLatency;
-
- CnxnStats() {
- reset();
- }
-
- public synchronized void reset() {
- packetsReceived.set(0);
- packetsSent.set(0);
- minLatency = Long.MAX_VALUE;
- maxLatency = 0;
- lastOp = "NA";
- lastCxid = -1;
- lastZxid = -1;
- lastResponseTime = 0;
- lastLatency = 0;
-
- count = 0;
- totalLatency = 0;
- }
-
- long incrPacketsReceived() {
- return packetsReceived.incrementAndGet();
- }
-
- long incrPacketsSent() {
- return packetsSent.incrementAndGet();
- }
-
- synchronized void updateForResponse(long cxid, long zxid, String op,
- long start, long end)
- {
- // don't overwrite with "special" xids - we're interested
- // in the clients last real operation
- if (cxid >= 0) {
- lastCxid = cxid;
- }
- lastZxid = zxid;
- lastOp = op;
- lastResponseTime = end;
- long elapsed = end - start;
- lastLatency = elapsed;
- if (elapsed < minLatency) {
- minLatency = elapsed;
- }
- if (elapsed > maxLatency) {
- maxLatency = elapsed;
- }
- count++;
- totalLatency += elapsed;
- }
-
- public Date getEstablished() {
- return established;
- }
-
- public long getOutstandingRequests() {
- synchronized (NIOServerCnxn.this) {
- synchronized (NIOServerCnxn.this.factory) {
- return outstandingRequests;
- }
- }
- }
-
- public long getPacketsReceived() {
- return packetsReceived.longValue();
- }
-
- public long getPacketsSent() {
- return packetsSent.longValue();
- }
-
- public synchronized long getMinLatency() {
- return minLatency == Long.MAX_VALUE ? 0 : minLatency;
- }
-
- public synchronized long getAvgLatency() {
- return count == 0 ? 0 : totalLatency / count;
- }
-
- public synchronized long getMaxLatency() {
- return maxLatency;
- }
-
- public synchronized String getLastOperation() {
- return lastOp;
- }
-
- public synchronized long getLastCxid() {
- return lastCxid;
- }
-
- public synchronized long getLastZxid() {
- return lastZxid;
- }
-
- public synchronized long getLastResponseTime() {
- return lastResponseTime;
- }
-
- public synchronized long getLastLatency() {
- return lastLatency;
- }
-
- /**
- * Prints detailed stats information for the connection.
- *
- * @see dumpConnectionInfo(PrintWriter, boolean) for brief stats
- */
- @Override
- public String toString() {
- StringWriter sw = new StringWriter();
- PrintWriter pwriter = new PrintWriter(sw);
- dumpConnectionInfo(pwriter, false);
- pwriter.flush();
- pwriter.close();
- return sw.toString();
- }
-
- /**
- * Print information about the connection.
- * @param brief iff true prints brief details, otw full detail
- * @return information about this connection
- */
- public synchronized void
- dumpConnectionInfo(PrintWriter pwriter, boolean brief)
- {
- Channel channel = sk.channel();
- if (channel instanceof SocketChannel) {
- pwriter.print(" ");
- pwriter.print(((SocketChannel)channel).socket()
- .getRemoteSocketAddress());
- pwriter.print("[");
- pwriter.print(sk.isValid() ? Integer.toHexString(sk.interestOps())
- : "0");
- pwriter.print("](queued=");
- pwriter.print(getOutstandingRequests());
- pwriter.print(",recved=");
- pwriter.print(getPacketsReceived());
- pwriter.print(",sent=");
- pwriter.print(getPacketsSent());
-
- if (!brief) {
- long sessionId = getSessionId();
- if (sessionId != 0) {
- pwriter.print(",sid=0x");
- pwriter.print(Long.toHexString(sessionId));
- pwriter.print(",lop=");
- pwriter.print(getLastOperation());
- pwriter.print(",est=");
- pwriter.print(getEstablished().getTime());
- pwriter.print(",to=");
- pwriter.print(getSessionTimeout());
- long lastCxid = getLastCxid();
- if (lastCxid >= 0) {
- pwriter.print(",lcxid=0x");
- pwriter.print(Long.toHexString(lastCxid));
- }
- pwriter.print(",lzxid=0x");
- pwriter.print(Long.toHexString(getLastZxid()));
- pwriter.print(",lresp=");
- pwriter.print(getLastResponseTime());
- pwriter.print(",llat=");
- pwriter.print(getLastLatency());
- pwriter.print(",minlat=");
- pwriter.print(getMinLatency());
- pwriter.print(",avglat=");
- pwriter.print(getAvgLatency());
- pwriter.print(",maxlat=");
- pwriter.print(getMaxLatency());
- }
- }
- pwriter.println(")");
- }
+ @Override
+ protected ServerStats serverStats() {
+ if (zkServer == null) {
+ return null;
}
+ return zkServer.serverStats();
}
- private final CnxnStats stats = new CnxnStats();
- public Stats getStats() {
- return stats;
- }
}