You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2010/08/18 08:24:09 UTC

svn commit: r986575 [2/4] - in /hadoop/zookeeper/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/libtest/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/auth/ src/ja...

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
+    private static final Logger LOG = Logger.getLogger(NIOServerCnxnFactory.class);
+
+    static {
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                public void uncaughtException(Thread t, Throwable e) {
+                    LOG.error("Thread " + t + " died", e);
+                }
+            }); // installed as the default handler for uncaught exceptions
+        /**
+         * Open and immediately close one Selector during class initialization
+         * to work around a JVM bug (NullPointerException in Selector.open()):
+         * http://bugs.sun.com/view_bug.do?bug_id=6427854
+         */
+        try {
+            Selector.open().close();
+        } catch(IOException ie) {
+            LOG.error("Selector failed to open", ie);
+        }
+    }
+
+    ServerSocketChannel ss;
+
+    final Selector selector = Selector.open();
+
+    /**
+     * We use this buffer to do efficient socket I/O. Since there is a single
+     * sender thread per NIOServerCnxn instance, we can use a member variable to
+     * only allocate it once.
+    */
+    final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
+
+    final HashSet<ServerCnxn> cnxns = new HashSet<ServerCnxn>();
+    final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
+        new HashMap<InetAddress, Set<NIOServerCnxn>>( );
+
+    int maxClientCnxns = 10;
+
+
+    /**
+     * Construct a new, unbound server connection factory. The per-host connection
+     * limit starts at 10; configure(addr, maxcc) binds the listening socket and
+     * replaces that limit, and must be called before start() or startup(zks).
+     * @throws IOException if the selector field initializer fails to open a Selector
+     */
+    public NIOServerCnxnFactory() throws IOException {
+    }
+
+    Thread thread;
+    @Override
+    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
+        thread = new Thread(this, "NIOServerCxn.Factory:" + addr); // created here, started by start()
+        thread.setDaemon(true);
+        maxClientCnxns = maxcc; // per-host limit checked in run(); values <= 0 disable the check
+        this.ss = ServerSocketChannel.open();
+        ss.socket().setReuseAddress(true);
+        LOG.info("binding to port " + addr);
+        ss.socket().bind(addr);
+        ss.configureBlocking(false);
+        ss.register(selector, SelectionKey.OP_ACCEPT); // run() accepts connections off this key
+    }
+
+    /** {@inheritDoc} */
+    public int getMaxClientCnxnsPerHost() {
+        return maxClientCnxns;
+    }
+
+    /** {@inheritDoc} */
+    public void setMaxClientCnxnsPerHost(int max) {
+        maxClientCnxns = max;
+    }
+
+    @Override
+    public void start() {
+        // ensure thread is started once and only once
+        if (thread.getState() == Thread.State.NEW) {
+            thread.start();
+        }
+    }
+
+    @Override
+    public void startup(ZooKeeperServer zks) throws IOException,
+            InterruptedException {
+        start();
+        zks.startdata();
+        zks.startup();
+        setZooKeeperServer(zks);
+    }
+
+    @Override
+    public InetSocketAddress getLocalAddress(){
+        return (InetSocketAddress)ss.socket().getLocalSocketAddress();
+    }
+
+    @Override
+    public int getLocalPort(){
+        return ss.socket().getLocalPort();
+    }
+
+    private void addCnxn(NIOServerCnxn cnxn) {
+        // Register the connection globally and under its remote address.
+        synchronized (cnxns) {
+            cnxns.add(cnxn);
+            synchronized (ipMap){
+                InetAddress remote = cnxn.sock.socket().getInetAddress();
+                Set<NIOServerCnxn> perHost = ipMap.get(remote);
+                if (perHost == null) {
+                    // Typically there is a single connection per host, so
+                    // start with a capacity of 2: small enough to keep
+                    // memory use low in that common case, large enough
+                    // that adding the first entry does not force a
+                    // rehash.
+                    perHost = new HashSet<NIOServerCnxn>(2);
+                    ipMap.put(remote, perHost);
+                }
+                // Covers both the fresh and the pre-existing set.
+                perHost.add(cnxn);
+            }
+        }
+    }
+
+    protected NIOServerCnxn createConnection(SocketChannel sock,
+            SelectionKey sk) throws IOException {
+        return new NIOServerCnxn(zkServer, sock, sk, this);
+    }
+
+    private int getClientCnxnCount(InetAddress cl) {
+        // ipMap's monitor guards the map and every per-host set stored
+        // in it, so the size must be read while that monitor is held
+        // (the sets are only supposed to change under this lock).
+        synchronized (ipMap) {
+            Set<NIOServerCnxn> perHost = ipMap.get(cl);
+            // A host with no entry has no tracked connections.
+            return (perHost == null) ? 0 : perHost.size();
+        }
+    }
+
+    /**
+     * Selector loop: until the listening socket is closed, accept connections
+     * (subject to the per-host limit) and run doIO on keys ready to read/write.
+     */
+    public void run() {
+        while (!ss.socket().isClosed()) {
+            try {
+                selector.select(1000);
+                Set<SelectionKey> selected;
+                synchronized (this) {
+                    selected = selector.selectedKeys();
+                }
+                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
+                Collections.shuffle(selectedList);
+                for (SelectionKey k : selectedList) {
+                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
+                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
+                        // non-blocking accept() yields null if nothing is pending: skip, don't NPE
+                        if (sc == null) continue;
+                        InetAddress ia = sc.socket().getInetAddress();
+                        int cnxncount = getClientCnxnCount(ia);
+                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
+                            LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns );
+                            sc.close();
+                        } else {
+                            LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
+                            sc.configureBlocking(false);
+                            SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
+                            NIOServerCnxn cnxn = createConnection(sc, sk);
+                            sk.attach(cnxn);
+                            addCnxn(cnxn);
+                        }
+                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
+                        c.doIO(k);
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Unexpected ops in select " + k.readyOps());
+                        }
+                    }
+                }
+                selected.clear();
+            } catch (RuntimeException e) {
+                LOG.warn("Ignoring unexpected runtime exception", e);
+            } catch (Exception e) {
+                LOG.warn("Ignoring exception", e);
+            }
+        }
+        closeAll();
+        LOG.info("NIOServerCnxn factory exited run method");
+    }
+
+    /**
+     * Wake up the selector and close every connection currently in cnxns.
+     * Works on a snapshot; a failure to close one cnxn is logged and skipped.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    synchronized public void closeAll() {
+        selector.wakeup();
+        HashSet<NIOServerCnxn> cnxns;
+        synchronized (this.cnxns) {
+            cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
+        }
+        // close each connection in the snapshot taken above
+        for (NIOServerCnxn cnxn: cnxns) {
+            try {
+                // don't hold this.cnxns lock as deadlock may occur
+                cnxn.close();
+            } catch (Exception e) {
+                LOG.warn("Ignoring exception closing cnxn sessionid 0x"
+                         + Long.toHexString(cnxn.sessionId), e);
+            }
+        }
+    }
+
+    public void shutdown() {
+        try {
+            ss.close(); // run() loops only while ss.socket() is not closed
+            closeAll();
+            thread.interrupt();
+            thread.join(); // wait for the selector thread to leave run()
+        } catch (InterruptedException e) { // NOTE(review): interrupt status is not re-set - confirm intended
+            LOG.warn("Ignoring interrupted exception during shutdown", e);
+        } catch (Exception e) {
+            LOG.warn("Ignoring unexpected exception during shutdown", e);
+        }
+        try {
+            selector.close(); // attempted even if the steps above failed
+        } catch (IOException e) {
+            LOG.warn("Selector closing", e);
+        }
+        if (zkServer != null) {
+            zkServer.shutdown();
+        }
+    }
+
+    @Override
+    public synchronized void closeSession(long sessionId) {
+        selector.wakeup();
+        closeSessionWithoutWakeup(sessionId);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void closeSessionWithoutWakeup(long sessionId) {
+        HashSet<NIOServerCnxn> cnxns;
+        synchronized (this.cnxns) {
+            cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone(); // snapshot; lock released before closing
+        }
+        // close the first connection whose session id matches, then stop scanning
+        for (NIOServerCnxn cnxn : cnxns) {
+            if (cnxn.getSessionId() == sessionId) {
+                try {
+                    cnxn.close();
+                } catch (Exception e) {
+                    LOG.warn("exception during session close", e);
+                }
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void join() throws InterruptedException {
+        thread.join();
+    }
+
+    @Override
+    public Iterable<ServerCnxn> getConnections() {
+        return cnxns; // NOTE(review): live set, not a copy; other methods here lock cnxns before touching it
+    }
+
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,820 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.AbstractSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Environment;
+import org.apache.zookeeper.Version;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.WatcherEvent;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.MessageEvent;
+
+import com.sun.management.UnixOperatingSystemMXBean;
+
+public class NettyServerCnxn extends ServerCnxn {
+    Logger LOG = Logger.getLogger(NettyServerCnxn.class);
+    Channel channel;
+    ChannelBuffer queuedBuffer;
+    volatile boolean throttled;
+    ByteBuffer bb;
+    ByteBuffer bbLen = ByteBuffer.allocate(4);
+    long sessionId;
+    int sessionTimeout;
+    AtomicLong outstandingCount = new AtomicLong();
+
+    /** The ZooKeeperServer for this connection. May be null if the server
+     * is not currently serving requests (for example if the server is not
+     * an active quorum participant.
+     */
+    private volatile ZooKeeperServer zkServer;
+
+    NettyServerCnxnFactory factory;
+    boolean initialized;
+    
+    NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
+        this.channel = channel;
+        this.zkServer = zks;
+        this.factory = factory;
+    }
+    
+    @Override
+    public void close() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("close called for sessionid:0x"
+                    + Long.toHexString(sessionId));
+        }
+        synchronized(factory.cnxns){
+            // remove() doubles as the already-closed check, so a repeated close() returns early
+            if (!factory.cnxns.remove(this)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("cnxns size:" + factory.cnxns.size());
+                }
+                return;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("close in progress for sessionid:0x"
+                        + Long.toHexString(sessionId));
+            }
+            // drop this connection from the per-host set kept for its remote address
+            synchronized (factory.ipMap) {
+                Set<NettyServerCnxn> s =
+                    factory.ipMap.get(((InetSocketAddress)channel
+                            .getRemoteAddress()).getAddress());
+                s.remove(this); // NOTE(review): NPE if ipMap has no entry for this address - confirm it cannot happen
+            }
+    
+            if (channel.isOpen()) {
+                channel.close();
+            }
+            factory.unregisterConnection(this);
+        }
+    }
+
+    @Override
+    public long getSessionId() {
+        return sessionId;
+    }
+
+    @Override
+    public int getSessionTimeout() {
+        return sessionTimeout;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
+                                     "Deliver event " + event + " to 0x"
+                                     + Long.toHexString(this.sessionId)
+                                     + " through " + this);
+        }
+
+        // Convert WatchedEvent to a type that can be sent over the wire
+        WatcherEvent e = event.getWrapper();
+
+        try {
+            sendResponse(h, e, "notification");
+        } catch (IOException e1) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
+            }
+            close();
+        }
+    }
+
+    private static final byte[] fourBytes = new byte[4];
+    static class ResumeMessageEvent implements MessageEvent {
+        Channel channel;
+        ResumeMessageEvent(Channel channel) {
+            this.channel = channel;
+        }
+        public Object getMessage() {return null;}
+        public SocketAddress getRemoteAddress() {return null;}
+        public Channel getChannel() {return channel;}
+        public ChannelFuture getFuture() {return null;}
+    };
+    
+    /** Serialize h (and r, when non-null) behind a 4-byte length prefix and pass it to sendBuffer. */
+    @Override
+    public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
+        if (!channel.isOpen()) {
+            return;
+        }
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
+        try {
+            // Make space for length
+            baos.write(fourBytes);
+            bos.writeRecord(h, "header");
+            if (r != null) {
+                bos.writeRecord(r, tag);
+            }
+            baos.close();
+        } catch (IOException e) {
+            LOG.error("Error serializing response", e); // keep the cause in the log
+        }
+        byte b[] = baos.toByteArray();
+        ByteBuffer bb = ByteBuffer.wrap(b);
+        bb.putInt(b.length - 4).rewind(); // overwrite the placeholder with the payload length
+        sendBuffer(bb);
+        if (h.getXid() > 0) {
+            // zks cannot be null otherwise we would not have gotten here!
+            if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
+                enableRecv();
+            }
+        }
+    }
+
+    @Override
+    public void setSessionId(long sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    @Override
+    public void enableRecv() {
+        if (throttled) {
+            throttled = false;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sending unthrottle event " + this);
+            }
+            channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel));
+        }
+    }
+
+    @Override
+    public void sendBuffer(ByteBuffer sendBuffer) {
+        if (sendBuffer == ServerCnxnFactory.closeConn) {
+            channel.close();
+            return;
+        }
+        channel.write(wrappedBuffer(sendBuffer));
+        packetSent();
+    }
+
+    /**
+     * clean up the socket related to a command and also make sure we flush the
+     * data before we do that
+     * 
+     * @param pwriter
+     *            the pwriter for a command socket
+     */
+    private void cleanupWriterSocket(PrintWriter pwriter) {
+        try {
+            if (pwriter != null) {
+                pwriter.flush();
+                pwriter.close();
+            }
+        } catch (Exception e) {
+            LOG.info("Error closing PrintWriter ", e);
+        } finally {
+            try {
+                close();
+            } catch (Exception e) {
+                LOG.error("Error closing a command socket ", e);
+            }
+        }
+    }
+
+    /**
+     * This class wraps the sendBuffer method of the enclosing NettyServerCnxn.
+     * It is responsible for chunking up the response to a client. Rather
+     * than cons'ing up a response fully in memory, which may be large
+     * for some commands, this class chunks up the result.
+     */
+    private class SendBufferWriter extends Writer {
+        private StringBuffer sb = new StringBuffer();
+        
+        /**
+         * Check if we are ready to send another chunk.
+         * @param force force sending, even if not a full chunk
+         */
+        private void checkFlush(boolean force) {
+            if ((force && sb.length() > 0) || sb.length() > 2048) {
+                sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); // NOTE(review): platform default charset - confirm
+                // clear our internal buffer
+                sb.setLength(0);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (sb == null) return; // already closed
+            checkFlush(true);
+            sb = null; // clear out the ref to ensure no reuse
+        }
+
+        @Override
+        public void flush() throws IOException {
+            checkFlush(true);
+        }
+
+        @Override
+        public void write(char[] cbuf, int off, int len) throws IOException {
+            sb.append(cbuf, off, len);
+            checkFlush(false); // sends only once more than 2048 chars are buffered
+        }
+    }
+
+    private static final String ZK_NOT_SERVING =
+        "This ZooKeeper instance is not currently serving requests";
+    
+    /**
+     * Base class for the four-letter-word commands. Each subclass maps to a
+     * corresponding 4 letter command. Despite the name this is not a Thread
+     * here ("extends Thread" is commented out): start() simply calls run() on
+     * the caller's thread, and run() always finishes with cleanupWriterSocket(pw).
+     */
+    private abstract class CommandThread /*extends Thread*/ {
+        PrintWriter pw;
+        
+        CommandThread(PrintWriter pw) {
+            this.pw = pw;
+        }
+        
+        public void start() {
+            run(); // synchronous: no new thread is created
+        }
+
+        public void run() {
+            try {
+                commandRun();
+            } catch (IOException ie) {
+                LOG.error("Error in running command ", ie);
+            } finally {
+                cleanupWriterSocket(pw); // flushes and closes pw, then closes this connection
+            }
+        }
+        
+        public abstract void commandRun() throws IOException;
+    }
+    
+    private class RuokCommand extends CommandThread {
+        public RuokCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            pw.print("imok");
+            
+        }
+    }
+    
+    private class TraceMaskCommand extends CommandThread {
+        TraceMaskCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            long traceMask = ZooTrace.getTextTraceLevel();
+            pw.print(traceMask);
+        }
+    }
+    
+    private class SetTraceMaskCommand extends CommandThread {
+        long trace = 0;
+        SetTraceMaskCommand(PrintWriter pw, long trace) {
+            super(pw);
+            this.trace = trace;
+        }
+        
+        @Override
+        public void commandRun() {
+            pw.print(trace);
+        }
+    }
+    
+    private class EnvCommand extends CommandThread {
+        EnvCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            List<Environment.Entry> env = Environment.list();
+
+            pw.println("Environment:");
+            for(Environment.Entry e : env) {
+                pw.print(e.getKey());
+                pw.print("=");
+                pw.println(e.getValue());
+            }
+            
+        } 
+    }
+    
+    private class ConfCommand extends CommandThread {
+        ConfCommand(PrintWriter pw) {
+            super(pw);
+        }
+            
+        @Override
+        public void commandRun() {
+            if (zkServer == null) {
+                pw.println(ZK_NOT_SERVING);
+            } else {
+                zkServer.dumpConf(pw);
+            }
+        }
+    }
+    
+    private class StatResetCommand extends CommandThread {
+        public StatResetCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            if (zkServer == null) {
+                pw.println(ZK_NOT_SERVING);
+            }
+            else { 
+                zkServer.serverStats().reset();
+                pw.println("Server stats reset.");
+            }
+        }
+    }
+    
+    private class CnxnStatResetCommand extends CommandThread {
+        public CnxnStatResetCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            if (zkServer == null) {
+                pw.println(ZK_NOT_SERVING);
+            } else {
+                synchronized(factory.cnxns){
+                    for(ServerCnxn c : factory.cnxns){
+                        c.resetStats();
+                    }
+                }
+                pw.println("Connection stats reset.");
+            }
+        }
+    }
+
+    private class DumpCommand extends CommandThread {
+        public DumpCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            if (zkServer == null) {
+                pw.println(ZK_NOT_SERVING);
+            }
+            else {
+                pw.println("SessionTracker dump:");
+                zkServer.sessionTracker.dumpSessions(pw);
+                pw.println("ephemeral nodes dump:");
+                zkServer.dumpEphemerals(pw);
+            }
+        }
+    }
+    
+    private class StatCommand extends CommandThread {
+        int len;
+        public StatCommand(PrintWriter pw, int len) {
+            super(pw);
+            this.len = len;
+        }
+        
+        @Override
+        public void commandRun() {
+            if (zkServer == null) {
+                pw.println(ZK_NOT_SERVING);
+            }
+            else {   
+                pw.print("Zookeeper version: ");
+                pw.println(Version.getFullVersion());
+                if (len == statCmd) {
+                    LOG.info("Stat command output");
+                    pw.println("Clients:");
+                    // clone should be faster than iteration
+                    // ie give up the cnxns lock faster
+                    HashSet<ServerCnxn> cnxns;
+                    synchronized(factory.cnxns){
+                        cnxns = new HashSet<ServerCnxn>(factory.cnxns);
+                    }
+                    for(ServerCnxn c : cnxns){
+                        c.dumpConnectionInfo(pw, true);
+                    }
+                    pw.println();
+                }
+                pw.print(zkServer.serverStats().toString());
+                pw.print("Node count: ");
+                pw.println(zkServer.getZKDatabase().getNodeCount());
+            }
+            
+        }
+    }
+    
+    private class ConsCommand extends CommandThread {
+        public ConsCommand(PrintWriter pw) {
+            super(pw);
+        }
+        
+        @Override
+        public void commandRun() {
+            if (zkServer == null) {
+                pw.println(ZK_NOT_SERVING);
+            } else {
+                // clone should be faster than iteration
+                // ie give up the cnxns lock faster
+                AbstractSet<ServerCnxn> cnxns;
+                synchronized (factory.cnxns) {
+                    cnxns = new HashSet<ServerCnxn>(factory.cnxns);
+                }
+                for (ServerCnxn c : cnxns) {
+                    c.dumpConnectionInfo(pw, false);
+                }
+                pw.println();
+            }
+        }
+    }
+    
+    private class WatchCommand extends CommandThread {
+        int len = 0;
+        public WatchCommand(PrintWriter pw, int len) {
+            super(pw);
+            this.len = len;
+        }
+
+        @Override
+        public void commandRun() {
+            if (zkServer == null) {
+                pw.println(ZK_NOT_SERVING);
+            } else {
+                DataTree dt = zkServer.getZKDatabase().getDataTree();
+                if (len == wchsCmd) {
+                    dt.dumpWatchesSummary(pw);
+                } else if (len == wchpCmd) {
+                    dt.dumpWatches(pw, true);
+                } else {
+                    dt.dumpWatches(pw, false);
+                }
+                pw.println();
+            }
+        }
+    }
+
+    private class MonitorCommand extends CommandThread {
+        // Prints server statistics as "zk_<key>\t<value>" lines, one per metric.
+        MonitorCommand(PrintWriter pw) {
+            super(pw);
+        }
+
+        @Override
+        public void commandRun() {
+            if(zkServer == null) {
+                pw.println(ZK_NOT_SERVING);
+                return;
+            }
+            ZKDatabase zkdb = zkServer.getZKDatabase();
+            ServerStats stats = zkServer.serverStats();
+
+            print("version", Version.getFullVersion());
+
+            print("avg_latency", stats.getAvgLatency());
+            print("max_latency", stats.getMaxLatency());
+            print("min_latency", stats.getMinLatency());
+
+            print("packets_received", stats.getPacketsReceived());
+            print("packets_sent", stats.getPacketsSent());
+
+            print("outstanding_requests", stats.getOutstandingRequests());
+
+            print("server_state", stats.getServerState());
+            print("znode_count", zkdb.getNodeCount());
+
+            print("watch_count", zkdb.getDataTree().getWatchCount());
+            print("ephemerals_count", zkdb.getDataTree().getEphemeralsCount());
+            print("approximate_data_size", zkdb.getDataTree().approximateDataSize());
+
+            OperatingSystemMXBean osMbean = ManagementFactory.getOperatingSystemMXBean();
+            if(osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) {
+                UnixOperatingSystemMXBean unixos = (UnixOperatingSystemMXBean)osMbean;
+
+                print("open_file_descriptor_count", unixos.getOpenFileDescriptorCount());
+                print("max_file_descriptor_count", unixos.getMaxFileDescriptorCount());
+            }
+            // compare contents: == on Strings only tests reference identity
+            if("leader".equals(stats.getServerState())) {
+                Leader leader = ((LeaderZooKeeperServer)zkServer).getLeader();
+
+                print("followers", leader.learners.size());
+                print("synced_followers", leader.forwardingFollowers.size());
+                print("pending_syncs", leader.pendingSyncs.size());
+            }
+        }
+
+        private void print(String key, long number) {
+            print(key, "" + number);
+        }
+
+        private void print(String key, String value) {
+            pw.print("zk_");
+            pw.print(key);
+            pw.print("\t");
+            pw.println(value);
+        }
+
+    }
+
+
+    /**
+     * Checks whether the value read as a packet length is really one of the
+     * four letter admin commands and, if so, starts the matching command.
+     *
+     * When a command is recognised the channel's interest ops are cleared,
+     * the request is logged and counted as a received packet, and the command
+     * object is started with a writer backed by {@code SendBufferWriter}.
+     *
+     * @param channel channel the request arrived on
+     * @param message buffer positioned just after the 4 command bytes; only
+     *                the "stmk" command reads further bytes from it
+     * @param len the 4 bytes already read from the stream, as an int
+     * @return true if a four letter word was found and responded to,
+     *         false otherwise
+     * @throws IOException if "stmk" is not followed by a complete trace mask
+     */
+    private boolean checkFourLetterWord(final Channel channel,
+            ChannelBuffer message, final int len) throws IOException
+    {
+        // We take advantage of the limited size of the length to look
+        // for cmds. They are all 4-bytes which fits inside of an int
+        String cmd = cmd2String.get(len);
+        if (cmd == null) {
+            return false;
+        }
+        channel.setInterestOps(0).awaitUninterruptibly();
+        LOG.info("Processing " + cmd + " command from "
+                + channel.getRemoteAddress());
+        packetReceived();
+
+        final PrintWriter pwriter = new PrintWriter(
+                new BufferedWriter(new SendBufferWriter()));
+        if (len == ruokCmd) {
+            RuokCommand ruok = new RuokCommand(pwriter);
+            ruok.start();
+            return true;
+        } else if (len == getTraceMaskCmd) {
+            TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
+            tmask.start();
+            return true;
+        } else if (len == setTraceMaskCmd) {
+            // The mask is read with getLong(), so 8 bytes are required.
+            // Fail through the IOException path (which closes the
+            // connection) instead of letting readBytes() throw an
+            // unchecked IndexOutOfBoundsException on a short message.
+            final int maskBytes = 8;
+            if (message.readableBytes() < maskBytes) {
+                throw new IOException("Read error: " + cmd
+                        + " command is missing its " + maskBytes
+                        + "-byte trace mask");
+            }
+            ByteBuffer mask = ByteBuffer.allocate(maskBytes);
+            message.readBytes(mask);
+
+            // Flip the buffer that was just filled; the packet buffer bb
+            // is still null while the length prefix is being parsed.
+            mask.flip();
+            long traceMask = mask.getLong();
+            ZooTrace.setTextTraceLevel(traceMask);
+            SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
+            setMask.start();
+            return true;
+        } else if (len == enviCmd) {
+            EnvCommand env = new EnvCommand(pwriter);
+            env.start();
+            return true;
+        } else if (len == confCmd) {
+            ConfCommand ccmd = new ConfCommand(pwriter);
+            ccmd.start();
+            return true;
+        } else if (len == srstCmd) {
+            StatResetCommand strst = new StatResetCommand(pwriter);
+            strst.start();
+            return true;
+        } else if (len == crstCmd) {
+            CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
+            crst.start();
+            return true;
+        } else if (len == dumpCmd) {
+            DumpCommand dump = new DumpCommand(pwriter);
+            dump.start();
+            return true;
+        } else if (len == statCmd || len == srvrCmd) {
+            StatCommand stat = new StatCommand(pwriter, len);
+            stat.start();
+            return true;
+        } else if (len == consCmd) {
+            ConsCommand cons = new ConsCommand(pwriter);
+            cons.start();
+            return true;
+        } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
+            WatchCommand wcmd = new WatchCommand(pwriter, len);
+            wcmd.start();
+            return true;
+        } else if (len == mntrCmd) {
+            MonitorCommand mntr = new MonitorCommand(pwriter);
+            mntr.start();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Consumes bytes from a Netty buffer and reassembles length-prefixed
+     * packets from them.
+     *
+     * The loop runs while the message has readable bytes and this connection
+     * is not throttled. While {@code bb} is null the bytes go into
+     * {@code bbLen} until it is full; the int read from it is either a four
+     * letter word (only checked before the connection is initialized) or the
+     * length of the packet that follows. Once {@code bb} is allocated the
+     * bytes go into it until it is full, at which point the packet is handed
+     * to the server and {@code bb} is reset to null.
+     *
+     * Any IOException is logged and closes the connection. Bytes left
+     * unread (for example because the connection became throttled) stay in
+     * {@code message} for the caller to deal with.
+     *
+     * @param message buffer holding the newly received bytes
+     */
+    public void receiveMessage(ChannelBuffer message) {
+        try {
+            while(message.readable() && !throttled) {
+                if (bb != null) {
+                    // A packet body is in progress: keep filling bb.
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("message readable " + message.readableBytes()
+                                + " bb len " + bb.remaining() + " " + bb);
+                        ByteBuffer dat = bb.duplicate();
+                        dat.flip();
+                        LOG.trace(Long.toHexString(sessionId)
+                                + " bb 0x"
+                                + ChannelBuffers.hexDump(
+                                        ChannelBuffers.copiedBuffer(dat)));
+                    }
+
+                    // Shrink the limit temporarily so that readBytes() does
+                    // not ask the message for more bytes than it holds.
+                    if (bb.remaining() > message.readableBytes()) {
+                        int newLimit = bb.position() + message.readableBytes();
+                        bb.limit(newLimit);
+                    }
+                    message.readBytes(bb);
+                    // Restore the full limit so remaining() again reflects
+                    // how much of the packet is still missing.
+                    bb.limit(bb.capacity());
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("after readBytes message readable "
+                                + message.readableBytes()
+                                + " bb len " + bb.remaining() + " " + bb);
+                        ByteBuffer dat = bb.duplicate();
+                        dat.flip();
+                        LOG.trace("after readbytes "
+                                + Long.toHexString(sessionId)
+                                + " bb 0x"
+                                + ChannelBuffers.hexDump(
+                                        ChannelBuffers.copiedBuffer(dat)));
+                    }
+                    if (bb.remaining() == 0) {
+                        // The packet is complete: count it and hand it over.
+                        packetReceived();
+                        bb.flip();
+
+                        // Work on a local copy of the server reference.
+                        ZooKeeperServer zks = this.zkServer;
+                        if (zks == null) {
+                            throw new IOException("ZK down");
+                        }
+                        if (initialized) {
+                            zks.processPacket(this, bb);
+
+                            if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {
+                                disableRecv();
+                            }
+                        } else {
+                            // The first packet on a connection is the
+                            // connect request.
+                            LOG.debug("got conn req request from "
+                                    + getRemoteSocketAddress());
+                            zks.processConnectRequest(this, bb);
+                            initialized = true;
+                        }
+                        // Next iteration starts with a new length prefix.
+                        bb = null;
+                    }
+                } else {
+                    // No packet in progress: collect the length prefix.
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("message readable "
+                                + message.readableBytes()
+                                + " bblenrem " + bbLen.remaining());
+                        ByteBuffer dat = bbLen.duplicate();
+                        dat.flip();
+                        LOG.trace(Long.toHexString(sessionId)
+                                + " bbLen 0x"
+                                + ChannelBuffers.hexDump(
+                                        ChannelBuffers.copiedBuffer(dat)));
+                    }
+
+                    // Same temporary-limit technique as for bb above.
+                    if (message.readableBytes() < bbLen.remaining()) {
+                        bbLen.limit(bbLen.position() + message.readableBytes());
+                    }
+                    message.readBytes(bbLen);
+                    bbLen.limit(bbLen.capacity());
+                    if (bbLen.remaining() == 0) {
+                        bbLen.flip();
+
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace(Long.toHexString(sessionId)
+                                    + " bbLen 0x"
+                                    + ChannelBuffers.hexDump(
+                                            ChannelBuffers.copiedBuffer(bbLen)));
+                        }
+                        int len = bbLen.getInt();
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace(Long.toHexString(sessionId)
+                                    + " bbLen len is " + len);
+                        }
+
+                        // Make bbLen ready for the next length prefix.
+                        bbLen.clear();
+                        if (!initialized) {
+                            // Before the connect request the "length" may be
+                            // a four letter word; if it was handled, stop.
+                            if (checkFourLetterWord(channel, message, len)) {
+                                return;
+                            }
+                        }
+                        if (len < 0 || len > BinaryInputArchive.maxBuffer) {
+                            throw new IOException("Len error " + len);
+                        }
+                        bb = ByteBuffer.allocate(len);
+                    }
+                }
+            }
+        } catch(IOException e) {
+            LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
+            close();
+        }
+    }
+
+    /**
+     * Marks this connection as throttled and stops reading from its channel,
+     * waiting until the channel has applied the change.
+     */
+    @Override
+    public void disableRecv() {
+        this.throttled = true;
+        final boolean debugOn = LOG.isDebugEnabled();
+        if (debugOn) {
+            LOG.debug("Throttling - disabling recv " + this);
+        }
+        this.channel.setReadable(false).awaitUninterruptibly();
+    }
+
+    /**
+     * @return the current value of this connection's outstanding request
+     *         counter
+     */
+    @Override
+    public long getOutstandingRequests() {
+        final long pending = this.outstandingCount.longValue();
+        return pending;
+    }
+
+    /**
+     * Records the session timeout for this connection.
+     *
+     * @param sessionTimeout timeout value to store
+     */
+    @Override
+    public void setSessionTimeout(int sessionTimeout) {
+        final int timeout = sessionTimeout;
+        this.sessionTimeout = timeout;
+    }
+
+    /**
+     * @return the interest ops currently reported by the underlying channel
+     */
+    @Override
+    public int getInterestOps() {
+        final int ops = this.channel.getInterestOps();
+        return ops;
+    }
+
+    /**
+     * @return the remote address reported by the underlying channel, cast to
+     *         {@link InetSocketAddress}
+     */
+    @Override
+    public InetSocketAddress getRemoteSocketAddress() {
+        final Object remote = this.channel.getRemoteAddress();
+        return (InetSocketAddress) remote;
+    }
+
+    /**
+     * Send close connection packet to the client by queueing the shared
+     * {@code ServerCnxnFactory.closeConn} buffer on this connection.
+     */
+    @Override
+    public void sendCloseSession() {
+        sendBuffer(ServerCnxnFactory.closeConn);
+    }
+
+    @Override
+    protected ServerStats serverStats() {
+        if (zkServer == null) {
+            return null;
+        }
+        return zkServer.serverStats();
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.WriteCompletionEvent;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+public class NettyServerCnxnFactory extends ServerCnxnFactory {
+    Logger LOG = Logger.getLogger(NettyServerCnxnFactory.class);
+
+    ServerBootstrap bootstrap;
+    Channel parentChannel;
+    ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
+    HashSet<ServerCnxn> cnxns = new HashSet<ServerCnxn>();
+    HashMap<InetAddress, Set<NettyServerCnxn>> ipMap =
+        new HashMap<InetAddress, Set<NettyServerCnxn>>( );
+    InetSocketAddress localAddress;
+    int maxClientCnxns = 10;
+    
+    /**
+     * This is an inner class since we need to extend SimpleChannelHandler, but
+     * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
+     * this class gets access to the member variables and methods.
+     */
+    @ChannelPipelineCoverage("all")
+    class CnxnChannelHandler extends SimpleChannelHandler {
+
+        @Override
+        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
+            throws Exception
+        {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Channel closed " + e);
+            }
+            allChannels.remove(ctx.getChannel());
+        }
+
+        @Override
+        public void channelConnected(ChannelHandlerContext ctx,
+                ChannelStateEvent e) throws Exception
+        {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Channel connected " + e);
+            }
+            allChannels.add(ctx.getChannel());
+            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(),
+                    zkServer, NettyServerCnxnFactory.this);
+            ctx.setAttachment(cnxn);
+            addCnxn(cnxn);
+        }
+
+        @Override
+        public void channelDisconnected(ChannelHandlerContext ctx,
+                ChannelStateEvent e) throws Exception
+        {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Channel disconnected " + e);
+            }
+            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+            if (cnxn != null) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Channel disconnect caused close " + e);
+                }
+                cnxn.close();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+            throws Exception
+        {
+            LOG.warn("Exception caught " + e, e.getCause());
+            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+            if (cnxn != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Closing " + cnxn);
+                    cnxn.close();
+                }
+            }
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+            throws Exception
+        {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("message received called " + e.getMessage());
+            }
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("New message " + e.toString()
+                            + " from " + ctx.getChannel());
+                }
+                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
+                synchronized(cnxn) {
+                    processMessage(e, cnxn);
+                }
+            } catch(Exception ex) {
+                LOG.error("Unexpected exception in receive", ex);
+                throw ex;
+            }
+        }
+
+        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
+                        + cnxn.queuedBuffer);
+            }
+
+            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
+                LOG.debug("Received ResumeMessageEvent");
+                if (cnxn.queuedBuffer != null) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("processing queue "
+                                + Long.toHexString(cnxn.sessionId)
+                                + " queuedBuffer 0x"
+                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+                    }
+                    cnxn.receiveMessage(cnxn.queuedBuffer);
+                    if (!cnxn.queuedBuffer.readable()) {
+                        LOG.debug("Processed queue - no bytes remaining");
+                        cnxn.queuedBuffer = null;
+                    } else {
+                        LOG.debug("Processed queue - bytes remaining");
+                    }
+                } else {
+                    LOG.debug("queue empty");
+                }
+                cnxn.channel.setReadable(true);
+            } else {
+                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(Long.toHexString(cnxn.sessionId)
+                            + " buf 0x"
+                            + ChannelBuffers.hexDump(buf));
+                }
+                
+                if (cnxn.throttled) {
+                    LOG.debug("Received message while throttled");
+                    // we are throttled, so we need to queue
+                    if (cnxn.queuedBuffer == null) {
+                        LOG.debug("allocating queue");
+                        cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
+                    }
+                    cnxn.queuedBuffer.writeBytes(buf);
+                    LOG.debug(Long.toHexString(cnxn.sessionId)
+                            + " queuedBuffer 0x"
+                            + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+                } else {
+                    LOG.debug("not throttled");
+                    if (cnxn.queuedBuffer != null) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace(Long.toHexString(cnxn.sessionId)
+                                    + " queuedBuffer 0x"
+                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+                        }
+                        cnxn.queuedBuffer.writeBytes(buf);
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace(Long.toHexString(cnxn.sessionId)
+                                    + " queuedBuffer 0x"
+                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+                        }
+
+                        cnxn.receiveMessage(cnxn.queuedBuffer);
+                        if (!cnxn.queuedBuffer.readable()) {
+                            LOG.debug("Processed queue - no bytes remaining");
+                            cnxn.queuedBuffer = null;
+                        } else {
+                            LOG.debug("Processed queue - bytes remaining");
+                        }
+                    } else {
+                        cnxn.receiveMessage(buf);
+                        if (buf.readable()) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Before copy " + buf);
+                            }
+                            cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes()); 
+                            cnxn.queuedBuffer.writeBytes(buf);
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Copy is " + cnxn.queuedBuffer);
+                                LOG.trace(Long.toHexString(cnxn.sessionId)
+                                        + " queuedBuffer 0x"
+                                        + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void writeComplete(ChannelHandlerContext ctx,
+                WriteCompletionEvent e) throws Exception
+        {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("write complete " + e);
+            }
+        }
+        
+    }
+    
+    CnxnChannelHandler channelHandler = new CnxnChannelHandler();
+    
+    NettyServerCnxnFactory() {
+        bootstrap = new ServerBootstrap(
+                new NioServerSocketChannelFactory(
+                        Executors.newCachedThreadPool(),
+                        Executors.newCachedThreadPool()));
+        // parent channel
+        bootstrap.setOption("reuseAddress", true);
+        // child channels
+        bootstrap.setOption("child.tcpNoDelay", true);
+        bootstrap.setOption("child.soLinger", 2);
+
+        bootstrap.getPipeline().addLast("servercnxnfactory", channelHandler);
+    }
+    
+    @Override
+    public void closeAll() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("closeAll()");
+        }
+
+        synchronized (cnxns) {
+            // got to clear all the connections that we have in the selector
+            for (NettyServerCnxn cnxn : cnxns.toArray(new NettyServerCnxn[cnxns.size()])) {
+                try {
+                    cnxn.close();
+                } catch (Exception e) {
+                    LOG.warn("Ignoring exception closing cnxn sessionid 0x"
+                            + Long.toHexString(cnxn.getSessionId()), e);
+                }
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("allChannels size:" + allChannels.size()
+                    + " cnxns size:" + cnxns.size());
+        }
+    }
+
+    @Override
+    public void closeSession(long sessionId) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("closeSession sessionid:0x" + sessionId);
+        }
+
+        synchronized (cnxns) {
+            for (NettyServerCnxn cnxn : cnxns.toArray(new NettyServerCnxn[cnxns.size()])) {
+                if (cnxn.getSessionId() == sessionId) {
+                    try {
+                        cnxn.close();
+                    } catch (Exception e) {
+                        LOG.warn("exception during session close", e);
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void configure(InetSocketAddress addr, int maxClientCnxns)
+            throws IOException
+    {
+        localAddress = addr;
+        this.maxClientCnxns = maxClientCnxns;
+    }
+
+    /** {@inheritDoc} */
+    public int getMaxClientCnxnsPerHost() {
+        return maxClientCnxns;
+    }
+
+    /** {@inheritDoc} */
+    public void setMaxClientCnxnsPerHost(int max) {
+        maxClientCnxns = max;
+    }
+
+    @Override
+    public int getLocalPort() {
+        return localAddress.getPort();
+    }
+
+    boolean killed;
+    @Override
+    public void join() throws InterruptedException {
+        synchronized(this) {
+            while(!killed) {
+                wait();
+            }
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        LOG.info("shutdown called " + localAddress);
+        
+        // null if factory never started
+        if (parentChannel != null) {
+            parentChannel.close().awaitUninterruptibly();
+            closeAll();
+            allChannels.close().awaitUninterruptibly();
+            bootstrap.releaseExternalResources();
+        }
+
+        if (zkServer != null) {
+            zkServer.shutdown();
+        }
+        synchronized(this) {
+            killed = true;
+            notifyAll();
+        }
+    }
+    
+    @Override
+    public void start() {
+        LOG.info("binding to port " + localAddress);
+        parentChannel = bootstrap.bind(localAddress);
+    }
+
+    @Override
+    public void startup(ZooKeeperServer zks) throws IOException,
+            InterruptedException {
+        start();
+        zks.startdata();
+        zks.startup();
+        setZooKeeperServer(zks);
+    }
+
+    @Override
+    public Iterable<ServerCnxn> getConnections() {
+        return cnxns;
+    }
+
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        return localAddress;
+    }
+
+    private void addCnxn(NettyServerCnxn cnxn) {
+        synchronized (cnxns) {
+            cnxns.add(cnxn);
+            synchronized (ipMap){
+                InetAddress addr =
+                    ((InetSocketAddress)cnxn.channel.getRemoteAddress())
+                        .getAddress();
+                Set<NettyServerCnxn> s = ipMap.get(addr);
+                if (s == null) {
+                    s = new HashSet<NettyServerCnxn>();
+                }
+                s.add(cnxn);
+                ipMap.put(addr,s);
+            }
+        }
+    }
+
+}

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -84,7 +84,8 @@ public class PrepRequestProcessor extend
 
     public PrepRequestProcessor(ZooKeeperServer zks,
             RequestProcessor nextProcessor) {
-        super("ProcessThread:" + zks.getClientPort());
+        super("ProcessThread(sid:" + zks.getServerId()
+                + " cport:" + zks.getClientPort() + "):");
         this.nextProcessor = nextProcessor;
         this.zks = zks;
     }
@@ -505,6 +506,7 @@ public class PrepRequestProcessor extend
     }
 
     public void shutdown() {
+        LOG.info("Shutting down");
         submittedRequests.clear();
         submittedRequests.add(Request.requestOfDeath);
         nextProcessor.shutdown();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java Wed Aug 18 06:24:08 2010
@@ -19,90 +19,414 @@
 package org.apache.zookeeper.server;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jute.Record;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
 
 /**
  * Interface to a Server connection - represents a connection from a client
  * to the server.
  */
-public interface ServerCnxn extends Watcher {
+public abstract class ServerCnxn implements Stats, Watcher {
     // This is just an arbitrary object to represent requests issued by
     // (aka owned by) this class
     final public static Object me = new Object();
+    
+    protected ArrayList<Id> authInfo = new ArrayList<Id>();
 
-    int getSessionTimeout();
+    abstract int getSessionTimeout();
 
-    void sendResponse(ReplyHeader h, Record r, String tag) throws IOException;
+    abstract void close();
+
+    abstract void sendResponse(ReplyHeader h, Record r, String tag)
+        throws IOException;
 
     /* notify the client the session is closing and close/cleanup socket */
-    void sendCloseSession();
+    abstract void sendCloseSession();
+
+    public abstract void process(WatchedEvent event);
+
+    abstract long getSessionId();
+
+    abstract void setSessionId(long sessionId);
+
+    /**
+     * Auth info for the cnxn.
+     *
+     * @return an unmodifiable view of the ids held by this connection
+     */
+    public List<Id> getAuthInfo() {
+        final List<Id> readOnlyView = Collections.unmodifiableList(this.authInfo);
+        return readOnlyView;
+    }
+
+    /**
+     * Appends an id to this connection's auth info.
+     *
+     * @param id id to add
+     */
+    public void addAuthInfo(Id id) {
+        this.authInfo.add(id);
+    }
+
+    /**
+     * Removes an id from this connection's auth info.
+     *
+     * @param id id to remove
+     * @return true if the list contained the id
+     */
+    public boolean removeAuthInfo(Id id) {
+        final boolean wasPresent = this.authInfo.remove(id);
+        return wasPresent;
+    }
+
+    abstract void sendBuffer(ByteBuffer closeConn);
+
+    abstract void enableRecv();
+
+    abstract void disableRecv();
+
+    abstract void setSessionTimeout(int sessionTimeout);
+
+    /** IOException subtype that only carries a message. */
+    protected static class CloseRequestException extends IOException {
+
+        private static final long serialVersionUID = -7854505709816442681L;
+
+        /**
+         * @param msg detail message passed on to {@link IOException}
+         */
+        public CloseRequestException(final String msg) {
+            super(msg);
+        }
+
+    }
+
+    /**
+     * IOException subtype whose string form is the fixed prefix
+     * "EndOfStreamException: " followed by the detail message.
+     */
+    protected static class EndOfStreamException extends IOException {
+
+        private static final long serialVersionUID = -8255690282104294178L;
+
+        /**
+         * @param msg detail message passed on to {@link IOException}
+         */
+        public EndOfStreamException(final String msg) {
+            super(msg);
+        }
+
+        /** @return "EndOfStreamException: " plus the detail message */
+        @Override
+        public String toString() {
+            final String detail = getMessage();
+            return "EndOfStreamException: " + detail;
+        }
+
+    }
+
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int confCmd =
+        ByteBuffer.wrap("conf".getBytes()).getInt();
 
-    void finishSessionInit(boolean valid);
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int consCmd =
+        ByteBuffer.wrap("cons".getBytes()).getInt();
+
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int crstCmd =
+        ByteBuffer.wrap("crst".getBytes()).getInt();
+
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int dumpCmd =
+        ByteBuffer.wrap("dump".getBytes()).getInt();
+
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int enviCmd =
+        ByteBuffer.wrap("envi".getBytes()).getInt();
+
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int getTraceMaskCmd =
+        ByteBuffer.wrap("gtmk".getBytes()).getInt();
+
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int ruokCmd =
+        ByteBuffer.wrap("ruok".getBytes()).getInt();
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int setTraceMaskCmd =
+        ByteBuffer.wrap("stmk".getBytes()).getInt();
+
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int srvrCmd =
+        ByteBuffer.wrap("srvr".getBytes()).getInt();
 
-    void process(WatchedEvent event);
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int srstCmd =
+        ByteBuffer.wrap("srst".getBytes()).getInt();
 
-    long getSessionId();
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int statCmd =
+        ByteBuffer.wrap("stat".getBytes()).getInt();
 
-    void setSessionId(long sessionId);
+    /*
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int wchcCmd =
+        ByteBuffer.wrap("wchc".getBytes()).getInt();
 
-    ArrayList<Id> getAuthInfo();
+    /**
+     * Integer form of the "wchp" command: the four bytes of the command name
+     * read as one int through {@link ByteBuffer#getInt()}.
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a> for the list of all commands.
+     */
+    protected final static int wchpCmd =
+        ByteBuffer.wrap("wchp".getBytes()).getInt();
+
+    /**
+     * Integer form of the "wchs" command (same encoding as the other
+     * command constants).
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a> for the list of all commands.
+     */
+    protected final static int wchsCmd =
+        ByteBuffer.wrap("wchs".getBytes()).getInt();
+
+    /**
+     * Integer form of the "mntr" command (same encoding as the other
+     * command constants).
+     * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a> for the list of all commands.
+     */
+    protected final static int mntrCmd = ByteBuffer.wrap("mntr".getBytes())
+            .getInt();
 
-    InetSocketAddress getRemoteAddress();
+    /**
+     * Maps the integer form of each command constant back to its
+     * four-letter name. Filled once by the static initializer below.
+     */
+    protected final static HashMap<Integer, String> cmd2String =
+        new HashMap<Integer, String>();
+
+    // specify all of the commands that are available
+    static {
+        cmd2String.put(confCmd, "conf");
+        cmd2String.put(consCmd, "cons");
+        cmd2String.put(crstCmd, "crst");
+        cmd2String.put(dumpCmd, "dump");
+        cmd2String.put(enviCmd, "envi");
+        cmd2String.put(getTraceMaskCmd, "gtmk");
+        cmd2String.put(ruokCmd, "ruok");
+        cmd2String.put(setTraceMaskCmd, "stmk");
+        cmd2String.put(srstCmd, "srst");
+        cmd2String.put(srvrCmd, "srvr");
+        cmd2String.put(statCmd, "stat");
+        cmd2String.put(wchcCmd, "wchc");
+        cmd2String.put(wchpCmd, "wchp");
+        cmd2String.put(wchsCmd, "wchs");
+        cmd2String.put(mntrCmd, "mntr");
+    }
+
+    /**
+     * Records one received packet: increments this connection's counter and,
+     * when {@link #serverStats()} returns a non-null object, that object's
+     * received-packet counter too.
+     */
+    protected void packetReceived() {
+        incrPacketsReceived();
+        ServerStats serverStats = serverStats();
+        if (serverStats != null) {
+            // Use the instance that was just null-checked; calling
+            // serverStats() again could return a different (or null) value.
+            serverStats.incrementPacketsReceived();
+        }
+    }
+
+    /**
+     * Records one sent packet: increments this connection's counter and,
+     * when {@link #serverStats()} returns a non-null object, that object's
+     * sent-packet counter too.
+     */
+    protected void packetSent() {
+        incrPacketsSent();
+        ServerStats serverStats = serverStats();
+        if (serverStats != null) {
+            // Use the instance that was just null-checked; calling
+            // serverStats() again could return a different (or null) value.
+            serverStats.incrementPacketsSent();
+        }
+    }
+
+    /**
+     * Returns the {@link ServerStats} whose packet counters are incremented
+     * by {@code packetReceived()} and {@code packetSent()}. May return null,
+     * in which case only this connection's own counters are updated.
+     */
+    protected abstract ServerStats serverStats();
+    
+    /** Time at which this object was created; getEstablished() returns copies. */
+    protected final Date established = new Date();
+
+    // Packet counters, incremented by incrPacketsReceived() / incrPacketsSent().
+    protected final AtomicLong packetsReceived = new AtomicLong();
+    protected final AtomicLong packetsSent = new AtomicLong();
+
+    // Response statistics, written by the synchronized methods resetStats()
+    // and updateStatsForResponse().
+    // NOTE(review): until resetStats() runs these hold the Java defaults
+    // (0 / null), so minLatency would start at 0 instead of Long.MAX_VALUE -
+    // confirm that resetStats() is invoked when a connection is created.
+    protected long minLatency;
+    protected long maxLatency;
+    protected String lastOp;
+    protected long lastCxid;
+    protected long lastZxid;
+    protected long lastResponseTime;
+    protected long lastLatency;
+
+    // Number of responses recorded and the sum of their latencies; used by
+    // getAvgLatency().
+    protected long count;
+    protected long totalLatency;
+
+    /**
+     * Resets every statistic kept for this connection.
+     */
+    public synchronized void resetStats() {
+        // packet counters
+        packetsReceived.set(0);
+        packetsSent.set(0);
+
+        // latency aggregates; Long.MAX_VALUE lets the first recorded
+        // latency become the minimum
+        count = 0;
+        totalLatency = 0;
+        maxLatency = 0;
+        minLatency = Long.MAX_VALUE;
+
+        // details of the last response
+        lastLatency = 0;
+        lastResponseTime = 0;
+        lastZxid = -1;
+        lastCxid = -1;
+        lastOp = "NA";
+    }
+
+    protected long incrPacketsReceived() {
+        return packetsReceived.incrementAndGet();
+    }
+    
+    /**
+     * Does nothing in this class; the body is intentionally empty.
+     * NOTE(review): presumably a hook for subclasses that track outstanding
+     * requests - confirm against the implementations.
+     *
+     * @param h header of the request (unused here)
+     */
+    protected void incrOutstandingRequests(RequestHeader h) {
+    }
+
+    protected long incrPacketsSent() {
+        return packetsSent.incrementAndGet();
+    }
+
+    protected synchronized void updateStatsForResponse(long cxid, long zxid,
+            String op, long start, long end)
+    {
+        // don't overwrite with "special" xids - we're interested
+        // in the clients last real operation
+        if (cxid >= 0) {
+            lastCxid = cxid;
+        }
+        lastZxid = zxid;
+        lastOp = op;
+        lastResponseTime = end;
+        long elapsed = end - start;
+        lastLatency = elapsed;
+        if (elapsed < minLatency) {
+            minLatency = elapsed;
+        }
+        if (elapsed > maxLatency) {
+            maxLatency = elapsed;
+        }
+        count++;
+        totalLatency += elapsed;
+    }
+
+    /**
+     * Date/time the connection was established.
+     *
+     * @return a fresh copy, so callers cannot modify the stored date
+     */
+    public Date getEstablished() {
+        return new Date(established.getTime());
+    }
+
+    /**
+     * The number of requests that have been submitted but not yet
+     * responded to.
+     */
+    public abstract long getOutstandingRequests();
+
+    /** Number of packets received. */
+    public long getPacketsReceived() {
+        return packetsReceived.get();
+    }
+
+    /** Number of packets sent. */
+    public long getPacketsSent() {
+        return packetsSent.get();
+    }
+
+    /**
+     * Min latency in ms.
+     *
+     * @return the smallest recorded latency, or 0 while the stored minimum
+     *         still holds the {@code Long.MAX_VALUE} reset value
+     */
+    public synchronized long getMinLatency() {
+        if (minLatency == Long.MAX_VALUE) {
+            return 0;
+        }
+        return minLatency;
+    }
+
+    /**
+     * Average latency in ms.
+     *
+     * @return total latency divided by the number of recorded responses
+     *         (integer division), or 0 when nothing has been recorded
+     */
+    public synchronized long getAvgLatency() {
+        if (count == 0) {
+            return 0;
+        }
+        return totalLatency / count;
+    }
+
+    /** Max latency in ms. */
+    public synchronized long getMaxLatency() {
+        return maxLatency;
+    }
+
+    /** Last operation performed by this connection ("NA" after a reset). */
+    public synchronized String getLastOperation() {
+        return lastOp;
+    }
+
+    /** Last cxid of this connection (-1 after a reset). */
+    public synchronized long getLastCxid() {
+        return lastCxid;
+    }
+
+    /** Last zxid of this connection (-1 after a reset). */
+    public synchronized long getLastZxid() {
+        return lastZxid;
+    }
+
+    /** Last time server sent a response to client on this connection. */
+    public synchronized long getLastResponseTime() {
+        return lastResponseTime;
+    }
+
+    /** Latency of last response to client on this connection in ms. */
+    public synchronized long getLastLatency() {
+        return lastLatency;
+    }
+
+    /**
+     * Returns the detailed (non-brief) stats information for the connection,
+     * as produced by {@code dumpConnectionInfo(pwriter, false)}.
+     *
+     * @see #dumpConnectionInfo(PrintWriter, boolean)
+     */
+    @Override
+    public String toString() {
+        StringWriter sw = new StringWriter();
+        PrintWriter pwriter = new PrintWriter(sw);
+        dumpConnectionInfo(pwriter, false);
+        // A PrintWriter over a Writer does no buffering of its own, so
+        // close() is enough; no separate flush() is required.
+        pwriter.close();
+        return sw.toString();
+    }
 
+    /** Returns the remote socket address; printed by {@code dumpConnectionInfo}. */
+    public abstract InetSocketAddress getRemoteSocketAddress();
+    /** Returns the interest ops value; printed in hex by {@code dumpConnectionInfo}. */
+    public abstract int getInterestOps();
+    
     /**
-     * Statistics on the ServerCnxn
+     * Print information about the connection to the given writer.
+     * @param pwriter the writer the connection information is printed to
+     * @param brief if true, print only brief details; otherwise print full detail
      */
-    interface Stats {
-        /** Date/time the connection was established
-         * @since 3.3.0 */
-        Date getEstablished();
-
-        /**
-         * The number of requests that have been submitted but not yet
-         * responded to.
-         */
-        long getOutstandingRequests();
-        /** Number of packets received */
-        long getPacketsReceived();
-        /** Number of packets sent (incl notifications) */
-        long getPacketsSent();
-        /** Min latency in ms
-         * @since 3.3.0 */
-        long getMinLatency();
-        /** Average latency in ms
-         * @since 3.3.0 */
-        long getAvgLatency();
-        /** Max latency in ms
-         * @since 3.3.0 */
-        long getMaxLatency();
-        /** Last operation performed by this connection
-         * @since 3.3.0 */
-        String getLastOperation();
-        /** Last cxid of this connection
-         * @since 3.3.0 */
-        long getLastCxid();
-        /** Last zxid of this connection
-         * @since 3.3.0 */
-        long getLastZxid();
-        /** Last time server sent a response to client on this connection
-         * @since 3.3.0 */
-        long getLastResponseTime();
-        /** Latency of last response to client on this connection in ms
-         * @since 3.3.0 */
-        long getLastLatency();
-
-        /** Reset counters
-         * @since 3.3.0 */
-        void reset();
+    protected synchronized void
+    dumpConnectionInfo(PrintWriter pwriter, boolean brief) {
+        pwriter.print(" ");
+        pwriter.print(getRemoteSocketAddress());
+        pwriter.print("[");
+        int interestOps = getInterestOps();
+        pwriter.print(interestOps == 0 ? "0" : Integer.toHexString(interestOps));
+        pwriter.print("](queued=");
+        pwriter.print(getOutstandingRequests());
+        pwriter.print(",recved=");
+        pwriter.print(getPacketsReceived());
+        pwriter.print(",sent=");
+        pwriter.print(getPacketsSent());
+
+        if (!brief) {
+            long sessionId = getSessionId();
+            if (sessionId != 0) {
+                pwriter.print(",sid=0x");
+                pwriter.print(Long.toHexString(sessionId));
+                pwriter.print(",lop=");
+                pwriter.print(getLastOperation());
+                pwriter.print(",est=");
+                pwriter.print(getEstablished().getTime());
+                pwriter.print(",to=");
+                pwriter.print(getSessionTimeout());
+                long lastCxid = getLastCxid();
+                if (lastCxid >= 0) {
+                    pwriter.print(",lcxid=0x");
+                    pwriter.print(Long.toHexString(lastCxid));
+                }
+                pwriter.print(",lzxid=0x");
+                pwriter.print(Long.toHexString(getLastZxid()));
+                pwriter.print(",lresp=");
+                pwriter.print(getLastResponseTime());
+                pwriter.print(",llat=");
+                pwriter.print(getLastLatency());
+                pwriter.print(",minlat=");
+                pwriter.print(getMinLatency());
+                pwriter.print(",avglat=");
+                pwriter.print(getAvgLatency());
+                pwriter.print(",maxlat=");
+                pwriter.print(getMaxLatency());
+            }
+        }
     }
 
-    Stats getStats();
 }

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import javax.management.JMException;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+
+/**
+ * Abstract factory for server connections. The concrete implementation is
+ * selected by {@link #createFactory()} from the
+ * {@value #ZOOKEEPER_SERVER_CNXN_FACTORY} system property and defaults to
+ * {@link NIOServerCnxnFactory}. The class also keeps the JMX beans that
+ * were registered for connections.
+ */
+public abstract class ServerCnxnFactory {
+    
+    /** System property naming the factory class to instantiate. */
+    public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
+
+    /** Callback that processes a packet received on a connection. */
+    public interface PacketProcessor {
+        public void processPacket(ByteBuffer packet, ServerCnxn src);
+    }
+    
+    // One shared logger for the class instead of a reassignable field per
+    // factory instance.
+    static final Logger LOG = Logger.getLogger(ServerCnxnFactory.class);
+
+    /**
+     * The buffer will cause the connection to be closed when we do a send.
+     */
+    static final ByteBuffer closeConn = ByteBuffer.allocate(0);
+
+    public abstract int getLocalPort();
+    
+    public abstract Iterable<ServerCnxn> getConnections();
+
+    public abstract void closeSession(long sessionId);
+
+    public abstract void configure(InetSocketAddress addr,
+            int maxClientCnxns) throws IOException;
+
+    /** Maximum number of connections allowed from particular host (ip) */
+    public abstract int getMaxClientCnxnsPerHost();
+
+    /** Sets the maximum number of connections allowed from particular host (ip) */
+    public abstract void setMaxClientCnxnsPerHost(int max);
+
+    public abstract void startup(ZooKeeperServer zkServer)
+        throws IOException, InterruptedException;
+
+    public abstract void join() throws InterruptedException;
+
+    public abstract void shutdown();
+
+    public abstract void start();
+
+    protected ZooKeeperServer zkServer;
+
+    /**
+     * Stores the server this factory works for and, when it is non-null,
+     * registers this factory with it.
+     *
+     * @param zk the server, may be null
+     */
+    final public void setZooKeeperServer(ZooKeeperServer zk) {
+        this.zkServer = zk;
+        if (zk != null) {
+            zk.setServerCnxnFactory(this);
+        }
+    }
+
+    public abstract void closeAll();
+    
+    /**
+     * Instantiates the factory class named by the
+     * {@value #ZOOKEEPER_SERVER_CNXN_FACTORY} system property through its
+     * no-argument constructor, or {@link NIOServerCnxnFactory} when the
+     * property is not set. The returned factory is not configured yet.
+     *
+     * @return a new, unconfigured factory
+     * @throws IOException if the class cannot be loaded or instantiated;
+     *         the original failure is attached as the cause
+     */
+    static public ServerCnxnFactory createFactory() throws IOException {
+        String serverCnxnFactoryName =
+            System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
+        if (serverCnxnFactoryName == null) {
+            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
+        }
+        try {
+            return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
+                                                .newInstance();
+        } catch (Exception e) {
+            IOException ioe = new IOException("Couldn't instantiate "
+                    + serverCnxnFactoryName);
+            ioe.initCause(e);
+            throw ioe;
+        }
+    }
+    
+    /**
+     * Creates a factory and configures it for the given port on the
+     * wildcard address.
+     */
+    static public ServerCnxnFactory createFactory(int clientPort,
+            int maxClientCnxns) throws IOException
+    {
+        return createFactory(new InetSocketAddress(clientPort), maxClientCnxns);
+    }
+
+    /**
+     * Creates a factory via {@link #createFactory()} and configures it with
+     * the given address and connection limit.
+     */
+    static public ServerCnxnFactory createFactory(InetSocketAddress addr,
+            int maxClientCnxns) throws IOException
+    {
+        ServerCnxnFactory factory = createFactory();
+        factory.configure(addr, maxClientCnxns);
+        return factory;
+    }
+
+    public abstract InetSocketAddress getLocalAddress();
+
+    // JMX beans of the registered connections, keyed by connection.
+    // NOTE(review): a plain HashMap with no locking - confirm that
+    // registerConnection/unregisterConnection are never called concurrently.
+    private final HashMap<ServerCnxn, ConnectionBean> connectionBeans = new HashMap<ServerCnxn, ConnectionBean>();
+
+    /**
+     * Removes the JMX bean recorded for the connection, if any, and
+     * unregisters it from the MBean registry.
+     */
+    public void unregisterConnection(ServerCnxn serverCnxn) {
+        ConnectionBean jmxConnectionBean = connectionBeans.remove(serverCnxn);
+        if (jmxConnectionBean != null){
+            MBeanRegistry.getInstance().unregister(jmxConnectionBean);
+        }
+    }
+    
+    /**
+     * Registers a JMX bean for the connection under the server's JMX bean.
+     * Does nothing when no server is set; a registration failure is logged
+     * and the connection is then not recorded.
+     */
+    public void registerConnection(ServerCnxn serverCnxn) {
+        if (zkServer != null) {
+            ConnectionBean jmxConnectionBean = new ConnectionBean(serverCnxn, zkServer);
+            try {
+                MBeanRegistry.getInstance().register(jmxConnectionBean, zkServer.jmxServerBean);
+                connectionBeans.put(serverCnxn, jmxConnectionBean);
+            } catch (JMException e) {
+                LOG.warn("Could not register connection", e);
+            }
+        }
+
+    }
+
+}

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java Wed Aug 18 06:24:08 2010
@@ -202,6 +202,8 @@ public class SessionTrackerImpl extends 
     }
 
     public void shutdown() {
+        LOG.info("Shutting down");
+
         running = false;
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),

Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Stats.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Date;
+
+/**
+ * Statistics on the ServerCnxn: establishment time, packet counters,
+ * latency figures and details of the last response, plus a reset operation.
+ */
+interface Stats {
+    /**
+     * Date/time the connection was established.
+     * @since 3.3.0
+     */
+    Date getEstablished();
+
+    /**
+     * The number of requests that have been submitted but not yet
+     * responded to.
+     */
+    long getOutstandingRequests();
+    /** Number of packets received. */
+    long getPacketsReceived();
+    /** Number of packets sent (incl notifications). */
+    long getPacketsSent();
+    /**
+     * Min latency in ms.
+     * @since 3.3.0
+     */
+    long getMinLatency();
+    /**
+     * Average latency in ms.
+     * @since 3.3.0
+     */
+    long getAvgLatency();
+    /**
+     * Max latency in ms.
+     * @since 3.3.0
+     */
+    long getMaxLatency();
+    /**
+     * Last operation performed by this connection.
+     * @since 3.3.0
+     */
+    String getLastOperation();
+    /**
+     * Last cxid of this connection.
+     * @since 3.3.0
+     */
+    long getLastCxid();
+    /**
+     * Last zxid of this connection.
+     * @since 3.3.0
+     */
+    long getLastZxid();
+    /**
+     * Last time server sent a response to client on this connection.
+     * @since 3.3.0
+     */
+    long getLastResponseTime();
+    /**
+     * Latency of last response to client on this connection in ms.
+     * @since 3.3.0
+     */
+    long getLastLatency();
+
+    /**
+     * Reset counters.
+     * @since 3.3.0
+     */
+    void resetStats();
+}
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -166,6 +166,7 @@ public class SyncRequestProcessor extend
     }
 
     public void shutdown() {
+        LOG.info("Shutting down");
         queuedRequests.add(requestOfDeath);
         try {
             this.join();