You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2010/08/18 08:24:09 UTC
svn commit: r986575 [2/4] - in /hadoop/zookeeper/trunk: ./
src/docs/src/documentation/content/xdocs/ src/java/libtest/
src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/auth/ src/ja...
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
+ private static final Logger LOG = Logger.getLogger(NIOServerCnxnFactory.class);
+
+ static {
+ Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Thread " + t + " died", e);
+ }
+ });
+ /**
+ * this is to avoid the jvm bug:
+ * NullPointerException in Selector.open()
+ * http://bugs.sun.com/view_bug.do?bug_id=6427854
+ */
+ try {
+ Selector.open().close();
+ } catch(IOException ie) {
+ LOG.error("Selector failed to open", ie);
+ }
+ }
+
+ ServerSocketChannel ss;
+
+ final Selector selector = Selector.open();
+
+ /**
+ * We use this buffer to do efficient socket I/O. Since there is a single
+ * sender thread per NIOServerCnxn instance, we can use a member variable to
+ * only allocate it once.
+ */
+ final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
+
+ final HashSet<ServerCnxn> cnxns = new HashSet<ServerCnxn>();
+ final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
+ new HashMap<InetAddress, Set<NIOServerCnxn>>( );
+
+ int maxClientCnxns = 10;
+
+
+ /**
+ * Construct a new server connection factory. The constructor body is
+ * empty; the only work done at construction time is field initialization,
+ * which opens the selector. The factory is not bound to any address until
+ * configure(addr, maxcc) is called, and no connection is serviced until
+ * start() or startup(zks) is called after that.
+ * NOTE(review): this comment used to promise an unlimited number of
+ * concurrent connections per client, but maxClientCnxns defaults to 10
+ * and is replaced by the value passed to configure().
+ * @throws IOException if the selector cannot be opened
+ */
+ public NIOServerCnxnFactory() throws IOException {
+ }
+
+ Thread thread;
+ @Override
+ public void configure(InetSocketAddress addr, int maxcc) throws IOException {
+ thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
+ thread.setDaemon(true);
+ maxClientCnxns = maxcc;
+ this.ss = ServerSocketChannel.open();
+ ss.socket().setReuseAddress(true);
+ LOG.info("binding to port " + addr);
+ ss.socket().bind(addr);
+ ss.configureBlocking(false);
+ ss.register(selector, SelectionKey.OP_ACCEPT);
+ }
+
+ /**
+ * {@inheritDoc}
+ * Returns the current value of maxClientCnxns.
+ */
+ public int getMaxClientCnxnsPerHost() {
+ return maxClientCnxns;
+ }
+
+ /**
+ * {@inheritDoc}
+ * Replaces maxClientCnxns; run() reads the field each time it accepts a
+ * connection and only applies the limit when it is greater than zero.
+ */
+ public void setMaxClientCnxnsPerHost(int max) {
+ maxClientCnxns = max;
+ }
+
+ /**
+ * Start the selector thread created by configure(). Calling this more
+ * than once is harmless: only a thread still in the NEW state is started.
+ */
+ @Override
+ public void start() {
+ // Anything other than NEW means the thread was already started.
+ if (thread.getState() != Thread.State.NEW) {
+ return;
+ }
+ thread.start();
+ }
+
+ /**
+ * Start the selector thread and then bring up the given server: the
+ * calls are, in order, start(), zks.startdata(), zks.startup() and
+ * setZooKeeperServer(zks). The server is therefore only attached to this
+ * factory after its own startup has returned.
+ *
+ * @param zks the server to start and attach
+ * @throws IOException declared for the zks startup calls
+ * @throws InterruptedException declared for the zks startup calls
+ */
+ @Override
+ public void startup(ZooKeeperServer zks) throws IOException,
+ InterruptedException {
+ start();
+ zks.startdata();
+ zks.startup();
+ setZooKeeperServer(zks);
+ }
+
+ /**
+ * Returns the local socket address of the listening socket, cast to
+ * InetSocketAddress. Requires configure() to have created the socket.
+ */
+ @Override
+ public InetSocketAddress getLocalAddress(){
+ return (InetSocketAddress)ss.socket().getLocalSocketAddress();
+ }
+
+ /**
+ * Returns the local port of the listening socket. Requires configure()
+ * to have created the socket.
+ */
+ @Override
+ public int getLocalPort(){
+ return ss.socket().getLocalPort();
+ }
+
+ /**
+ * Record a newly accepted connection in both the global connection set
+ * and the per-address map. The cnxns lock is taken first, then the ipMap
+ * lock inside it.
+ */
+ private void addCnxn(NIOServerCnxn cnxn) {
+ synchronized (cnxns) {
+ cnxns.add(cnxn);
+ synchronized (ipMap){
+ InetAddress remote = cnxn.sock.socket().getInetAddress();
+ Set<NIOServerCnxn> perHost = ipMap.get(remote);
+ if (perHost == null) {
+ // In general we will see 1 connection from each host.
+ // An initial capacity of 2 keeps memory usage low in
+ // that common case while still avoiding a rehash when
+ // the first entry is added.
+ perHost = new HashSet<NIOServerCnxn>(2);
+ ipMap.put(remote, perHost);
+ }
+ perHost.add(cnxn);
+ }
+ }
+ }
+
+ /**
+ * Factory hook used by run() to wrap an accepted channel; it builds an
+ * NIOServerCnxn from the factory's zkServer, the channel, its selection
+ * key and this factory. Being protected, it can be overridden.
+ */
+ protected NIOServerCnxn createConnection(SocketChannel sock,
+ SelectionKey sk) throws IOException {
+ return new NIOServerCnxn(zkServer, sock, sk, this);
+ }
+
+ /**
+ * Number of connections currently recorded for the given client address,
+ * or 0 when the address has no entry in ipMap.
+ */
+ private int getClientCnxnCount(InetAddress cl) {
+ // The ipMap lock covers both the map and the connection sets stored
+ // in it, so those sets must not be modified outside of this lock.
+ synchronized (ipMap) {
+ Set<NIOServerCnxn> perHost = ipMap.get(cl);
+ return (perHost == null) ? 0 : perHost.size();
+ }
+ }
+
+ /**
+ * Selector loop. Until the listening socket is closed it waits up to one
+ * second for ready keys, accepts new connections (subject to the
+ * per-address limit when maxClientCnxns is greater than zero) and hands
+ * read/write readiness to the NIOServerCnxn attached to the key. Any
+ * exception raised while handling a batch of keys is logged and the loop
+ * continues. When the loop ends all remaining connections are closed.
+ */
+ public void run() {
+ while (!ss.socket().isClosed()) {
+ try {
+ selector.select(1000);
+ Set<SelectionKey> selected;
+ synchronized (this) {
+ selected = selector.selectedKeys();
+ }
+ // Work on a shuffled copy so keys are served in random order.
+ ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
+ selected);
+ Collections.shuffle(selectedList);
+ for (SelectionKey k : selectedList) {
+ if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
+ SocketChannel sc = ((ServerSocketChannel) k
+ .channel()).accept();
+ if (sc == null) {
+ // The listening channel is non-blocking, so accept()
+ // returns null when no connection is pending any
+ // more. Skip the key instead of failing with a
+ // NullPointerException that would abort the rest
+ // of this batch.
+ continue;
+ }
+ InetAddress ia = sc.socket().getInetAddress();
+ int cnxncount = getClientCnxnCount(ia);
+ if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
+ LOG.warn("Too many connections from " + ia
+ + " - max is " + maxClientCnxns );
+ sc.close();
+ } else {
+ LOG.info("Accepted socket connection from "
+ + sc.socket().getRemoteSocketAddress());
+ sc.configureBlocking(false);
+ SelectionKey sk = sc.register(selector,
+ SelectionKey.OP_READ);
+ NIOServerCnxn cnxn = createConnection(sc, sk);
+ sk.attach(cnxn);
+ addCnxn(cnxn);
+ }
+ } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+ NIOServerCnxn c = (NIOServerCnxn) k.attachment();
+ c.doIO(k);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unexpected ops in select "
+ + k.readyOps());
+ }
+ }
+ }
+ selected.clear();
+ } catch (RuntimeException e) {
+ LOG.warn("Ignoring unexpected runtime exception", e);
+ } catch (Exception e) {
+ LOG.warn("Ignoring exception", e);
+ }
+ }
+ closeAll();
+ LOG.info("NIOServerCnxn factory exited run method");
+ }
+
+ /**
+ * Close every connection currently tracked by this factory, after waking
+ * up the selector. A failure to close one connection is logged and does
+ * not stop the others from being closed.
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ synchronized public void closeAll() {
+ selector.wakeup();
+ // Iterate over a snapshot: cnxn.close() must not be called while the
+ // this.cnxns lock is held, as deadlock may occur.
+ final HashSet<NIOServerCnxn> snapshot;
+ synchronized (this.cnxns) {
+ snapshot = (HashSet<NIOServerCnxn>)this.cnxns.clone();
+ }
+ for (NIOServerCnxn open : snapshot) {
+ try {
+ open.close();
+ } catch (Exception e) {
+ LOG.warn("Ignoring exception closing cnxn sessionid 0x"
+ + Long.toHexString(open.sessionId), e);
+ }
+ }
+ }
+
+ /**
+ * Stop accepting connections and tear the factory down. The order of
+ * the steps matters: closing the listening socket is what makes the
+ * run() loop condition false, so it happens before the selector thread
+ * is interrupted and joined. Every failure is logged and ignored so the
+ * remaining steps still run; note that an InterruptedException from
+ * join() is logged without restoring the caller's interrupt status.
+ * The selector is closed afterwards, and finally the attached server,
+ * if any, is shut down.
+ */
+ public void shutdown() {
+ try {
+ ss.close();
+ closeAll();
+ thread.interrupt();
+ thread.join();
+ } catch (InterruptedException e) {
+ LOG.warn("Ignoring interrupted exception during shutdown", e);
+ } catch (Exception e) {
+ LOG.warn("Ignoring unexpected exception during shutdown", e);
+ }
+ try {
+ selector.close();
+ } catch (IOException e) {
+ LOG.warn("Selector closing", e);
+ }
+ if (zkServer != null) {
+ zkServer.shutdown();
+ }
+ }
+
+ /**
+ * Wake up the selector, then close the connection (if any) that belongs
+ * to the given session id.
+ */
+ @Override
+ public synchronized void closeSession(long sessionId) {
+ selector.wakeup();
+ closeSessionWithoutWakeup(sessionId);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void closeSessionWithoutWakeup(long sessionId) {
+ HashSet<NIOServerCnxn> cnxns;
+ synchronized (this.cnxns) {
+ cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
+ }
+
+ for (NIOServerCnxn cnxn : cnxns) {
+ if (cnxn.getSessionId() == sessionId) {
+ try {
+ cnxn.close();
+ } catch (Exception e) {
+ LOG.warn("exception during session close", e);
+ }
+ break;
+ }
+ }
+ }
+
+ /**
+ * Block until the selector thread created by configure() has terminated.
+ *
+ * @throws InterruptedException if the calling thread is interrupted
+ * while waiting
+ */
+ @Override
+ public void join() throws InterruptedException {
+ thread.join();
+ }
+
+ /**
+ * Returns the factory's own connection set, not a copy. Code in this
+ * class modifies and clones that set while holding its monitor
+ * (synchronized (cnxns)).
+ * NOTE(review): callers iterating the result presumably need the same
+ * lock to avoid concurrent modification - confirm against callers.
+ */
+ @Override
+ public Iterable<ServerCnxn> getConnections() {
+ return cnxns;
+ }
+
+}
\ No newline at end of file
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,820 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.AbstractSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Environment;
+import org.apache.zookeeper.Version;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.WatcherEvent;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.MessageEvent;
+
+import com.sun.management.UnixOperatingSystemMXBean;
+
+public class NettyServerCnxn extends ServerCnxn {
+ Logger LOG = Logger.getLogger(NettyServerCnxn.class);
+ Channel channel;
+ ChannelBuffer queuedBuffer;
+ volatile boolean throttled;
+ ByteBuffer bb;
+ ByteBuffer bbLen = ByteBuffer.allocate(4);
+ long sessionId;
+ int sessionTimeout;
+ AtomicLong outstandingCount = new AtomicLong();
+
+ /** The ZooKeeperServer for this connection. May be null if the server
+ * is not currently serving requests (for example, if the server is not
+ * an active quorum participant).
+ */
+ private volatile ZooKeeperServer zkServer;
+
+ NettyServerCnxnFactory factory;
+ boolean initialized;
+
+ /**
+ * Create the connection object for one Netty channel.
+ *
+ * @param channel the channel this connection reads from and writes to
+ * @param zks the server that will process requests; the commands in
+ * this class treat a null value as "not serving"
+ * @param factory the owning factory, whose cnxns and ipMap collections
+ * are updated by close()
+ */
+ NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
+ this.channel = channel;
+ this.zkServer = zks;
+ this.factory = factory;
+ }
+
+ /**
+ * Close this connection: remove it from the factory's connection set and
+ * per-address map, close the channel if it is still open and unregister
+ * the connection from the factory. Calling it again after the connection
+ * has been removed from factory.cnxns is a no-op.
+ */
+ @Override
+ public void close() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("close called for sessionid:0x"
+ + Long.toHexString(sessionId));
+ }
+ synchronized(factory.cnxns){
+ // if this is not in cnxns then it's already closed
+ if (!factory.cnxns.remove(this)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("cnxns size:" + factory.cnxns.size());
+ }
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("close in progress for sessionid:0x"
+ + Long.toHexString(sessionId));
+ }
+
+ synchronized (factory.ipMap) {
+ // Both lookups can legitimately come back empty (no remote
+ // address available, or no entry recorded for it). Guard
+ // them so a NullPointerException here cannot prevent the
+ // channel from being closed and unregistered below.
+ InetSocketAddress remote =
+ (InetSocketAddress)channel.getRemoteAddress();
+ if (remote != null) {
+ Set<NettyServerCnxn> s =
+ factory.ipMap.get(remote.getAddress());
+ if (s != null) {
+ s.remove(this);
+ }
+ }
+ }
+
+ if (channel.isOpen()) {
+ channel.close();
+ }
+ factory.unregisterConnection(this);
+ }
+ }
+
+ /** Returns the session id last stored by setSessionId(). */
+ @Override
+ public long getSessionId() {
+ return sessionId;
+ }
+
+ /** Returns the session timeout last stored by setSessionTimeout(). */
+ @Override
+ public int getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ /**
+ * Deliver a watch event to the client as a "notification" response. If
+ * sending fails with an IOException the connection is closed.
+ */
+ @Override
+ public void process(WatchedEvent event) {
+ if (LOG.isTraceEnabled()) {
+ ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
+ "Deliver event " + event + " to 0x"
+ + Long.toHexString(this.sessionId)
+ + " through " + this);
+ }
+
+ final ReplyHeader notificationHeader = new ReplyHeader(-1, -1L, 0);
+ // Convert WatchedEvent to a type that can be sent over the wire
+ final WatcherEvent wireEvent = event.getWrapper();
+
+ try {
+ sendResponse(notificationHeader, wireEvent, "notification");
+ } catch (IOException sendFailure) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Problem sending to " + getRemoteSocketAddress(), sendFailure);
+ }
+ close();
+ }
+ }
+
+ private static final byte[] fourBytes = new byte[4];
+ /**
+ * MessageEvent that carries nothing but its channel: message, remote
+ * address and future are all null. enableRecv() sends an instance
+ * upstream through the channel's pipeline when it lifts throttling.
+ */
+ static class ResumeMessageEvent implements MessageEvent {
+ Channel channel;
+
+ ResumeMessageEvent(Channel channel) {
+ this.channel = channel;
+ }
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public Object getMessage() {
+ return null;
+ }
+
+ public SocketAddress getRemoteAddress() {
+ return null;
+ }
+
+ public ChannelFuture getFuture() {
+ return null;
+ }
+ }
+
+ /**
+ * Serialize a reply and write it to the channel. The wire form is a
+ * 4-byte length followed by the header and, when r is not null, the
+ * record written under the given tag. Nothing is sent when the channel
+ * is no longer open. For replies with a positive xid the outstanding
+ * request count is decremented and receiving is re-enabled once the
+ * server no longer asks for throttling.
+ *
+ * @param h the reply header
+ * @param r the reply body, or null for a header-only reply
+ * @param tag the tag used when writing r
+ * @throws IOException declared by the overridden method; serialization
+ * failures inside this method are logged rather than thrown
+ */
+ @Override
+ public void sendResponse(ReplyHeader h, Record r, String tag)
+ throws IOException {
+ if (!channel.isOpen()) {
+ return;
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
+ try {
+ // Make space for length; it is filled in below once the total
+ // size is known.
+ baos.write(fourBytes);
+ bos.writeRecord(h, "header");
+ if (r != null) {
+ bos.writeRecord(r, tag);
+ }
+ baos.close();
+ } catch (IOException e) {
+ // Keep the cause in the log; without it the failure cannot be
+ // diagnosed.
+ LOG.error("Error serializing response", e);
+ }
+ byte b[] = baos.toByteArray();
+ // Named so that it does not shadow the bb field used by
+ // receiveMessage().
+ ByteBuffer reply = ByteBuffer.wrap(b);
+ reply.putInt(b.length - 4).rewind();
+ sendBuffer(reply);
+ if (h.getXid() > 0) {
+ // zks cannot be null otherwise we would not have gotten here!
+ if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
+ enableRecv();
+ }
+ }
+ }
+
+ /** Stores the session id reported by getSessionId() and used in log messages. */
+ @Override
+ public void setSessionId(long sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ /**
+ * Lift throttling: when the connection is currently throttled, clear the
+ * flag and send a ResumeMessageEvent upstream through the pipeline.
+ * Does nothing when the connection is not throttled.
+ */
+ @Override
+ public void enableRecv() {
+ if (!throttled) {
+ return;
+ }
+ throttled = false;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending unthrottle event " + this);
+ }
+ channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel));
+ }
+
+ /**
+ * Write a buffer to the channel and count it as a sent packet. The
+ * ServerCnxnFactory.closeConn marker (compared by identity) is not
+ * written; it closes the channel instead.
+ */
+ @Override
+ public void sendBuffer(ByteBuffer sendBuffer) {
+ if (sendBuffer != ServerCnxnFactory.closeConn) {
+ channel.write(wrappedBuffer(sendBuffer));
+ packetSent();
+ } else {
+ channel.close();
+ }
+ }
+
+ /**
+ * Clean up the socket related to a command, making sure the data is
+ * flushed before that happens. The writer is flushed and closed first;
+ * the connection itself is closed in the finally clause so that it is
+ * closed even when flushing or closing the writer fails. Problems in
+ * either step are logged, never thrown.
+ *
+ * @param pwriter
+ * the pwriter for a command socket; may be null, in which case
+ * only the connection is closed
+ */
+ private void cleanupWriterSocket(PrintWriter pwriter) {
+ try {
+ if (pwriter != null) {
+ pwriter.flush();
+ pwriter.close();
+ }
+ } catch (Exception e) {
+ LOG.info("Error closing PrintWriter ", e);
+ } finally {
+ try {
+ close();
+ } catch (Exception e) {
+ LOG.error("Error closing a command socket ", e);
+ }
+ }
+ }
+
+ /**
+ * This class wraps the sendBuffer method of the enclosing NettyServerCnxn.
+ * It is responsible for chunking up the response to a client. Rather
+ * than cons'ing up a response fully in memory, which may be large
+ * for some commands, this class chunks up the result: characters are
+ * accumulated and handed to sendBuffer once more than 2048 of them are
+ * pending, or earlier on flush()/close().
+ */
+ private class SendBufferWriter extends Writer {
+ // Pending characters; set to null by close().
+ private StringBuffer sb = new StringBuffer();
+
+ /**
+ * Check if we are ready to send another chunk.
+ * @param force force sending, even if not a full chunk
+ */
+ private void checkFlush(boolean force) {
+ if ((force && sb.length() > 0) || sb.length() > 2048) {
+ // getBytes() without an argument encodes with the platform's
+ // default charset.
+ sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+ // clear our internal buffer
+ sb.setLength(0);
+ }
+ }
+
+ /** Sends whatever is still pending; a second call is a no-op. */
+ @Override
+ public void close() throws IOException {
+ if (sb == null) return;
+ checkFlush(true);
+ sb = null; // clear out the ref to ensure no reuse
+ }
+
+ /** Sends the pending characters, if any, regardless of chunk size. */
+ @Override
+ public void flush() throws IOException {
+ checkFlush(true);
+ }
+
+ /** Appends the characters and sends a chunk once enough are pending. */
+ @Override
+ public void write(char[] cbuf, int off, int len) throws IOException {
+ sb.append(cbuf, off, len);
+ checkFlush(false);
+ }
+ }
+
+ private static final String ZK_NOT_SERVING =
+ "This ZooKeeper instance is not currently serving requests";
+
+ /**
+ * Base class for the four letter commands. Each subclass
+ * corresponds to one four letter command (or a small group of them) and
+ * implements commandRun(). Despite the name this is not a thread: the
+ * "extends Thread" clause is commented out and start() simply calls
+ * run() on the calling thread.
+ */
+ private abstract class CommandThread /*extends Thread*/ {
+ // Writer the command prints its reply to.
+ PrintWriter pw;
+
+ CommandThread(PrintWriter pw) {
+ this.pw = pw;
+ }
+
+ /** Runs the command synchronously; kept for Thread-like call sites. */
+ public void start() {
+ run();
+ }
+
+ /**
+ * Executes commandRun(), logging any IOException, and always finishes
+ * by flushing/closing the writer and closing the connection.
+ */
+ public void run() {
+ try {
+ commandRun();
+ } catch (IOException ie) {
+ LOG.error("Error in running command ", ie);
+ } finally {
+ cleanupWriterSocket(pw);
+ }
+ }
+
+ /** Writes the command's reply to pw. */
+ public abstract void commandRun() throws IOException;
+ }
+
+ /** Command dispatched for ruokCmd; the whole reply is "imok". */
+ private class RuokCommand extends CommandThread {
+ public RuokCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ pw.print("imok");
+
+ }
+ }
+
+ /** Command dispatched for getTraceMaskCmd; prints the current text trace level. */
+ private class TraceMaskCommand extends CommandThread {
+ TraceMaskCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ pw.print(ZooTrace.getTextTraceLevel());
+ }
+ }
+
+ /**
+ * Command dispatched for setTraceMaskCmd. It only echoes the mask it was
+ * constructed with; applying the mask is done by the caller before this
+ * command is created.
+ */
+ private class SetTraceMaskCommand extends CommandThread {
+ // Mask to echo back to the client.
+ long trace = 0;
+ SetTraceMaskCommand(PrintWriter pw, long trace) {
+ super(pw);
+ this.trace = trace;
+ }
+
+ @Override
+ public void commandRun() {
+ pw.print(trace);
+ }
+ }
+
+ /**
+ * Command dispatched for enviCmd; prints an "Environment:" heading
+ * followed by one key=value line per entry of Environment.list().
+ */
+ private class EnvCommand extends CommandThread {
+ EnvCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ pw.println("Environment:");
+ final List<Environment.Entry> entries = Environment.list();
+ for (Environment.Entry entry : entries) {
+ pw.print(entry.getKey());
+ pw.print("=");
+ pw.println(entry.getValue());
+ }
+ }
+ }
+
+ /** Command dispatched for confCmd; lets the server dump its configuration. */
+ private class ConfCommand extends CommandThread {
+ ConfCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.println(ZK_NOT_SERVING);
+ return;
+ }
+ zkServer.dumpConf(pw);
+ }
+ }
+
+ /** Command dispatched for srstCmd; resets the server statistics. */
+ private class StatResetCommand extends CommandThread {
+ public StatResetCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.println(ZK_NOT_SERVING);
+ return;
+ }
+ zkServer.serverStats().reset();
+ pw.println("Server stats reset.");
+ }
+ }
+
+ /**
+ * Command dispatched for crstCmd; resets the statistics of every
+ * connection in factory.cnxns while holding that set's lock.
+ */
+ private class CnxnStatResetCommand extends CommandThread {
+ public CnxnStatResetCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.println(ZK_NOT_SERVING);
+ return;
+ }
+ synchronized(factory.cnxns){
+ for (ServerCnxn tracked : factory.cnxns) {
+ tracked.resetStats();
+ }
+ }
+ pw.println("Connection stats reset.");
+ }
+ }
+
+ /**
+ * Command dispatched for dumpCmd; prints the session tracker's sessions
+ * followed by the server's ephemeral nodes.
+ */
+ private class DumpCommand extends CommandThread {
+ public DumpCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.println(ZK_NOT_SERVING);
+ return;
+ }
+ pw.println("SessionTracker dump:");
+ zkServer.sessionTracker.dumpSessions(pw);
+ pw.println("ephemeral nodes dump:");
+ zkServer.dumpEphemerals(pw);
+ }
+ }
+
+ /**
+ * Command dispatched for statCmd and srvrCmd. Both print the version,
+ * the server statistics and the node count; only statCmd additionally
+ * lists the connected clients.
+ */
+ private class StatCommand extends CommandThread {
+ // The command code this instance was created for.
+ int len;
+ public StatCommand(PrintWriter pw, int len) {
+ super(pw);
+ this.len = len;
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.println(ZK_NOT_SERVING);
+ return;
+ }
+ pw.print("Zookeeper version: ");
+ pw.println(Version.getFullVersion());
+ if (len == statCmd) {
+ LOG.info("Stat command output");
+ pw.println("Clients:");
+ // Copy under the lock and print outside of it, so the cnxns
+ // lock is given up as quickly as possible.
+ final HashSet<ServerCnxn> snapshot;
+ synchronized(factory.cnxns){
+ snapshot = new HashSet<ServerCnxn>(factory.cnxns);
+ }
+ for (ServerCnxn client : snapshot) {
+ client.dumpConnectionInfo(pw, true);
+ }
+ pw.println();
+ }
+ pw.print(zkServer.serverStats().toString());
+ pw.print("Node count: ");
+ pw.println(zkServer.getZKDatabase().getNodeCount());
+ }
+ }
+
+ /**
+ * Command dispatched for consCmd; prints the connection info of every
+ * tracked connection, followed by an empty line.
+ */
+ private class ConsCommand extends CommandThread {
+ public ConsCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.println(ZK_NOT_SERVING);
+ return;
+ }
+ // Copy under the lock and print outside of it, so the cnxns lock
+ // is given up as quickly as possible.
+ final AbstractSet<ServerCnxn> snapshot;
+ synchronized (factory.cnxns) {
+ snapshot = new HashSet<ServerCnxn>(factory.cnxns);
+ }
+ for (ServerCnxn tracked : snapshot) {
+ tracked.dumpConnectionInfo(pw, false);
+ }
+ pw.println();
+ }
+ }
+
+ /**
+ * Command dispatched for wchsCmd, wchpCmd and wchcCmd. wchsCmd prints the
+ * watch summary; the other two dump the watches, with the boolean flag
+ * passed to dumpWatches set only for wchpCmd.
+ */
+ private class WatchCommand extends CommandThread {
+ // The command code this instance was created for.
+ int len = 0;
+ public WatchCommand(PrintWriter pw, int len) {
+ super(pw);
+ this.len = len;
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.println(ZK_NOT_SERVING);
+ return;
+ }
+ final DataTree tree = zkServer.getZKDatabase().getDataTree();
+ if (len == wchsCmd) {
+ tree.dumpWatchesSummary(pw);
+ } else {
+ tree.dumpWatches(pw, len == wchpCmd);
+ }
+ pw.println();
+ }
+ }
+
+ /**
+ * Command dispatched for mntrCmd. Prints one "zk_&lt;key&gt;&lt;TAB&gt;&lt;value&gt;"
+ * line per metric: version, latencies, packet counters, outstanding
+ * requests, server state, data tree counters, file descriptor counts
+ * (only when the platform MXBean is a UnixOperatingSystemMXBean) and,
+ * when the server state is "leader", follower and sync counters.
+ */
+ private class MonitorCommand extends CommandThread {
+
+ MonitorCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if(zkServer == null) {
+ pw.println(ZK_NOT_SERVING);
+ return;
+ }
+ ZKDatabase zkdb = zkServer.getZKDatabase();
+ ServerStats stats = zkServer.serverStats();
+
+ print("version", Version.getFullVersion());
+
+ print("avg_latency", stats.getAvgLatency());
+ print("max_latency", stats.getMaxLatency());
+ print("min_latency", stats.getMinLatency());
+
+ print("packets_received", stats.getPacketsReceived());
+ print("packets_sent", stats.getPacketsSent());
+
+ print("outstanding_requests", stats.getOutstandingRequests());
+
+ print("server_state", stats.getServerState());
+ print("znode_count", zkdb.getNodeCount());
+
+ print("watch_count", zkdb.getDataTree().getWatchCount());
+ print("ephemerals_count", zkdb.getDataTree().getEphemeralsCount());
+ print("approximate_data_size", zkdb.getDataTree().approximateDataSize());
+
+ OperatingSystemMXBean osMbean = ManagementFactory.getOperatingSystemMXBean();
+ // instanceof is already false for null, no separate null check needed.
+ if(osMbean instanceof UnixOperatingSystemMXBean) {
+ UnixOperatingSystemMXBean unixos = (UnixOperatingSystemMXBean)osMbean;
+
+ print("open_file_descriptor_count", unixos.getOpenFileDescriptorCount());
+ print("max_file_descriptor_count", unixos.getMaxFileDescriptorCount());
+ }
+
+ // Compare the state by value: == on Strings only tests reference
+ // identity and would silently drop the leader metrics whenever the
+ // state string is not the very same object as the literal.
+ if("leader".equals(stats.getServerState())) {
+ Leader leader = ((LeaderZooKeeperServer)zkServer).getLeader();
+
+ print("followers", leader.learners.size());
+ print("synced_followers", leader.forwardingFollowers.size());
+ print("pending_syncs", leader.pendingSyncs.size());
+ }
+ }
+
+ /** Prints a numeric metric. */
+ private void print(String key, long number) {
+ print(key, "" + number);
+ }
+
+ /** Prints one metric line: "zk_" + key, a tab, then the value. */
+ private void print(String key, String value) {
+ pw.print("zk_");
+ pw.print(key);
+ pw.print("\t");
+ pw.println(value);
+ }
+
+ }
+
+
+ /**
+ * Check whether the 4 bytes read as a packet length are actually a four
+ * letter command and, if so, run it. Running a command writes the reply
+ * and closes the connection.
+ *
+ * @param channel the channel the command arrived on
+ * @param message the rest of the received data; only read by the set
+ * trace mask command, which expects an 8-byte mask to follow
+ * @param len the 4 bytes interpreted as an int
+ * @return true if a four letter word was found and responded to,
+ * otherwise false
+ * @throws IOException if the set trace mask command is not followed by a
+ * complete 8-byte mask
+ */
+ private boolean checkFourLetterWord(final Channel channel,
+ ChannelBuffer message, final int len) throws IOException
+ {
+ // We take advantage of the limited size of the length to look
+ // for cmds. They are all 4-bytes which fits inside of an int
+ String cmd = cmd2String.get(len);
+ if (cmd == null) {
+ return false;
+ }
+ channel.setInterestOps(0).awaitUninterruptibly();
+ LOG.info("Processing " + cmd + " command from "
+ + channel.getRemoteAddress());
+ packetReceived();
+
+ final PrintWriter pwriter = new PrintWriter(
+ new BufferedWriter(new SendBufferWriter()));
+ if (len == ruokCmd) {
+ RuokCommand ruok = new RuokCommand(pwriter);
+ ruok.start();
+ return true;
+ } else if (len == getTraceMaskCmd) {
+ TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
+ tmask.start();
+ return true;
+ } else if (len == setTraceMaskCmd) {
+ // The mask is read with getLong(), so 8 bytes are needed. The
+ // previous code allocated only 4 and flipped the unrelated bb
+ // field (which is null here) instead of the mask buffer, so the
+ // command could never succeed.
+ if (message.readableBytes() < 8) {
+ throw new IOException("Incomplete trace mask: expected 8 bytes, got "
+ + message.readableBytes());
+ }
+ ByteBuffer mask = ByteBuffer.allocate(8);
+ message.readBytes(mask);
+
+ mask.flip();
+ long traceMask = mask.getLong();
+ ZooTrace.setTextTraceLevel(traceMask);
+ SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
+ setMask.start();
+ return true;
+ } else if (len == enviCmd) {
+ EnvCommand env = new EnvCommand(pwriter);
+ env.start();
+ return true;
+ } else if (len == confCmd) {
+ ConfCommand ccmd = new ConfCommand(pwriter);
+ ccmd.start();
+ return true;
+ } else if (len == srstCmd) {
+ StatResetCommand strst = new StatResetCommand(pwriter);
+ strst.start();
+ return true;
+ } else if (len == crstCmd) {
+ CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
+ crst.start();
+ return true;
+ } else if (len == dumpCmd) {
+ DumpCommand dump = new DumpCommand(pwriter);
+ dump.start();
+ return true;
+ } else if (len == statCmd || len == srvrCmd) {
+ StatCommand stat = new StatCommand(pwriter, len);
+ stat.start();
+ return true;
+ } else if (len == consCmd) {
+ ConsCommand cons = new ConsCommand(pwriter);
+ cons.start();
+ return true;
+ } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
+ WatchCommand wcmd = new WatchCommand(pwriter, len);
+ wcmd.start();
+ return true;
+ } else if (len == mntrCmd) {
+ MonitorCommand mntr = new MonitorCommand(pwriter);
+ mntr.start();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Consume received bytes, reassembling length-prefixed packets across
+ * calls. Two buffers hold the state between calls: bbLen collects the
+ * 4-byte length, and bb (non-null only while a packet body is being
+ * read) collects the body. Each completed body is passed to the server,
+ * as a connect request for the first packet and as a regular packet
+ * afterwards. The loop stops when the message is exhausted, when the
+ * connection becomes throttled (unread bytes are then left in message),
+ * or when a four letter word was handled. An IOException closes the
+ * connection.
+ *
+ * @param message the received data
+ */
+ public void receiveMessage(ChannelBuffer message) {
+ try {
+ while(message.readable() && !throttled) {
+ if (bb != null) {
+ // In the middle of a packet body.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("message readable " + message.readableBytes()
+ + " bb len " + bb.remaining() + " " + bb);
+ ByteBuffer dat = bb.duplicate();
+ dat.flip();
+ LOG.trace(Long.toHexString(sessionId)
+ + " bb 0x"
+ + ChannelBuffers.hexDump(
+ ChannelBuffers.copiedBuffer(dat)));
+ }
+
+ // Temporarily lower bb's limit so that readBytes(bb) does
+ // not ask for more bytes than the message holds; the full
+ // limit is restored right after the read.
+ if (bb.remaining() > message.readableBytes()) {
+ int newLimit = bb.position() + message.readableBytes();
+ bb.limit(newLimit);
+ }
+ message.readBytes(bb);
+ bb.limit(bb.capacity());
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("after readBytes message readable "
+ + message.readableBytes()
+ + " bb len " + bb.remaining() + " " + bb);
+ ByteBuffer dat = bb.duplicate();
+ dat.flip();
+ LOG.trace("after readbytes "
+ + Long.toHexString(sessionId)
+ + " bb 0x"
+ + ChannelBuffers.hexDump(
+ ChannelBuffers.copiedBuffer(dat)));
+ }
+ if (bb.remaining() == 0) {
+ // The body is complete: hand it to the server.
+ packetReceived();
+ bb.flip();
+
+ ZooKeeperServer zks = this.zkServer;
+ if (zks == null) {
+ throw new IOException("ZK down");
+ }
+ if (initialized) {
+ zks.processPacket(this, bb);
+
+ if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {
+ disableRecv();
+ }
+ } else {
+ // The first complete packet on a connection is
+ // treated as the connect request.
+ LOG.debug("got conn req request from "
+ + getRemoteSocketAddress());
+ zks.processConnectRequest(this, bb);
+ initialized = true;
+ }
+ // Back to reading a length prefix.
+ bb = null;
+ }
+ } else {
+ // Reading the 4-byte length prefix.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("message readable "
+ + message.readableBytes()
+ + " bblenrem " + bbLen.remaining());
+ ByteBuffer dat = bbLen.duplicate();
+ dat.flip();
+ LOG.trace(Long.toHexString(sessionId)
+ + " bbLen 0x"
+ + ChannelBuffers.hexDump(
+ ChannelBuffers.copiedBuffer(dat)));
+ }
+
+ // Same limit adjustment as for bb above.
+ if (message.readableBytes() < bbLen.remaining()) {
+ bbLen.limit(bbLen.position() + message.readableBytes());
+ }
+ message.readBytes(bbLen);
+ bbLen.limit(bbLen.capacity());
+ if (bbLen.remaining() == 0) {
+ bbLen.flip();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Long.toHexString(sessionId)
+ + " bbLen 0x"
+ + ChannelBuffers.hexDump(
+ ChannelBuffers.copiedBuffer(bbLen)));
+ }
+ int len = bbLen.getInt();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Long.toHexString(sessionId)
+ + " bbLen len is " + len);
+ }
+
+ // Reset bbLen for the next packet.
+ bbLen.clear();
+ // Four letter words are only looked for before the
+ // connect request has been processed.
+ if (!initialized) {
+ if (checkFourLetterWord(channel, message, len)) {
+ return;
+ }
+ }
+ if (len < 0 || len > BinaryInputArchive.maxBuffer) {
+ throw new IOException("Len error " + len);
+ }
+ bb = ByteBuffer.allocate(len);
+ }
+ }
+ }
+ } catch(IOException e) {
+ LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
+ close();
+ }
+ }
+
+ /**
+ * Throttle the connection: set the throttled flag, which stops the loop
+ * in receiveMessage(), and mark the channel as not readable, waiting
+ * uninterruptibly for that change to complete.
+ */
+ @Override
+ public void disableRecv() {
+ throttled = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Throttling - disabling recv " + this);
+ }
+ channel.setReadable(false).awaitUninterruptibly();
+ }
+
+ /**
+ * Current value of the outstanding request counter, which
+ * receiveMessage() increments per processed packet and sendResponse()
+ * decrements per reply with a positive xid.
+ */
+ @Override
+ public long getOutstandingRequests() {
+ return outstandingCount.get();
+ }
+
+ /** Stores the session timeout reported by getSessionTimeout(). */
+ @Override
+ public void setSessionTimeout(int sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
+
+ /** Returns the interest ops currently reported by the underlying channel. */
+ @Override
+ public int getInterestOps() {
+ return channel.getInterestOps();
+ }
+
+ /** Returns the channel's remote address, cast to InetSocketAddress. */
+ @Override
+ public InetSocketAddress getRemoteSocketAddress() {
+ final SocketAddress remote = channel.getRemoteAddress();
+ return (InetSocketAddress) remote;
+ }
+
+ /**
+ * Ask for the connection to be closed by passing the
+ * ServerCnxnFactory.closeConn marker to sendBuffer(), which closes the
+ * channel when it sees that marker instead of writing it.
+ */
+ public void sendCloseSession() {
+ sendBuffer(ServerCnxnFactory.closeConn);
+ }
+
+ /** Returns the server's statistics, or null when no server is attached. */
+ @Override
+ protected ServerStats serverStats() {
+ return (zkServer == null) ? null : zkServer.serverStats();
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.WriteCompletionEvent;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+public class NettyServerCnxnFactory extends ServerCnxnFactory {
+ Logger LOG = Logger.getLogger(NettyServerCnxnFactory.class);
+
+ ServerBootstrap bootstrap;
+ Channel parentChannel;
+ ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
+ HashSet<ServerCnxn> cnxns = new HashSet<ServerCnxn>();
+ HashMap<InetAddress, Set<NettyServerCnxn>> ipMap =
+ new HashMap<InetAddress, Set<NettyServerCnxn>>( );
+ InetSocketAddress localAddress;
+ int maxClientCnxns = 10;
+
+ /**
+ * This is an inner class since we need to extend SimpleChannelHandler, but
+ * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
+ * this class gets access to the member variables and methods.
+ */
+ @ChannelPipelineCoverage("all")
+ class CnxnChannelHandler extends SimpleChannelHandler {
+
+ /** Drops the closed channel from the group of tracked channels. */
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception
+ {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel closed " + e);
+ }
+ Channel closedChannel = ctx.getChannel();
+ allChannels.remove(closedChannel);
+ }
+
+ /**
+ * Tracks the newly connected channel, wraps it in a NettyServerCnxn,
+ * attaches that connection to the handler context and registers it
+ * with the factory.
+ */
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception
+ {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel connected " + e);
+ }
+ allChannels.add(ctx.getChannel());
+
+ NettyServerCnxn newCnxn = new NettyServerCnxn(ctx.getChannel(),
+ zkServer, NettyServerCnxnFactory.this);
+ ctx.setAttachment(newCnxn);
+ addCnxn(newCnxn);
+ }
+
+ /**
+ * Closes the connection attached to the disconnected channel, if there
+ * is one.
+ */
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception
+ {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel disconnected " + e);
+ }
+ NettyServerCnxn attached = (NettyServerCnxn) ctx.getAttachment();
+ if (attached == null) {
+ // nothing was attached to this context, so there is nothing to close
+ return;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel disconnect caused close " + e);
+ }
+ attached.close();
+ }
+
+ /**
+ * Logs the exception raised on the channel and closes the connection
+ * attached to it, if there is one.
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception
+ {
+ LOG.warn("Exception caught " + e, e.getCause());
+ NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+ if (cnxn != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing " + cnxn);
+ }
+ // close regardless of the log level; only the message is optional
+ cnxn.close();
+ }
+ }
+
+ /**
+ * Hands the received event to processMessage while holding the lock of
+ * the connection attached to the channel. Any exception is logged and
+ * rethrown.
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception
+ {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("message received called " + e.getMessage());
+ }
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New message " + e.toString()
+ + " from " + ctx.getChannel());
+ }
+ NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
+ // serialize all processing for one connection on the cnxn monitor
+ synchronized(cnxn) {
+ processMessage(e, cnxn);
+ }
+ } catch(Exception ex) {
+ LOG.error("Unexpected exception in receive", ex);
+ throw ex;
+ }
+ }
+
+ /**
+ * Feeds incoming data to the connection, keeping whatever cannot be
+ * consumed yet in cnxn.queuedBuffer.
+ * <p>
+ * A ResumeMessageEvent drains the queued buffer (if any) and turns
+ * channel reads back on. Any other event carries a ChannelBuffer: while
+ * the connection is throttled the bytes are only appended to the queue;
+ * otherwise they are processed right away, behind any bytes that are
+ * already queued, and unread leftovers are copied into a new queue.
+ *
+ * @param e the event delivered by the channel pipeline
+ * @param cnxn the connection attached to the channel
+ */
+ private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
+ + cnxn.queuedBuffer);
+ }
+
+ if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
+ LOG.debug("Received ResumeMessageEvent");
+ if (cnxn.queuedBuffer != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("processing queue "
+ + Long.toHexString(cnxn.sessionId)
+ + " queuedBuffer 0x"
+ + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+ }
+ cnxn.receiveMessage(cnxn.queuedBuffer);
+ if (!cnxn.queuedBuffer.readable()) {
+ LOG.debug("Processed queue - no bytes remaining");
+ cnxn.queuedBuffer = null;
+ } else {
+ LOG.debug("Processed queue - bytes remaining");
+ }
+ } else {
+ LOG.debug("queue empty");
+ }
+ cnxn.channel.setReadable(true);
+ } else {
+ ChannelBuffer buf = (ChannelBuffer)e.getMessage();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Long.toHexString(cnxn.sessionId)
+ + " buf 0x"
+ + ChannelBuffers.hexDump(buf));
+ }
+
+ if (cnxn.throttled) {
+ LOG.debug("Received message while throttled");
+ // we are throttled, so we need to queue
+ if (cnxn.queuedBuffer == null) {
+ LOG.debug("allocating queue");
+ cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
+ }
+ cnxn.queuedBuffer.writeBytes(buf);
+ // guard the dump: hex-encoding the whole queue is wasted work
+ // unless the message is actually going to be logged
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Long.toHexString(cnxn.sessionId)
+ + " queuedBuffer 0x"
+ + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+ }
+ } else {
+ LOG.debug("not throttled");
+ if (cnxn.queuedBuffer != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Long.toHexString(cnxn.sessionId)
+ + " queuedBuffer 0x"
+ + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+ }
+ // append behind the bytes that are already waiting
+ cnxn.queuedBuffer.writeBytes(buf);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Long.toHexString(cnxn.sessionId)
+ + " queuedBuffer 0x"
+ + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+ }
+
+ cnxn.receiveMessage(cnxn.queuedBuffer);
+ if (!cnxn.queuedBuffer.readable()) {
+ LOG.debug("Processed queue - no bytes remaining");
+ cnxn.queuedBuffer = null;
+ } else {
+ LOG.debug("Processed queue - bytes remaining");
+ }
+ } else {
+ cnxn.receiveMessage(buf);
+ if (buf.readable()) {
+ // keep the unread remainder in a freshly allocated queue
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Before copy " + buf);
+ }
+ cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
+ cnxn.queuedBuffer.writeBytes(buf);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Copy is " + cnxn.queuedBuffer);
+ LOG.trace(Long.toHexString(cnxn.sessionId)
+ + " queuedBuffer 0x"
+ + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /** Only traces the write-completion event; no other action is taken. */
+ @Override
+ public void writeComplete(ChannelHandlerContext ctx,
+ WriteCompletionEvent e) throws Exception
+ {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("write complete " + e);
+ }
+ }
+
+ }
+
+ CnxnChannelHandler channelHandler = new CnxnChannelHandler();
+
+ /**
+ * Builds the server bootstrap on top of an NIO channel factory backed by
+ * two cached thread pools, sets the socket options and installs the
+ * shared channel handler in the pipeline.
+ */
+ NettyServerCnxnFactory() {
+ NioServerSocketChannelFactory channelFactory =
+ new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+ bootstrap = new ServerBootstrap(channelFactory);
+
+ // parent channel
+ bootstrap.setOption("reuseAddress", true);
+
+ // child channels
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.soLinger", 2);
+
+ bootstrap.getPipeline().addLast("servercnxnfactory", channelHandler);
+ }
+
+ @Override
+ public void closeAll() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closeAll()");
+ }
+
+ synchronized (cnxns) {
+ // got to clear all the connections that we have in the selector
+ for (NettyServerCnxn cnxn : cnxns.toArray(new NettyServerCnxn[cnxns.size()])) {
+ try {
+ cnxn.close();
+ } catch (Exception e) {
+ LOG.warn("Ignoring exception closing cnxn sessionid 0x"
+ + Long.toHexString(cnxn.getSessionId()), e);
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allChannels size:" + allChannels.size()
+ + " cnxns size:" + cnxns.size());
+ }
+ }
+
+ /**
+ * Closes the first tracked connection whose session id matches.
+ *
+ * @param sessionId id of the session whose connection should be closed
+ */
+ @Override
+ public void closeSession(long sessionId) {
+ if (LOG.isDebugEnabled()) {
+ // render the id in hex so it matches the 0x prefix
+ LOG.debug("closeSession sessionid:0x"
+ + Long.toHexString(sessionId));
+ }
+
+ synchronized (cnxns) {
+ for (NettyServerCnxn cnxn : cnxns.toArray(new NettyServerCnxn[cnxns.size()])) {
+ if (cnxn.getSessionId() == sessionId) {
+ try {
+ cnxn.close();
+ } catch (Exception e) {
+ LOG.warn("exception during session close", e);
+ }
+ // stop at the first match
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Records the address to bind to and the per-host connection limit.
+ * Nothing is bound here; binding happens in start().
+ *
+ * @param addr local address used later by start()
+ * @param maxClientCnxns value stored as the per-host connection limit
+ */
+ @Override
+ public void configure(InetSocketAddress addr, int maxClientCnxns)
+ throws IOException
+ {
+ localAddress = addr;
+ this.maxClientCnxns = maxClientCnxns;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int getMaxClientCnxnsPerHost() {
+ return maxClientCnxns;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setMaxClientCnxnsPerHost(int max) {
+ maxClientCnxns = max;
+ }
+
+ /**
+ * Returns the port of the address stored by configure().
+ * NOTE(review): localAddress is only assigned in configure(), so this
+ * looks like it throws a NullPointerException if called earlier - confirm.
+ */
+ @Override
+ public int getLocalPort() {
+ return localAddress.getPort();
+ }
+
+ boolean killed;
+ /**
+ * Blocks the caller until shutdown() has set the killed flag.
+ *
+ * @throws InterruptedException if the waiting thread is interrupted
+ */
+ @Override
+ public void join() throws InterruptedException {
+ synchronized(this) {
+ // loop so that a wakeup without the flag being set keeps waiting
+ while(!killed) {
+ wait();
+ }
+ }
+ }
+
+ /**
+ * Stops the factory: closes the listening channel, closes all
+ * connections and remaining channels, releases the bootstrap's
+ * resources, shuts down the attached server (if any) and finally wakes
+ * up every thread blocked in join().
+ */
+ @Override
+ public void shutdown() {
+ LOG.info("shutdown called " + localAddress);
+
+ // null if factory never started
+ if (parentChannel != null) {
+ // close the parent (listening) channel before the child channels
+ parentChannel.close().awaitUninterruptibly();
+ closeAll();
+ allChannels.close().awaitUninterruptibly();
+ bootstrap.releaseExternalResources();
+ }
+
+ if (zkServer != null) {
+ zkServer.shutdown();
+ }
+ synchronized(this) {
+ killed = true;
+ notifyAll();
+ }
+ }
+
+ /**
+ * Binds the bootstrap to the configured local address and keeps the
+ * resulting channel as the parent channel.
+ */
+ @Override
+ public void start() {
+ LOG.info("binding to port " + localAddress);
+ parentChannel = bootstrap.bind(localAddress);
+ }
+
+ /**
+ * Binds the listening channel, starts the given server (startdata()
+ * followed by startup()) and then attaches it to this factory.
+ *
+ * @param zks the server to start and attach
+ */
+ @Override
+ public void startup(ZooKeeperServer zks) throws IOException,
+ InterruptedException {
+ start();
+ zks.startdata();
+ zks.startup();
+ // the server is attached only after it has been started
+ setZooKeeperServer(zks);
+ }
+
+ /**
+ * Returns the live set of tracked connections (no copy is made).
+ * NOTE(review): other methods lock cnxns before touching it; callers
+ * iterating this result presumably need to do the same - confirm.
+ */
+ @Override
+ public Iterable<ServerCnxn> getConnections() {
+ return cnxns;
+ }
+
+ /** Returns the address stored by configure(). */
+ @Override
+ public InetSocketAddress getLocalAddress() {
+ return localAddress;
+ }
+
+ /**
+ * Registers a connection in the set of all connections and in the
+ * per-address map, keyed by the remote address of its channel.
+ *
+ * @param cnxn the connection to track
+ */
+ private void addCnxn(NettyServerCnxn cnxn) {
+ synchronized (cnxns) {
+ cnxns.add(cnxn);
+ synchronized (ipMap){
+ InetSocketAddress remote =
+ (InetSocketAddress)cnxn.channel.getRemoteAddress();
+ InetAddress addr = remote.getAddress();
+ Set<NettyServerCnxn> sameHost = ipMap.get(addr);
+ if (sameHost == null) {
+ // first connection seen from this address
+ sameHost = new HashSet<NettyServerCnxn>();
+ ipMap.put(addr, sameHost);
+ }
+ sameHost.add(cnxn);
+ }
+ }
+ }
+
+}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -84,7 +84,8 @@ public class PrepRequestProcessor extend
public PrepRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
- super("ProcessThread:" + zks.getClientPort());
+ super("ProcessThread(sid:" + zks.getServerId()
+ + " cport:" + zks.getClientPort() + "):");
this.nextProcessor = nextProcessor;
this.zks = zks;
}
@@ -505,6 +506,7 @@ public class PrepRequestProcessor extend
}
public void shutdown() {
+ LOG.info("Shutting down");
submittedRequests.clear();
submittedRequests.add(Request.requestOfDeath);
nextProcessor.shutdown();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java Wed Aug 18 06:24:08 2010
@@ -19,90 +19,414 @@
package org.apache.zookeeper.server;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
/**
* Interface to a Server connection - represents a connection from a client
* to the server.
*/
-public interface ServerCnxn extends Watcher {
+public abstract class ServerCnxn implements Stats, Watcher {
// This is just an arbitrary object to represent requests issued by
// (aka owned by) this class
final public static Object me = new Object();
+
+ protected ArrayList<Id> authInfo = new ArrayList<Id>();
- int getSessionTimeout();
+ abstract int getSessionTimeout();
- void sendResponse(ReplyHeader h, Record r, String tag) throws IOException;
+ abstract void close();
+
+ abstract void sendResponse(ReplyHeader h, Record r, String tag)
+ throws IOException;
/* notify the client the session is closing and close/cleanup socket */
- void sendCloseSession();
+ abstract void sendCloseSession();
+
+ public abstract void process(WatchedEvent event);
+
+ abstract long getSessionId();
+
+ abstract void setSessionId(long sessionId);
+
+ /** auth info for the cnxn, returns an unmodifiable view of the list */
+ public List<Id> getAuthInfo() {
+ return Collections.unmodifiableList(authInfo);
+ }
+
+ /** Appends the given id to this connection's auth info list. */
+ public void addAuthInfo(Id id) {
+ authInfo.add(id);
+ }
+
+ /**
+ * Removes the given id from this connection's auth info list.
+ *
+ * @return true if the list contained the id
+ */
+ public boolean removeAuthInfo(Id id) {
+ return authInfo.remove(id);
+ }
+
+ abstract void sendBuffer(ByteBuffer closeConn);
+
+ abstract void enableRecv();
+
+ abstract void disableRecv();
+
+ abstract void setSessionTimeout(int sessionTimeout);
+
+ /**
+ * IOException subtype that carries only a message.
+ * NOTE(review): presumably signals that the connection should be
+ * closed - confirm against the code that throws it.
+ */
+ protected static class CloseRequestException extends IOException {
+ private static final long serialVersionUID = -7854505709816442681L;
+
+ public CloseRequestException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * IOException subtype whose string form is the fixed prefix
+ * "EndOfStreamException: " followed by the message, without the
+ * qualified class name.
+ */
+ protected static class EndOfStreamException extends IOException {
+ private static final long serialVersionUID = -8255690282104294178L;
+
+ public EndOfStreamException(String msg) {
+ super(msg);
+ }
+
+ @Override
+ public String toString() {
+ return "EndOfStreamException: " + getMessage();
+ }
+ }
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int confCmd =
+ ByteBuffer.wrap("conf".getBytes()).getInt();
- void finishSessionInit(boolean valid);
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int consCmd =
+ ByteBuffer.wrap("cons".getBytes()).getInt();
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int crstCmd =
+ ByteBuffer.wrap("crst".getBytes()).getInt();
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int dumpCmd =
+ ByteBuffer.wrap("dump".getBytes()).getInt();
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int enviCmd =
+ ByteBuffer.wrap("envi".getBytes()).getInt();
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int getTraceMaskCmd =
+ ByteBuffer.wrap("gtmk".getBytes()).getInt();
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int ruokCmd =
+ ByteBuffer.wrap("ruok".getBytes()).getInt();
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int setTraceMaskCmd =
+ ByteBuffer.wrap("stmk".getBytes()).getInt();
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int srvrCmd =
+ ByteBuffer.wrap("srvr".getBytes()).getInt();
- void process(WatchedEvent event);
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int srstCmd =
+ ByteBuffer.wrap("srst".getBytes()).getInt();
- long getSessionId();
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int statCmd =
+ ByteBuffer.wrap("stat".getBytes()).getInt();
- void setSessionId(long sessionId);
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int wchcCmd =
+ ByteBuffer.wrap("wchc".getBytes()).getInt();
- ArrayList<Id> getAuthInfo();
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int wchpCmd =
+ ByteBuffer.wrap("wchp".getBytes()).getInt();
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int wchsCmd =
+ ByteBuffer.wrap("wchs".getBytes()).getInt();
+
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int mntrCmd = ByteBuffer.wrap("mntr".getBytes())
+ .getInt();
- InetSocketAddress getRemoteAddress();
+ protected final static HashMap<Integer, String> cmd2String =
+ new HashMap<Integer, String>();
+
+ // specify all of the commands that are available
+ static {
+ cmd2String.put(confCmd, "conf");
+ cmd2String.put(consCmd, "cons");
+ cmd2String.put(crstCmd, "crst");
+ cmd2String.put(dumpCmd, "dump");
+ cmd2String.put(enviCmd, "envi");
+ cmd2String.put(getTraceMaskCmd, "gtmk");
+ cmd2String.put(ruokCmd, "ruok");
+ cmd2String.put(setTraceMaskCmd, "stmk");
+ cmd2String.put(srstCmd, "srst");
+ cmd2String.put(srvrCmd, "srvr");
+ cmd2String.put(statCmd, "stat");
+ cmd2String.put(wchcCmd, "wchc");
+ cmd2String.put(wchpCmd, "wchp");
+ cmd2String.put(wchsCmd, "wchs");
+ cmd2String.put(mntrCmd, "mntr");
+ }
+
+ /**
+ * Counts one received packet on this connection and, when server-wide
+ * stats are available, on the server as well.
+ */
+ protected void packetReceived() {
+ incrPacketsReceived();
+ ServerStats serverStats = serverStats();
+ if (serverStats != null) {
+ // use the instance that was null-checked; a second serverStats()
+ // call is not covered by the check above
+ serverStats.incrementPacketsReceived();
+ }
+ }
+
+ /**
+ * Counts one sent packet on this connection and, when server-wide stats
+ * are available, on the server as well.
+ */
+ protected void packetSent() {
+ incrPacketsSent();
+ ServerStats serverStats = serverStats();
+ if (serverStats != null) {
+ // use the instance that was null-checked; a second serverStats()
+ // call is not covered by the check above
+ serverStats.incrementPacketsSent();
+ }
+ }
+
+ protected abstract ServerStats serverStats();
+
+ protected final Date established = new Date();
+
+ protected final AtomicLong packetsReceived = new AtomicLong();
+ protected final AtomicLong packetsSent = new AtomicLong();
+
+ protected long minLatency;
+ protected long maxLatency;
+ protected String lastOp;
+ protected long lastCxid;
+ protected long lastZxid;
+ protected long lastResponseTime;
+ protected long lastLatency;
+
+ protected long count;
+ protected long totalLatency;
+
+ /**
+ * Puts every per-connection statistic back to its initial value: packet
+ * counters and latency sums to zero, the minimum latency to
+ * Long.MAX_VALUE and the "last ..." fields to their placeholders.
+ */
+ public synchronized void resetStats() {
+ // packet counters
+ packetsReceived.set(0);
+ packetsSent.set(0);
+
+ // latency aggregates
+ minLatency = Long.MAX_VALUE;
+ maxLatency = 0;
+ count = 0;
+ totalLatency = 0;
+
+ // details of the most recent response
+ lastOp = "NA";
+ lastCxid = -1;
+ lastZxid = -1;
+ lastResponseTime = 0;
+ lastLatency = 0;
+ }
+
+ /** Increments the received-packet counter and returns the new value. */
+ protected long incrPacketsReceived() {
+ return packetsReceived.incrementAndGet();
+ }
+
+ /** Does nothing in this class; subclasses may override it. */
+ protected void incrOutstandingRequests(RequestHeader h) {
+ }
+
+ /** Increments the sent-packet counter and returns the new value. */
+ protected long incrPacketsSent() {
+ return packetsSent.incrementAndGet();
+ }
+
+ protected synchronized void updateStatsForResponse(long cxid, long zxid,
+ String op, long start, long end)
+ {
+ // don't overwrite with "special" xids - we're interested
+ // in the clients last real operation
+ if (cxid >= 0) {
+ lastCxid = cxid;
+ }
+ lastZxid = zxid;
+ lastOp = op;
+ lastResponseTime = end;
+ long elapsed = end - start;
+ lastLatency = elapsed;
+ if (elapsed < minLatency) {
+ minLatency = elapsed;
+ }
+ if (elapsed > maxLatency) {
+ maxLatency = elapsed;
+ }
+ count++;
+ totalLatency += elapsed;
+ }
+
+ /** Returns a copy of the date stored in the established field. */
+ public Date getEstablished() {
+ return (Date)established.clone();
+ }
+
+ public abstract long getOutstandingRequests();
+
+ /** Returns the current value of the received-packet counter. */
+ public long getPacketsReceived() {
+ return packetsReceived.get();
+ }
+
+ /** Returns the current value of the sent-packet counter. */
+ public long getPacketsSent() {
+ return packetsSent.get();
+ }
+
+ /**
+ * Returns the smallest latency recorded, or 0 while the minimum is
+ * still at its reset value of Long.MAX_VALUE.
+ */
+ public synchronized long getMinLatency() {
+ if (minLatency == Long.MAX_VALUE) {
+ return 0;
+ }
+ return minLatency;
+ }
+
+ /**
+ * Returns the total latency divided by the number of recorded
+ * responses (integer division), or 0 when nothing was recorded.
+ */
+ public synchronized long getAvgLatency() {
+ if (count == 0) {
+ return 0;
+ }
+ return totalLatency / count;
+ }
+
+ /** Returns the largest latency recorded since the last reset. */
+ public synchronized long getMaxLatency() {
+ return maxLatency;
+ }
+
+ /** Returns the last recorded operation ("NA" right after a reset). */
+ public synchronized String getLastOperation() {
+ return lastOp;
+ }
+
+ /** Returns the last recorded non-negative cxid (-1 right after a reset). */
+ public synchronized long getLastCxid() {
+ return lastCxid;
+ }
+
+ /** Returns the last recorded zxid (-1 right after a reset). */
+ public synchronized long getLastZxid() {
+ return lastZxid;
+ }
+
+ /** Returns the end time of the last recorded response (0 after a reset). */
+ public synchronized long getLastResponseTime() {
+ return lastResponseTime;
+ }
+
+ /** Returns the latency of the last recorded response (0 after a reset). */
+ public synchronized long getLastLatency() {
+ return lastLatency;
+ }
+
+ /**
+ * Returns the detailed (non-brief) connection information written by
+ * dumpConnectionInfo, as a string.
+ *
+ * @see #dumpConnectionInfo(PrintWriter, boolean)
+ */
+ @Override
+ public String toString() {
+ StringWriter sw = new StringWriter();
+ PrintWriter pwriter = new PrintWriter(sw);
+ dumpConnectionInfo(pwriter, false);
+ pwriter.flush();
+ pwriter.close();
+ return sw.toString();
+ }
+ public abstract InetSocketAddress getRemoteSocketAddress();
+ public abstract int getInterestOps();
+
/**
- * Statistics on the ServerCnxn
+ * Print information about the connection.
+ * @param brief iff true prints brief details, otw full detail
+ * @return information about this connection
*/
- interface Stats {
- /** Date/time the connection was established
- * @since 3.3.0 */
- Date getEstablished();
-
- /**
- * The number of requests that have been submitted but not yet
- * responded to.
- */
- long getOutstandingRequests();
- /** Number of packets received */
- long getPacketsReceived();
- /** Number of packets sent (incl notifications) */
- long getPacketsSent();
- /** Min latency in ms
- * @since 3.3.0 */
- long getMinLatency();
- /** Average latency in ms
- * @since 3.3.0 */
- long getAvgLatency();
- /** Max latency in ms
- * @since 3.3.0 */
- long getMaxLatency();
- /** Last operation performed by this connection
- * @since 3.3.0 */
- String getLastOperation();
- /** Last cxid of this connection
- * @since 3.3.0 */
- long getLastCxid();
- /** Last zxid of this connection
- * @since 3.3.0 */
- long getLastZxid();
- /** Last time server sent a response to client on this connection
- * @since 3.3.0 */
- long getLastResponseTime();
- /** Latency of last response to client on this connection in ms
- * @since 3.3.0 */
- long getLastLatency();
-
- /** Reset counters
- * @since 3.3.0 */
- void reset();
+ protected synchronized void
+ dumpConnectionInfo(PrintWriter pwriter, boolean brief) {
+ // always printed: remote address, interest ops (hex) and the
+ // queued / received / sent counters
+ pwriter.print(" ");
+ pwriter.print(getRemoteSocketAddress());
+ pwriter.print("[");
+ int interestOps = getInterestOps();
+ pwriter.print(interestOps == 0 ? "0" : Integer.toHexString(interestOps));
+ pwriter.print("](queued=");
+ pwriter.print(getOutstandingRequests());
+ pwriter.print(",recved=");
+ pwriter.print(getPacketsReceived());
+ pwriter.print(",sent=");
+ pwriter.print(getPacketsSent());
+
+ if (!brief) {
+ long sessionId = getSessionId();
+ // the session details are printed only for a non-zero session id
+ if (sessionId != 0) {
+ pwriter.print(",sid=0x");
+ pwriter.print(Long.toHexString(sessionId));
+ pwriter.print(",lop=");
+ pwriter.print(getLastOperation());
+ pwriter.print(",est=");
+ pwriter.print(getEstablished().getTime());
+ pwriter.print(",to=");
+ pwriter.print(getSessionTimeout());
+ long lastCxid = getLastCxid();
+ // a negative last cxid is left out of the output
+ if (lastCxid >= 0) {
+ pwriter.print(",lcxid=0x");
+ pwriter.print(Long.toHexString(lastCxid));
+ }
+ pwriter.print(",lzxid=0x");
+ pwriter.print(Long.toHexString(getLastZxid()));
+ pwriter.print(",lresp=");
+ pwriter.print(getLastResponseTime());
+ pwriter.print(",llat=");
+ pwriter.print(getLastLatency());
+ pwriter.print(",minlat=");
+ pwriter.print(getMinLatency());
+ pwriter.print(",avglat=");
+ pwriter.print(getAvgLatency());
+ pwriter.print(",maxlat=");
+ pwriter.print(getMaxLatency());
+ }
+ }
}
- Stats getStats();
}
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import javax.management.JMException;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+
+public abstract class ServerCnxnFactory {
+
+ public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
+
+ /** Callback that processes one packet together with its source connection. */
+ public interface PacketProcessor {
+ public void processPacket(ByteBuffer packet, ServerCnxn src);
+ }
+
+ Logger LOG = Logger.getLogger(ServerCnxnFactory.class);
+
+ /**
+ * The buffer will cause the connection to be close when we do a send.
+ */
+ static final ByteBuffer closeConn = ByteBuffer.allocate(0);
+
+ public abstract int getLocalPort();
+
+ public abstract Iterable<ServerCnxn> getConnections();
+
+ public abstract void closeSession(long sessionId);
+
+ public abstract void configure(InetSocketAddress addr,
+ int maxClientCnxns) throws IOException;
+
+ /** Maximum number of connections allowed from particular host (ip) */
+ public abstract int getMaxClientCnxnsPerHost();
+
+ /** Maximum number of connections allowed from particular host (ip) */
+ public abstract void setMaxClientCnxnsPerHost(int max);
+
+ public abstract void startup(ZooKeeperServer zkServer)
+ throws IOException, InterruptedException;
+
+ public abstract void join() throws InterruptedException;
+
+ public abstract void shutdown();
+
+ public abstract void start();
+
+ protected ZooKeeperServer zkServer;
+ /**
+ * Attaches the given server to this factory and, when it is not null,
+ * registers this factory with the server in return.
+ *
+ * @param zk the server to attach; may be null
+ */
+ final public void setZooKeeperServer(ZooKeeperServer zk) {
+ this.zkServer = zk;
+ if (zk != null) {
+ zk.setServerCnxnFactory(this);
+ }
+ }
+
+ public abstract void closeAll();
+
+ static public ServerCnxnFactory createFactory() throws IOException {
+ String serverCnxnFactoryName =
+ System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
+ if (serverCnxnFactoryName == null) {
+ serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
+ }
+ try {
+ return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
+ .newInstance();
+ } catch (Exception e) {
+ IOException ioe = new IOException("Couldn't instantiate "
+ + serverCnxnFactoryName);
+ ioe.initCause(e);
+ throw ioe;
+ }
+ }
+
+ /**
+ * Creates a factory configured for the wildcard address on the given
+ * port.
+ *
+ * @param clientPort port used to build the InetSocketAddress
+ * @param maxClientCnxns limit passed on to configure()
+ */
+ static public ServerCnxnFactory createFactory(int clientPort,
+ int maxClientCnxns) throws IOException
+ {
+ return createFactory(new InetSocketAddress(clientPort), maxClientCnxns);
+ }
+
+ /**
+ * Creates a factory via createFactory() and configures it with the
+ * given address and connection limit.
+ *
+ * @param addr address passed on to configure()
+ * @param maxClientCnxns limit passed on to configure()
+ */
+ static public ServerCnxnFactory createFactory(InetSocketAddress addr,
+ int maxClientCnxns) throws IOException
+ {
+ ServerCnxnFactory factory = createFactory();
+ factory.configure(addr, maxClientCnxns);
+ return factory;
+ }
+
+ public abstract InetSocketAddress getLocalAddress();
+
+ private HashMap<ServerCnxn, ConnectionBean> connectionBeans = new HashMap<ServerCnxn, ConnectionBean>();
+ /**
+ * Forgets the JMX bean recorded for the connection and unregisters it
+ * from the MBean registry; does nothing if no bean was recorded.
+ * NOTE(review): connectionBeans is a plain HashMap accessed without
+ * locking here - confirm that callers are serialized.
+ */
+ public void unregisterConnection(ServerCnxn serverCnxn) {
+ ConnectionBean jmxConnectionBean = connectionBeans.remove(serverCnxn);
+ if (jmxConnectionBean != null){
+ MBeanRegistry.getInstance().unregister(jmxConnectionBean);
+ }
+ }
+
+ /**
+ * Registers a JMX bean for the connection under the server's bean and
+ * remembers it for unregisterConnection. Does nothing while no server
+ * is attached; a registration failure is logged, not propagated.
+ */
+ public void registerConnection(ServerCnxn serverCnxn) {
+ if (zkServer == null) {
+ return;
+ }
+ ConnectionBean bean = new ConnectionBean(serverCnxn, zkServer);
+ try {
+ MBeanRegistry.getInstance().register(bean, zkServer.jmxServerBean);
+ // remember the bean only once registration has succeeded
+ connectionBeans.put(serverCnxn, bean);
+ } catch (JMException e) {
+ LOG.warn("Could not register connection", e);
+ }
+ }
+
+}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java Wed Aug 18 06:24:08 2010
@@ -202,6 +202,8 @@ public class SessionTrackerImpl extends
}
public void shutdown() {
+ LOG.info("Shutting down");
+
running = false;
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Date;
+
+/**
+ * Statistics on the ServerCnxn: per-connection packet counters, latency
+ * figures and details of the most recent operation, together with a way
+ * to reset them.
+ */
+interface Stats {
+ /** Date/time the connection was established
+ * @return the date/time at which the connection was established
+ * @since 3.3.0 */
+ Date getEstablished();
+
+ /**
+ * The number of requests that have been submitted but not yet
+ * responded to.
+ * @return the count of requests still awaiting a response
+ */
+ long getOutstandingRequests();
+ /** Number of packets received
+ * @return the number of packets received */
+ long getPacketsReceived();
+ /** Number of packets sent (incl notifications)
+ * @return the number of packets sent, notifications included */
+ long getPacketsSent();
+ /** Min latency in ms
+ * @return the minimum latency, in milliseconds
+ * @since 3.3.0 */
+ long getMinLatency();
+ /** Average latency in ms
+ * @return the average latency, in milliseconds
+ * @since 3.3.0 */
+ long getAvgLatency();
+ /** Max latency in ms
+ * @return the maximum latency, in milliseconds
+ * @since 3.3.0 */
+ long getMaxLatency();
+ /** Last operation performed by this connection
+ * @return the last operation performed by this connection
+ * @since 3.3.0 */
+ String getLastOperation();
+ /** Last cxid of this connection
+ * @return the last cxid of this connection
+ * @since 3.3.0 */
+ long getLastCxid();
+ /** Last zxid of this connection
+ * @return the last zxid of this connection
+ * @since 3.3.0 */
+ long getLastZxid();
+ /** Last time server sent a response to client on this connection
+ * NOTE(review): the unit/epoch of the returned value is not stated
+ * here - presumably a millisecond timestamp; confirm against the
+ * implementing class.
+ * @return the time of the last response sent on this connection
+ * @since 3.3.0 */
+ long getLastResponseTime();
+ /** Latency of last response to client on this connection in ms
+ * @return the latency of the last response, in milliseconds
+ * @since 3.3.0 */
+ long getLastLatency();
+
+ /** Reset counters
+ * NOTE(review): which of the values above are reset is decided by the
+ * implementing class and is not visible from this interface.
+ * @since 3.3.0 */
+ void resetStats();
+}
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -166,6 +166,7 @@ public class SyncRequestProcessor extend
}
public void shutdown() {
+ LOG.info("Shutting down");
queuedRequests.add(requestOfDeath);
try {
this.join();