You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/29 14:07:14 UTC

svn commit: r1306798 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ hedwig-server/src/main/java/org/apache/hedwig/server/netty/ hedwig-s...

Author: ivank
Date: Thu Mar 29 12:07:13 2012
New Revision: 1306798

URL: http://svn.apache.org/viewvc?rev=1306798&view=rev
Log:
BOOKKEEPER-97: collect pub/sub/consume statistics on hub server (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Mar 29 12:07:13 2012
@@ -108,6 +108,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-96: extends zookeeper JMX to monitor and manage hedwig server (sijie via ivank)
 
+        BOOKKEEPER-97: collect pub/sub/consume statistics on hub server (sijie via ivank)
+
       bookkeeper-benchmark/
 	BOOKKEEPER-158: Move latest benchmarking code into trunk (ivank via fpj)
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu Mar 29 12:07:13 2012
@@ -41,6 +41,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.server.netty.ServerStats;
 import org.apache.hedwig.server.persistence.Factory;
 import org.apache.hedwig.server.persistence.MapMethods;
 import org.apache.hedwig.server.persistence.PersistenceManager;
@@ -416,6 +417,8 @@ public class FIFODeliveryManager impleme
                 lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
                 moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
             }
+            // increment the count of delivered messages
+            ServerStats.getInstance().incrementMessagesDelivered();
             deliverNextMessage();
         }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java Thu Mar 29 12:07:13 2012
@@ -24,6 +24,7 @@ import org.apache.hedwig.exceptions.PubS
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
 import org.apache.hedwig.protoextensions.PubSubResponseUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.ServerStats;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.util.HedwigSocketAddress;
@@ -45,6 +46,7 @@ public abstract class BaseHandler implem
             @Override
             public void operationFailed(Object ctx, PubSubException exception) {
                 channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                ServerStats.getInstance().getOpStats(request.getType()).incrementFailedOps();
             }
 
             @Override
@@ -52,6 +54,7 @@ public abstract class BaseHandler implem
                 if (!owner.equals(cfg.getServerAddr())) {
                     channel.write(PubSubResponseUtils.getResponseForException(
                                       new ServerNotResponsibleForTopicException(owner.toString()), request.getTxnId()));
+                    ServerStats.getInstance().incrementRequestsRedirect();
                     return;
                 }
                 handleRequestAtOwner(request, channel);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java Thu Mar 29 12:07:13 2012
@@ -21,9 +21,12 @@ import org.jboss.netty.channel.Channel;
 
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.ServerStats;
 import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.netty.ServerStats.OpStats;
 import org.apache.hedwig.server.subscriptions.SubscriptionManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.util.Callback;
@@ -32,13 +35,17 @@ public class ConsumeHandler extends Base
 
     SubscriptionManager sm;
     Callback<Void> noopCallback = new NoopCallback<Void>();
+    final OpStats consumeStats = ServerStats.getInstance().getOpStats(OperationType.CONSUME);
 
     class NoopCallback<T> implements Callback<T> {
         @Override
         public void operationFailed(Object ctx, PubSubException exception) {
+            consumeStats.incrementFailedOps();
         }
 
         public void operationFinished(Object ctx, T resultOfOperation) {
+            // consume processing time is not measured, so record a latency of 0
+            consumeStats.updateLatency(0);
         };
     }
 
@@ -47,6 +54,7 @@ public class ConsumeHandler extends Base
         if (!request.hasConsumeRequest()) {
             UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
                     "Missing consume request data");
+            consumeStats.incrementFailedOps();
             return;
         }
 

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java?rev=1306798&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java Thu Mar 29 12:07:13 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.handlers;
+
+import java.util.Map;
+
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
+
+public class NettyHandlerBean implements NettyHandlerMXBean, HedwigMBeanInfo {
+
+    Map<OperationType, Handler> handlers;
+    SubscribeHandler subHandler;
+
+    public NettyHandlerBean(Map<OperationType, Handler> handlers) {
+        this.handlers = handlers;
+        subHandler = (SubscribeHandler) handlers.get(OperationType.SUBSCRIBE);
+    }
+
+    @Override
+    public String getName() {
+        return "NettyHandlers";
+    }
+
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    @Override
+    public int getNumSubscriptionChannels() {
+        return subHandler.sub2Channel.size();
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java?rev=1306798&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java Thu Mar 29 12:07:13 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.handlers;
+
+/**
+ * Netty Handler MXBean
+ */
+public interface NettyHandlerMXBean {
+
+    /**
+     * @return number of subscription channels
+     */
+    public int getNumSubscriptionChannels();
+
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java Thu Mar 29 12:07:13 2012
@@ -20,9 +20,12 @@ package org.apache.hedwig.server.handler
 import org.jboss.netty.channel.Channel;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
 import org.apache.hedwig.protoextensions.PubSubResponseUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.ServerStats;
+import org.apache.hedwig.server.netty.ServerStats.OpStats;
 import org.apache.hedwig.server.netty.UmbrellaHandler;
 import org.apache.hedwig.server.persistence.PersistRequest;
 import org.apache.hedwig.server.persistence.PersistenceManager;
@@ -32,10 +35,12 @@ import org.apache.hedwig.util.Callback;
 public class PublishHandler extends BaseHandler {
 
     private PersistenceManager persistenceMgr;
+    private final OpStats pubStats;
 
     public PublishHandler(TopicManager topicMgr, PersistenceManager persistenceMgr, ServerConfiguration cfg) {
         super(topicMgr, cfg);
         this.persistenceMgr = persistenceMgr;
+        this.pubStats = ServerStats.getInstance().getOpStats(OperationType.PUBLISH);
     }
 
     @Override
@@ -43,22 +48,26 @@ public class PublishHandler extends Base
         if (!request.hasPublishRequest()) {
             UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
                     "Missing publish request data");
+            pubStats.incrementFailedOps();
             return;
         }
 
         Message msgToSerialize = Message.newBuilder(request.getPublishRequest().getMsg()).setSrcRegion(
                                      cfg.getMyRegionByteString()).build();
 
+        final long requestTime = System.currentTimeMillis();
         PersistRequest persistRequest = new PersistRequest(request.getTopic(), msgToSerialize,
         new Callback<Long>() {
             @Override
             public void operationFailed(Object ctx, PubSubException exception) {
                 channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                pubStats.incrementFailedOps();
             }
 
             @Override
             public void operationFinished(Object ctx, Long resultOfOperation) {
                 channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+                pubStats.updateLatency(System.currentTimeMillis() - requestTime);
             }
         }, null);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Thu Mar 29 12:07:13 2012
@@ -29,6 +29,7 @@ import org.apache.hedwig.client.data.Top
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
 import org.apache.hedwig.protoextensions.PubSubResponseUtils;
@@ -36,6 +37,8 @@ import org.apache.hedwig.protoextensions
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.delivery.ChannelEndPoint;
 import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.netty.ServerStats;
+import org.apache.hedwig.server.netty.ServerStats.OpStats;
 import org.apache.hedwig.server.netty.UmbrellaHandler;
 import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.subscriptions.SubscriptionManager;
@@ -51,6 +54,8 @@ public class SubscribeHandler extends Ba
     private SubscriptionManager subMgr;
     ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
     ConcurrentHashMap<Channel, TopicSubscriber> channel2sub;
+    // op stats
+    private final OpStats subStats;
 
     public SubscribeHandler(TopicManager topicMgr, DeliveryManager deliveryManager, PersistenceManager persistenceMgr,
                             SubscriptionManager subMgr, ServerConfiguration cfg) {
@@ -60,6 +65,7 @@ public class SubscribeHandler extends Ba
         this.subMgr = subMgr;
         sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
         channel2sub = new ConcurrentHashMap<Channel, TopicSubscriber>();
+        subStats = ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE);
     }
 
     public void channelDisconnected(Channel channel) {
@@ -80,6 +86,7 @@ public class SubscribeHandler extends Ba
         if (!request.hasSubscribeRequest()) {
             UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
                     "Missing subscribe request data");
+            subStats.incrementFailedOps();
             return;
         }
 
@@ -91,6 +98,8 @@ public class SubscribeHandler extends Ba
         } catch (ServerNotResponsibleForTopicException e) {
             channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId())).addListener(
                 ChannelFutureListener.CLOSE);
+            subStats.incrementFailedOps();
+            ServerStats.getInstance().incrementRequestsRedirect();
             return;
         }
 
@@ -99,12 +108,14 @@ public class SubscribeHandler extends Ba
 
         MessageSeqId lastSeqIdPublished = MessageSeqId.newBuilder(seqId).setLocalComponent(seqId.getLocalComponent()).build();
 
+        final long requestTime = System.currentTimeMillis();
         subMgr.serveSubscribeRequest(topic, subRequest, lastSeqIdPublished, new Callback<MessageSeqId>() {
 
             @Override
             public void operationFailed(Object ctx, PubSubException exception) {
                 channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())).addListener(
                     ChannelFutureListener.CLOSE);
+                subStats.incrementFailedOps();
             }
 
             @Override
@@ -119,6 +130,7 @@ public class SubscribeHandler extends Ba
                         // channel got disconnected while we were processing the
                         // subscribe request,
                         // nothing much we can do in this case
+                        subStats.incrementFailedOps();
                         return;
                     }
 
@@ -128,6 +140,7 @@ public class SubscribeHandler extends Ba
                             "subscription for this topic, subscriberId is already being served on a different channel");
                         channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
                         .addListener(ChannelFutureListener.CLOSE);
+                        subStats.incrementFailedOps();
                         return;
                     } else {
                         // channel2sub is just a cache, so we can add to it
@@ -139,6 +152,7 @@ public class SubscribeHandler extends Ba
                 // otherwise the first message might go out before the response
                 // to the subscribe
                 channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+                subStats.updateLatency(System.currentTimeMillis() - requestTime);
 
                 // want to start 1 ahead of the consume ptr
                 MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(resultOfOperation).setLocalComponent(

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java Thu Mar 29 12:07:13 2012
@@ -20,11 +20,14 @@ package org.apache.hedwig.server.handler
 import org.jboss.netty.channel.Channel;
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
 import org.apache.hedwig.protoextensions.PubSubResponseUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.netty.ServerStats;
+import org.apache.hedwig.server.netty.ServerStats.OpStats;
 import org.apache.hedwig.server.netty.UmbrellaHandler;
 import org.apache.hedwig.server.subscriptions.SubscriptionManager;
 import org.apache.hedwig.server.topics.TopicManager;
@@ -33,12 +36,15 @@ import org.apache.hedwig.util.Callback;
 public class UnsubscribeHandler extends BaseHandler {
     SubscriptionManager subMgr;
     DeliveryManager deliveryMgr;
+    // op stats
+    final OpStats unsubStats;
 
     public UnsubscribeHandler(TopicManager tm, ServerConfiguration cfg, SubscriptionManager subMgr,
                               DeliveryManager deliveryMgr) {
         super(tm, cfg);
         this.subMgr = subMgr;
         this.deliveryMgr = deliveryMgr;
+        unsubStats = ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE);
     }
 
     @Override
@@ -46,6 +52,7 @@ public class UnsubscribeHandler extends 
         if (!request.hasUnsubscribeRequest()) {
             UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
                     "Missing unsubscribe request data");
+            unsubStats.incrementFailedOps();
             return;
         }
 
@@ -53,17 +60,19 @@ public class UnsubscribeHandler extends 
         final ByteString topic = request.getTopic();
         final ByteString subscriberId = unsubRequest.getSubscriberId();
 
+        final long requestTime = System.currentTimeMillis();
         subMgr.unsubscribe(topic, subscriberId, new Callback<Void>() {
             @Override
             public void operationFailed(Object ctx, PubSubException exception) {
                 channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                unsubStats.incrementFailedOps();
             }
 
             @Override
             public void operationFinished(Object ctx, Void resultOfOperation) {
                 deliveryMgr.stopServingSubscriber(topic, subscriberId);
                 channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-
+                unsubStats.updateLatency(System.currentTimeMillis() - requestTime);
             }
         }, null);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Thu Mar 29 12:07:13 2012
@@ -58,9 +58,11 @@ import org.apache.hedwig.server.delivery
 import org.apache.hedwig.server.delivery.FIFODeliveryManager;
 import org.apache.hedwig.server.handlers.ConsumeHandler;
 import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.handlers.NettyHandlerBean;
 import org.apache.hedwig.server.handlers.PublishHandler;
 import org.apache.hedwig.server.handlers.SubscribeHandler;
 import org.apache.hedwig.server.handlers.UnsubscribeHandler;
+import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
 import org.apache.hedwig.server.persistence.BookkeeperPersistenceManager;
 import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
 import org.apache.hedwig.server.persistence.PersistenceManager;
@@ -102,6 +104,10 @@ public class PubSubServer {
     // we use this to prevent long stack chains from building up in callbacks
     ScheduledExecutorService scheduler;
 
+    // JMX Beans
+    NettyHandlerBean jmxNettyBean;
+    PubSubServerBean jmxServerBean;
+
     protected PersistenceManager instantiatePersistenceManager(TopicManager topicMgr) throws IOException,
         InterruptedException {
 
@@ -248,6 +254,51 @@ public class PubSubServer {
         serverChannelFactory.releaseExternalResources();
         clientChannelFactory.releaseExternalResources();
         scheduler.shutdown();
+
+        // unregister jmx
+        unregisterJMX();
+    }
+
+    protected void registerJMX(Map<OperationType, Handler> handlers) {
+        try {
+            jmxServerBean = new PubSubServerBean();
+            HedwigMBeanRegistry.getInstance().register(jmxServerBean, null);
+            try {
+                jmxNettyBean = new NettyHandlerBean(handlers);
+                HedwigMBeanRegistry.getInstance().register(jmxNettyBean, jmxServerBean);
+            } catch (Exception e) {
+                logger.warn("Failed to register with JMX", e);
+                jmxNettyBean = null;
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to register with JMX", e);
+            jmxServerBean = null;
+        }
+        if (pm instanceof ReadAheadCache) {
+            ((ReadAheadCache)pm).registerJMX(jmxServerBean);
+        }
+    }
+
+    protected void unregisterJMX() {
+        if (pm != null && pm instanceof ReadAheadCache) {
+            ((ReadAheadCache)pm).unregisterJMX();
+        }
+        try {
+            if (jmxNettyBean != null) {
+                HedwigMBeanRegistry.getInstance().unregister(jmxNettyBean);
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to unregister with JMX", e);
+        }
+        try {
+            if (jmxServerBean != null) {
+                HedwigMBeanRegistry.getInstance().unregister(jmxServerBean);
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to unregister with JMX", e);
+        }
+        jmxNettyBean = null;
+        jmxServerBean = null;
     }
 
     /**
@@ -312,6 +363,8 @@ public class PubSubServer {
                     if (conf.isSSLEnabled()) {
                         initializeNetty(new SslServerContextFactory(conf), handlers);
                     }
+                    // register jmx
+                    registerJMX(handlers);
                 } catch (Exception e) {
                     ConcurrencyUtils.put(queue, Either.right(e));
                     return;

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java?rev=1306798&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerBean.java Thu Mar 29 12:07:13 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.netty;
+
+import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
+import org.apache.hedwig.server.netty.ServerStats.OpStatData;
+
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+/**
+ * PubSub Server Bean
+ */
+public class PubSubServerBean implements PubSubServerMXBean, HedwigMBeanInfo {
+
+    @Override
+    public String getName() {
+        return "PubSubServer";
+    }
+
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    @Override
+    public OpStatData getPubStats() {
+        return ServerStats.getInstance().getOpStats(OperationType.PUBLISH).toOpStatData();
+    }
+
+    @Override
+    public OpStatData getSubStats() {
+        return ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE).toOpStatData();
+    }
+
+    @Override
+    public OpStatData getUnsubStats() {
+        return ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE).toOpStatData();
+    }
+
+    @Override
+    public OpStatData getConsumeStats() {
+        return ServerStats.getInstance().getOpStats(OperationType.CONSUME).toOpStatData();
+    }
+
+    @Override
+    public long getNumRequestsReceived() {
+        return ServerStats.getInstance().getNumRequestsReceived();
+    }
+
+    @Override
+    public long getNumRequestsRedirect() {
+        return ServerStats.getInstance().getNumRequestsRedirect();
+    }
+
+    @Override
+    public long getNumMessagesDelivered() {
+        return ServerStats.getInstance().getNumMessagesDelivered();
+    }
+
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java?rev=1306798&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServerMXBean.java Thu Mar 29 12:07:13 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.netty;
+
+import org.apache.hedwig.server.netty.ServerStats.OpStatData;
+
+/**
+ * PubSub Server MBean
+ */
+public interface PubSubServerMXBean {
+
+    /**
+     * Statistics of publish operations.
+     *
+     * @return publish stats
+     */
+    OpStatData getPubStats();
+
+    /**
+     * Statistics of subscribe operations.
+     *
+     * @return subscription stats
+     */
+    OpStatData getSubStats();
+
+    /**
+     * Statistics of unsubscribe operations.
+     *
+     * @return unsub stats
+     */
+    OpStatData getUnsubStats();
+
+    /**
+     * Statistics of consume operations.
+     *
+     * @return consume stats
+     */
+    OpStatData getConsumeStats();
+
+    /**
+     * Count of requests received.
+     *
+     * @return number of requests received
+     */
+    long getNumRequestsReceived();
+
+    /**
+     * Count of requests redirected.
+     *
+     * @return number of requests redirect
+     */
+    long getNumRequestsRedirect();
+
+    /**
+     * Count of messages delivered.
+     *
+     * @return number of messages delivered
+     */
+    long getNumMessagesDelivered();
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java?rev=1306798&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java Thu Mar 29 12:07:13 2012
@@ -0,0 +1,190 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.netty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import java.beans.ConstructorProperties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+
+/**
+ * Server Stats
+ */
+public class ServerStats {
+    static ServerStats instance = new ServerStats();
+
+    /**
+     * An immutable read view of the stats of one operation type, also used
+     * in CompositeViewData to expose to JMX.
+     */
+    public static class OpStatData {
+        private final long maxLatency, minLatency;
+        private final double avgLatency;
+        private final long numSuccessOps, numFailedOps;
+        private final String latencyHist;
+
+        /**
+         * @param maxLatency
+         *          largest latency recorded
+         * @param minLatency
+         *          smallest latency recorded; {@link Long#MAX_VALUE} (the
+         *          value {@link OpStats} starts with before any latency is
+         *          recorded) is reported as 0
+         * @param avgLatency
+         *          average latency of the successful operations
+         * @param numSuccessOps
+         *          number of successful operations
+         * @param numFailedOps
+         *          number of failed operations
+         * @param latencyHist
+         *          latency histogram, as a comma separated list of bucket counts
+         */
+        @ConstructorProperties({"maxLatency", "minLatency", "avgLatency",
+                                "numSuccessOps", "numFailedOps", "latencyHist"})
+        public OpStatData(long maxLatency, long minLatency, double avgLatency,
+                          long numSuccessOps, long numFailedOps, String latencyHist) {
+            this.maxLatency = maxLatency;
+            this.minLatency = minLatency == Long.MAX_VALUE ? 0 : minLatency;
+            this.avgLatency = avgLatency;
+            this.numSuccessOps = numSuccessOps;
+            this.numFailedOps = numFailedOps;
+            this.latencyHist = latencyHist;
+        }
+
+        public long getMaxLatency() {
+            return maxLatency;
+        }
+
+        public long getMinLatency() {
+            return minLatency;
+        }
+
+        public double getAvgLatency() {
+            return avgLatency;
+        }
+
+        public long getNumSuccessOps() {
+            return numSuccessOps;
+        }
+
+        public long getNumFailedOps() {
+            return numFailedOps;
+        }
+
+        public String getLatencyHist() {
+            return latencyHist;
+        }
+    }
+
+    /**
+     * Operation Statistics.
+     *
+     * All reads and writes of the counters go through methods synchronized
+     * on this instance, so a snapshot taken by {@link #toOpStatData()} is
+     * consistent with concurrent updates.
+     */
+    public static class OpStats {
+        // Bucket layout (latency in ms):
+        //   0..9   :    0..99    in steps of 10
+        //   10..18 :  100..999   in steps of 100
+        //   19..27 : 1000..9999  in steps of 1000
+        //   28     : 10000 and above
+        static final int NUM_BUCKETS = 3*9 + 2;
+
+        long maxLatency = 0;
+        long minLatency = Long.MAX_VALUE;
+        double totalLatency = 0.0;
+        long numSuccessOps = 0;
+        long numFailedOps = 0;
+        long[] latencyBuckets = new long[NUM_BUCKETS];
+
+        OpStats() {}
+
+        /**
+         * Increment number of failed operations
+         */
+        synchronized public void incrementFailedOps() {
+            ++numFailedOps;
+        }
+
+        /**
+         * Record the latency of one successful operation: updates the
+         * total, min, max, success count and latency histogram.
+         *
+         * @param latency
+         *          latency of the operation; a negative value is recorded as 0
+         */
+        synchronized public void updateLatency(long latency) {
+            // A negative latency (for instance if the caller's clock moved
+            // backwards between its two timestamps) would otherwise compute a
+            // negative bucket index and throw after the counters below were
+            // already modified; record it as 0 instead.
+            if (latency < 0) {
+                latency = 0;
+            }
+            totalLatency += latency;
+            ++numSuccessOps;
+            if (latency < minLatency) {
+                minLatency = latency;
+            }
+            if (latency > maxLatency) {
+                maxLatency = latency;
+            }
+            int bucket;
+            if (latency <= 100) { // less than 100ms
+                bucket = (int)(latency / 10);
+            } else if (latency <= 1000) { // 100ms ~ 1000ms
+                bucket = 1 * 9 + (int)(latency / 100);
+            } else if (latency <= 10000) { // 1s ~ 10s
+                bucket = 2 * 9 + (int)(latency / 1000);
+            } else { // more than 10s
+                bucket = 3 * 9 + 1;
+            }
+            ++latencyBuckets[bucket];
+        }
+
+        /**
+         * Take a snapshot of the current statistics.
+         *
+         * Synchronized like the update methods so that the snapshot cannot
+         * observe a half-applied update or a torn long/double value.
+         *
+         * @return read view of the stats; the average latency is 0 when no
+         *         successful operation has been recorded
+         */
+        synchronized public OpStatData toOpStatData() {
+            double avgLatency = numSuccessOps > 0 ? totalLatency / numSuccessOps : 0.0;
+            StringBuilder sb = new StringBuilder();
+            for (int i=0; i<NUM_BUCKETS; i++) {
+                sb.append(latencyBuckets[i]);
+                if (i != NUM_BUCKETS - 1) {
+                    sb.append(',');
+                }
+            }
+
+            return new OpStatData(maxLatency, minLatency, avgLatency, numSuccessOps, numFailedOps, sb.toString());
+        }
+
+    }
+
+    /**
+     * @return the shared server stats instance
+     */
+    public static ServerStats getInstance() {
+        return instance;
+    }
+
+    /**
+     * Creates one {@link OpStats} for every {@link OperationType} value, so
+     * {@link #getOpStats(OperationType)} finds an entry for each of them.
+     */
+    protected ServerStats() {
+        stats = new HashMap<OperationType, OpStats>();
+        for (OperationType type : OperationType.values()) {
+            stats.put(type, new OpStats());
+        }
+    }
+    Map<OperationType, OpStats> stats;
+
+
+    AtomicLong numRequestsReceived = new AtomicLong(0);
+    AtomicLong numRequestsRedirect = new AtomicLong(0);
+    AtomicLong numMessagesDelivered = new AtomicLong(0);
+
+    /**
+     * Stats of operations
+     *
+     * @param type
+     *          Operation Type
+     * @return op stats
+     */
+    public OpStats getOpStats(OperationType type) {
+        return stats.get(type);
+    }
+
+    /**
+     * Increment number of requests received
+     */
+    public void incrementRequestsReceived() {
+        numRequestsReceived.incrementAndGet();
+    }
+
+    /**
+     * Increment number of requests redirected
+     */
+    public void incrementRequestsRedirect() {
+        numRequestsRedirect.incrementAndGet();
+    }
+
+    /**
+     * Increment number of messages delivered
+     */
+    public void incrementMessagesDelivered() {
+        numMessagesDelivered.incrementAndGet();
+    }
+
+    /**
+     * @return number of requests received
+     */
+    public long getNumRequestsReceived() {
+        return numRequestsReceived.get();
+    }
+
+    /**
+     * @return number of requests redirected
+     */
+    public long getNumRequestsRedirect() {
+        return numRequestsRedirect.get();
+    }
+
+    /**
+     * @return number of messages delivered
+     */
+    public long getNumMessagesDelivered() {
+        return numMessagesDelivered.get();
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java Thu Mar 29 12:07:13 2012
@@ -154,6 +154,7 @@ public class UmbrellaHandler extends Sim
         }
 
         handler.handleRequest(request, channel);
+        ServerStats.getInstance().incrementRequestsReceived();
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1306798&r1=1306797&r2=1306798&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Thu Mar 29 12:07:13 2012
@@ -42,9 +42,13 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protoextensions.MessageIdUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.server.jmx.HedwigJMXService;
+import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
+import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
+import org.apache.hedwig.server.persistence.ReadAheadCacheBean;
 import org.apache.hedwig.util.Callback;
 
-public class ReadAheadCache implements PersistenceManager, Runnable {
+public class ReadAheadCache implements PersistenceManager, Runnable, HedwigJMXService {
 
     static Logger logger = LoggerFactory.getLogger(ReadAheadCache.class);
 
@@ -106,6 +110,9 @@ public class ReadAheadCache implements P
     // when we want to stop the thread during a PubSubServer shutdown.
     protected boolean keepRunning = true;
 
+    // JMX Beans
+    ReadAheadCacheBean jmxCacheBean = null;
+
     /**
      * Constructor. Starts the cache maintainer thread
      *
@@ -713,4 +720,25 @@ public class ReadAheadCache implements P
         }
     }
 
+    /**
+     * Create the JMX bean for this cache and register it with the
+     * {@link HedwigMBeanRegistry}. A registration failure is logged and
+     * leaves <code>jmxCacheBean</code> null; it is not propagated.
+     *
+     * @param parent
+     *          parent bean handed to the registry (presumably the bean this
+     *          cache's bean is placed under -- confirm against
+     *          HedwigMBeanRegistry)
+     */
+    @Override
+    public void registerJMX(HedwigMBeanInfo parent) {
+        try {
+            jmxCacheBean = new ReadAheadCacheBean(this);
+            // Forward the caller's parent rather than discarding it.
+            HedwigMBeanRegistry.getInstance().register(jmxCacheBean, parent);
+        } catch (Exception e) {
+            logger.warn("Failed to register readahead cache with JMX", e);
+            jmxCacheBean = null;
+        }
+    }
+
+    /**
+     * Unregister this cache's JMX bean, if one is registered. A failure is
+     * logged and not propagated. The bean reference is cleared afterwards
+     * in either case, so a repeated call does not try to unregister the
+     * same bean again.
+     */
+    @Override
+    public void unregisterJMX() {
+        try {
+            if (jmxCacheBean != null) {
+                HedwigMBeanRegistry.getInstance().unregister(jmxCacheBean);
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to unregister readahead cache with JMX", e);
+        }
+        jmxCacheBean = null;
+    }
 }

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java?rev=1306798&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java Thu Mar 29 12:07:13 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.persistence;
+
+import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
+
+/**
+ * Read Ahead Cache Bean
+ */
+public class ReadAheadCacheBean implements ReadAheadCacheMXBean,
+        HedwigMBeanInfo {
+
+    // The cache whose fields are exposed through this bean
+    ReadAheadCache cache;
+
+    /**
+     * @param cache
+     *          read ahead cache to report on
+     */
+    public ReadAheadCacheBean(ReadAheadCache cache) {
+        this.cache = cache;
+    }
+
+    /**
+     * @return the fixed bean name, <code>ReadAheadCache</code>
+     */
+    @Override
+    public String getName() {
+        return "ReadAheadCache";
+    }
+
+    /**
+     * @return always <code>false</code>
+     */
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    /**
+     * @return the maximum cache size taken from the cache's configuration
+     */
+    @Override
+    public long getMaxCacheSize() {
+        final long maxSize = cache.cfg.getMaximumCacheSize();
+        return maxSize;
+    }
+
+    /**
+     * @return the cache's <code>presentCacheSize</code> value
+     */
+    @Override
+    public long getPresentCacheSize() {
+        final long presentSize = cache.presentCacheSize;
+        return presentSize;
+    }
+
+    /**
+     * @return size of the cache's entry collection
+     */
+    @Override
+    public int getNumCachedEntries() {
+        final int numEntries = cache.cache.size();
+        return numEntries;
+    }
+
+    /**
+     * @return size of the cache's request queue
+     */
+    @Override
+    public int getNumPendingCacheRequests() {
+        final int numPending = cache.requestQueue.size();
+        return numPending;
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java?rev=1306798&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheMXBean.java Thu Mar 29 12:07:13 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hedwig.server.persistence;
+
+/**
+ * Read Ahead Cache MBean
+ */
+public interface ReadAheadCacheMXBean {
+
+    /**
+     * Maximum size of the cache.
+     *
+     * @return max cache size
+     */
+    long getMaxCacheSize();
+
+    /**
+     * Current size of the cache.
+     *
+     * @return present cache size
+     */
+    long getPresentCacheSize();
+
+    /**
+     * Count of entries held in the cache.
+     *
+     * @return number of cached entries
+     */
+    int getNumCachedEntries();
+
+    /**
+     * Count of cache requests still pending.
+     *
+     * @return number of pending cache requests
+     */
+    int getNumPendingCacheRequests();
+}