You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2010/11/08 18:05:09 UTC

svn commit: r1032643 [1/2] - in /qpid/branches/0.5.x-dev/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/information/management/ broker/src/m...

Author: grkvlt
Date: Mon Nov  8 17:05:08 2010
New Revision: 1032643

URL: http://svn.apache.org/viewvc?rev=1032643&view=rev
Log:
QPID-2932: Add statistics generation for broker message delivery

Added:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/derby.log
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
    qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
    qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
    qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Mon Nov  8 17:05:08 2010
@@ -39,6 +39,7 @@ package org.apache.qpid.server;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -57,16 +58,19 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.exchange.ExchangeType;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSessionMBean;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.AMQQueueMBean;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
 
 /**
  * This MBean implements the broker management interface and exposes the
@@ -79,6 +83,7 @@ public class AMQBrokerManagerMBean exten
     private final ExchangeRegistry _exchangeRegistry;
     private final ExchangeFactory _exchangeFactory;
     private final MessageStore _messageStore;
+    private final VirtualHost _virtualHost;
 
     private final VirtualHost.VirtualHostMBean _virtualHostMBean;
 
@@ -88,12 +93,12 @@ public class AMQBrokerManagerMBean exten
         super(ManagedBroker.class, ManagedBroker.TYPE);
 
         _virtualHostMBean = virtualHostMBean;
-        VirtualHost virtualHost = virtualHostMBean.getVirtualHost();
+        _virtualHost = virtualHostMBean.getVirtualHost();
 
-        _queueRegistry = virtualHost.getQueueRegistry();
-        _exchangeRegistry = virtualHost.getExchangeRegistry();
-        _messageStore = virtualHost.getMessageStore();
-        _exchangeFactory = virtualHost.getExchangeFactory();
+        _queueRegistry = _virtualHost.getQueueRegistry();
+        _exchangeRegistry = _virtualHost.getExchangeRegistry();
+        _messageStore = _virtualHost.getMessageStore();
+        _exchangeFactory = _virtualHost.getExchangeFactory();
     }
 
     public String getObjectInstanceName()
@@ -348,4 +353,46 @@ public class AMQBrokerManagerMBean exten
     {
         return getObjectNameForSingleInstanceMBean();
     }
-} // End of MBean class
+
+    public void resetStatistics() throws Exception
+    {
+        _virtualHost.getMessageStatistics().reset();
+        _virtualHost.getDataStatistics().reset();
+        
+        Collection<AMQProtocolSession> connections = _virtualHost.getConnectionRegistry().getConnections();
+        for (AMQProtocolSession con : connections)
+        {
+            ((AMQProtocolSessionMBean) ((AMQMinaProtocolSession) con).getManagedObject()).resetStatistics();
+        }
+    }
+
+    public double getPeakMessageRate()
+    {
+        return _virtualHost.getMessageStatistics().getPeak();
+    }
+
+    public double getPeakDataRate()
+    {
+        return _virtualHost.getDataStatistics().getPeak();
+    }
+
+    public double getMessageRate()
+    {
+        return _virtualHost.getMessageStatistics().getRate();
+    }
+
+    public double getDataRate()
+    {
+        return _virtualHost.getDataStatistics().getRate();
+    }
+
+    public long getTotalMessages()
+    {
+        return _virtualHost.getMessageStatistics().getTotal();
+    }
+
+    public long getTotalData()
+    {
+        return _virtualHost.getDataStatistics().getTotal();
+    }
+}

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Nov  8 17:05:08 2010
@@ -206,6 +206,9 @@ public class AMQChannel
         // check and deliver if header says body length is zero
         if (_currentMessage.allContentReceived())
         {
+            long bodySize = _currentMessage.getContentHeaderBody().bodySize;
+            long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp();
+            _session.registerMessageDelivery(bodySize, timestamp);
             try
             {
                 _currentMessage.deliverToQueues();

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Mon Nov  8 17:05:08 2010
@@ -291,7 +291,7 @@ public class Main
             configMBean.register();
 
             ServerInformationMBean sysInfoMBean =
-                    new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
+                    new ServerInformationMBean((ApplicationRegistry) ApplicationRegistry.getInstance());
             sysInfoMBean.register();
 
             //fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Mon Nov  8 17:05:08 2010
@@ -664,4 +664,34 @@ public class ServerConfiguration impleme
                    getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
                            DEFAULT_HOUSEKEEPING_PERIOD));
     }
+
+    public long getStatisticsSamplePeriod()
+    {
+        return getConfig().getLong("statistics.sample.period", 5000L);
+    }
+
+    public boolean isStatisticsGenerationBrokerEnabled()
+    {
+        return getConfig().getBoolean("statistics.generation.broker", false);
+    }
+
+    public boolean isStatisticsGenerationVirtualhostsEnabled()
+    {
+        return getConfig().getBoolean("statistics.generation.virtualhosts", false);
+    }
+
+    public boolean isStatisticsGenerationConnectionsEnabled()
+    {
+        return getConfig().getBoolean("statistics.generation.connections", false);
+    }
+
+    public long getStatisticsReportingPeriod()
+    {
+        return getConfig().getLong("statistics.reporting.period", 0L);
+    }
+
+    public boolean isStatisticsReportResetEnabled()
+    {
+        return getConfig().getBoolean("statistics.reporting.reset", false);
+    }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java Mon Nov  8 17:05:08 2010
@@ -21,12 +21,17 @@
 package org.apache.qpid.server.information.management;
 
 import java.io.IOException;
+import java.util.Collection;
 
+import javax.management.JMException;
+
+import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.management.common.mbeans.ServerInformation;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
 import org.apache.qpid.server.management.AMQManagedObject;
-
-import javax.management.JMException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 /** MBean class for the ServerInformationMBean. */
 @MBeanDescription("Server Information Interface")
@@ -34,12 +39,15 @@ public class ServerInformationMBean exte
 {
     private String buildVersion;
     private String productVersion;
+    private ApplicationRegistry registry;
     
-    public ServerInformationMBean(String buildVersion, String productVersion) throws JMException
+    public ServerInformationMBean(ApplicationRegistry applicationRegistry) throws JMException
     {
         super(ServerInformation.class, ServerInformation.TYPE);
-        this.buildVersion = buildVersion;
-        this.productVersion = productVersion;
+
+        registry = applicationRegistry;
+        buildVersion = QpidProperties.getBuildVersion();
+        productVersion = QpidProperties.getReleaseVersion();
     }
 
     public String getObjectInstanceName()
@@ -67,5 +75,45 @@ public class ServerInformationMBean exte
         return productVersion;
     }
 
-    
+    public void resetStatistics() throws Exception
+    {
+        registry.getDataStatistics().reset();
+        registry.getMessageStatistics().reset();
+        
+        Collection<VirtualHost> virtualhosts = registry.getVirtualHostRegistry().getVirtualHosts();
+        for (VirtualHost vhost : virtualhosts)
+        {
+            ((AMQBrokerManagerMBean) vhost.getBrokerMBean()).resetStatistics();
+        }
+    }
+
+    public double getPeakMessageRate()
+    {
+        return registry.getMessageStatistics().getPeak();
+    }
+
+    public double getPeakDataRate()
+    {
+        return registry.getDataStatistics().getPeak();
+    }
+
+    public double getMessageRate()
+    {
+        return registry.getMessageStatistics().getRate();
+    }
+
+    public double getDataRate()
+    {
+        return registry.getDataStatistics().getRate();
+    }
+
+    public long getTotalMessages()
+    {
+        return registry.getMessageStatistics().getTotal();
+    }
+
+    public long getTotalData()
+    {
+        return registry.getDataStatistics().getTotal();
+    }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties Mon Nov  8 17:05:08 2010
@@ -222,6 +222,8 @@ BRK_STOPPED = BRK-1005 : Stopped
 BRK_CONFIG = BRK-1006 : Using configuration : {0}
 # 0 - path
 BRK_LOG_CONFIG = BRK-1007 : Using logging configuration : {0}
+BRK_STATS_DATA = BRK-1008 : {0,number,#.###} kB/s, {1,number,#} bytes
+BRK_STATS_MSGS = BRK-1009 : {0,number,#.###} msg/s, {1,number,#} msgs
 
 #ManagementConsole
 MNG_STARTUP = MNG-1001 : Startup
@@ -243,6 +245,8 @@ MNG_CLOSE = MNG-1008 : Close
 # 0 - name
 VHT_CREATED = VHT-1001 : Created : {0}
 VHT_CLOSED = VHT-1002 : Closed
+VHT_STATS_DATA = VHT-1003 : {0} : {1,number,#.###} kB/s, {2,number,#} bytes
+VHT_STATS_MSGS = VHT-1004 : {0} : {1,number,#.###} msg/s, {2,number,#} msgs
 
 #MessageStore
 # 0 - name

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Mon Nov  8 17:05:08 2010
@@ -98,8 +98,9 @@ public class JMXManagedObjectRegistry im
 
     public void start() throws IOException, ConfigurationException
     {
-
         CurrentActor.get().message(ManagementConsoleMessages.MNG_STARTUP());
+
+        boolean disableCustomSocketFactory = Boolean.getBoolean("qpid.management.disableCustomSocketFactory");
         
         //check if system properties are set to use the JVM's out-of-the-box JMXAgent
         if (areOutOfTheBoxJMXOptionsSet())
@@ -225,8 +226,15 @@ public class JMXManagedObjectRegistry im
          * As a result, only binds made using the object reference will succeed, thus securing it from external change. 
          */
         System.setProperty("java.rmi.server.randomIDs", "true");
-        _rmiRegistry = LocateRegistry.createRegistry(port, null, new CustomRMIServerSocketFactory());
-
+        if (disableCustomSocketFactory)
+        {
+	        _rmiRegistry = LocateRegistry.createRegistry(port, null, null);
+        }
+        else
+        {
+	        _rmiRegistry = LocateRegistry.createRegistry(port, null, new CustomRMIServerSocketFactory());
+        }
+        
         /*
          * We must now create the RMI ConnectorServer manually, as the JMX Factory methods use RMI calls 
          * to bind the ConnectorServer to the registry, which will now fail as for security we have

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Nov  8 17:05:08 2010
@@ -78,13 +78,16 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.transport.Sender;
 
-public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
+public class AMQMinaProtocolSession implements AMQProtocolSession, Managable, StatisticsGatherer
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
 
@@ -148,6 +151,10 @@ public class AMQMinaProtocolSession impl
     private LogSubject _logSubject;
 
     private final AtomicBoolean _closing = new AtomicBoolean(false);
+    
+    private ApplicationRegistry _registry;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messageStats, _dataStats;
 
     public ManagedObject getManagedObject()
     {
@@ -162,8 +169,9 @@ public class AMQMinaProtocolSession impl
         session.setAttachment(this);
 
         _codecFactory = codecFactory;
+        _registry = virtualHostRegistry.getApplicationRegistry();
 
-        _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+        _actor = new AMQPConnectionActor(this, _registry.getRootMessageLogger());
 
         _actor.message(ConnectionMessages.CON_OPEN(null, null, false, false));
 
@@ -178,8 +186,9 @@ public class AMQMinaProtocolSession impl
         {
             e.printStackTrace();
             throw e;
-
         }
+        
+        initialiseStatistics();
     }
 
     private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -944,4 +953,48 @@ public class AMQMinaProtocolSession impl
     {
        // No-op, interface munging between this and AMQProtocolSession
     }
+
+    public void registerMessageDelivery(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _logger.info("=== STATS === register " + messageSize);
+	        _messageStats.registerEvent(1L, timestamp);
+	        _dataStats.registerEvent(messageSize, timestamp);
+	    }
+        _virtualHost.registerMessageDelivery(messageSize, timestamp);
+    }
+    
+    public StatisticsCounter getMessageStatistics()
+    {
+        return _messageStats;
+    }
+    
+    public StatisticsCounter getDataStatistics()
+    {
+        return _dataStats;
+    }
+    
+    public void resetStatistics()
+    {
+        _messageStats.reset();
+        _dataStats.reset();
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+        _messageStats = new StatisticsCounter("messages-" + getSessionID());
+        _dataStats = new StatisticsCounter("bytes-" + getSessionID());
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Nov  8 17:05:08 2010
@@ -221,4 +221,5 @@ public interface AMQProtocolSession exte
 
     public ProtocolSessionIdentifier getSessionIdentifier();
     
+    public void registerMessageDelivery(long messageSize, long timestamp);
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Mon Nov  8 17:05:08 2010
@@ -341,4 +341,49 @@ public class AMQProtocolSessionMBean ext
         _broadcaster.sendNotification(n);
     }
 
-} // End of MBean class
+    public void resetStatistics() throws Exception
+    {
+        _session.getMessageStatistics().reset();
+        _session.getDataStatistics().reset();
+    }
+
+    public double getPeakMessageRate()
+    {
+        return _session.getMessageStatistics().getPeak();
+    }
+
+    public double getPeakDataRate()
+    {
+        return _session.getDataStatistics().getPeak();
+    }
+
+    public double getMessageRate()
+    {
+        return _session.getMessageStatistics().getRate();
+    }
+
+    public double getDataRate()
+    {
+        return _session.getDataStatistics().getRate();
+    }
+
+    public long getTotalMessages()
+    {
+        return _session.getMessageStatistics().getTotal();
+    }
+
+    public long getTotalData()
+    {
+        return _session.getDataStatistics().getTotal();
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _session.isStatisticsEnabled();
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _session.setStatisticsEnabled(enabled);
+    }
+} 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Mon Nov  8 17:05:08 2010
@@ -23,19 +23,28 @@ package org.apache.qpid.server.registry;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.server.logging.RootMessageLogger;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.actors.AbstractActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.transport.QpidAcceptor;
 
@@ -45,7 +54,7 @@ import org.apache.qpid.server.transport.
  * <p/>
  * Subclasses should handle the construction of the "registered objects" such as the exchange registry.
  */
-public abstract class ApplicationRegistry implements IApplicationRegistry
+public abstract class ApplicationRegistry implements IApplicationRegistry, StatisticsGatherer
 {
     protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
 
@@ -75,6 +84,10 @@ public abstract class ApplicationRegistr
 
     protected RootMessageLogger _rootMessageLogger;
 
+    protected Timer _reportingTimer;
+    protected boolean _statisticsEnabled = false;
+    protected StatisticsCounter _messageStats, _dataStats;
+    
     static
     {
         Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -220,6 +233,12 @@ public abstract class ApplicationRegistr
             _logger.info("Shutting down ApplicationRegistry:"+this);
         }
 
+        //Stop Statistics Reporting
+        if (_reportingTimer != null)
+        {
+            _reportingTimer.cancel();
+        }
+
         //Stop incomming connections
         unbind();
 
@@ -317,4 +336,109 @@ public abstract class ApplicationRegistr
         return _rootMessageLogger;
     }
     
+    public void initialiseStatisticsReporting()
+    {
+        long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
+        final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
+        final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
+        final boolean reset = _configuration.isStatisticsReportResetEnabled();
+        
+        /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
+        if (report > 0L && (broker || virtualhost))
+        {
+	        _reportingTimer = new Timer("Statistics-Reporting", true);
+	        
+            class StatisticsReportingTask extends TimerTask
+            {
+                Logger _srLogger = Logger.getLogger(StatisticsReportingTask.class);
+                
+                public void run()
+                {
+                    CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+                        public String getLogMessage()
+                        {
+                            return "[" + Thread.currentThread().getName() + "] ";
+                        }
+                    });
+                    
+                    if (broker)
+                    {
+	                    CurrentActor.get().message(BrokerMessages.BRK_STATS_DATA(_dataStats.getPeak() / 1024.0, _dataStats.getTotal()));
+	                    CurrentActor.get().message(BrokerMessages.BRK_STATS_MSGS(_messageStats.getPeak(), _messageStats.getTotal()));
+                    }
+                    
+                    if (virtualhost)
+                    {
+	                    for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+	                    {
+	                        String name = vhost.getName();
+	                        StatisticsCounter data = vhost.getDataStatistics();
+	                        StatisticsCounter messages = vhost.getMessageStatistics();
+	                        
+	                        CurrentActor.get().message(VirtualHostMessages.VHT_STATS_DATA(name, data.getPeak() / 1024.0, data.getTotal()));
+	                        CurrentActor.get().message(VirtualHostMessages.VHT_STATS_MSGS(name, messages.getPeak(), messages.getTotal()));
+	                    }
+                    }
+                    
+                    if (reset)
+                    {
+                        resetStatistics();
+                    }
+
+                    CurrentActor.remove();
+                }
+            }
+
+            _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
+                                                report / 2,
+                                                report);
+        }
+    }
+    
+    public void registerMessageDelivery(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+	        _messageStats.registerEvent(1L, timestamp);
+	        _dataStats.registerEvent(messageSize, timestamp);
+        }
+    }
+    
+    public StatisticsCounter getMessageStatistics()
+    {
+        return _messageStats;
+    }
+    
+    public StatisticsCounter getDataStatistics()
+    {
+        return _dataStats;
+    }
+    
+    public void resetStatistics()
+    {
+        _messageStats.reset();
+        _dataStats.reset();
+        
+        for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
+        {
+            vhost.resetStatistics();
+        }
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(getConfiguration().isStatisticsGenerationBrokerEnabled());
+        _messageStats = new StatisticsCounter("messages");
+        _dataStats = new StatisticsCounter("bytes");
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Mon Nov  8 17:05:08 2010
@@ -35,6 +35,7 @@ import org.apache.qpid.server.plugins.Pl
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
@@ -76,6 +77,9 @@ public class ConfigurationFileApplicatio
         _databaseManager.initialiseManagement(_configuration);
 
         _managedObjectRegistry.start();
+        
+        initialiseStatistics();
+        initialiseStatisticsReporting();
 
         initialiseVirtualHosts();
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Mon Nov  8 17:05:08 2010
@@ -34,8 +34,12 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 public interface IApplicationRegistry
 {
@@ -81,4 +85,5 @@ public interface IApplicationRegistry
      */
     void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor);
 
+    void initialiseStatisticsReporting();
 }

Added: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java?rev=1032643&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java Mon Nov  8 17:05:08 2010
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class collects statistics and counts the total, rate per second and
+ * peak rate per second values for the events that are registered with it. 
+ */
+public class StatisticsCounter
+{
+    private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
+    
+    private static final String COUNTER = "counter";
+    private static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 1000L); // 1s
+    private static final boolean _disable = Boolean.getBoolean("qpid.statistics.disable");
+    private static final AtomicLong _counterIds = new AtomicLong(0L);
+    
+    private final AtomicLong _peak = new AtomicLong(0L);
+    private final AtomicLong _total = new AtomicLong(0L);
+    private final AtomicLong _last = new AtomicLong(0L);
+    private final AtomicLong _rate = new AtomicLong(0L);
+
+    private long _start;
+    
+    private final long _period;
+    private final String _name;
+
+    public StatisticsCounter()
+    {
+        this(COUNTER);
+    }
+    
+    public StatisticsCounter(String name)
+    {
+        this(name, DEFAULT_SAMPLE_PERIOD);
+    }
+
+    public StatisticsCounter(String name, long period)
+    {
+        _period = period;
+        _name = name + "-" + + _counterIds.incrementAndGet();
+        reset();
+    }
+    
+    public void registerEvent()
+    {
+        registerEvent(1L);
+    }
+
+    public void registerEvent(long value)
+    {
+        registerEvent(value, System.currentTimeMillis());
+    }
+
+    public void registerEvent(long value, long timestamp)
+    {
+        if (_disable)
+        {
+            return;
+        }
+        
+        long thisSample = (timestamp / _period);
+        long lastSample;
+        while (thisSample > (lastSample = _last.get()))
+        {
+            if (_last.compareAndSet(lastSample, thisSample))
+            {
+                _rate.set(0L);
+            }
+        }
+        
+        _total.addAndGet(value);
+        long current = _rate.addAndGet(value);
+        long peak;
+        while (current > (peak = _peak.get()))
+        {
+            _peak.compareAndSet(peak, current);
+        }
+    }
+    
+    /**
+     * Update the current rate and peak - may reset rate to zero if a new
+     * sample period has started.
+     */
+    private void update()
+    {
+        registerEvent(0L, System.currentTimeMillis());
+    }
+
+    /**
+     * Reset 
+     */
+    public void reset()
+    {
+        _peak.set(0L);
+        _rate.set(0L);
+        _total.set(0L);
+        _start = System.currentTimeMillis();
+        _last.set(_start / _period);
+    }
+
+    public double getPeak()
+    {
+        return (double) _peak.get() / ((double) _period / 1000.0d);
+    }
+
+    public double getRate()
+    {
+        update();
+        return (double) _rate.get() / ((double) _period / 1000.0d);
+    }
+
+    public long getTotal()
+    {
+        return _total.get();
+    }
+
+    public long getStart()
+    {
+        return _start;
+    }
+
+    public Date getStartTime()
+    {
+        return new Date(_start);
+    }
+    
+    public String getName()
+    {
+        return _name;
+    }
+    
+    public long getPeriod()
+    {
+        return _period;
+    }
+}

Added: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java?rev=1032643&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java Mon Nov  8 17:05:08 2010
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+/**
+ * Statistics gatherer.
+ */
+public interface StatisticsGatherer
+{
+    /**
+     * 
+     * @param period
+     */
+    void initialiseStatistics();
+    
+    /**
+     * 
+     * @param messageSize
+     * @param timestamp
+     */
+    void registerMessageDelivery(long messageSize, long timestamp);
+    
+    /**
+     * 
+     * @return
+     */
+    StatisticsCounter getMessageStatistics();
+    
+    /**
+     * 
+     * @return
+     */
+    StatisticsCounter getDataStatistics();
+    
+    /**
+     * 
+     * @return
+     */
+    void resetStatistics();
+    
+    /**
+     * 
+     * @return
+     */
+    boolean isStatisticsEnabled();
+    
+    /**
+     * 
+     * @param enabled
+     */
+    void setStatisticsEnabled(boolean enabled);
+}

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Mon Nov  8 17:05:08 2010
@@ -20,6 +20,14 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.management.NotCompliantMBeanException;
+
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -28,11 +36,9 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.AbstractActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.configuration.ExchangeConfiguration;
 import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.ConnectionRegistry;
 import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -41,30 +47,31 @@ import org.apache.qpid.server.exchange.D
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.access.Accessable;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
-import javax.management.NotCompliantMBeanException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-public class VirtualHost implements Accessable
+public class VirtualHost implements Accessable, StatisticsGatherer
 {
     private static final Logger _logger = Logger.getLogger(VirtualHost.class);
 
@@ -90,6 +97,10 @@ public class VirtualHost implements Acce
 
     private final Timer _houseKeepingTimer;
     private VirtualHostConfiguration _configuration;
+    
+    private ApplicationRegistry _registry;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messageStats, _dataStats;
 
     public void setAccessableName(String name)
     {
@@ -163,6 +174,8 @@ public class VirtualHost implements Acce
         {
     		throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
         }
+        
+        _registry = (ApplicationRegistry) ApplicationRegistry.getInstance();
 
         _virtualHostMBean = new VirtualHostMBean();
 
@@ -227,11 +240,15 @@ public class VirtualHost implements Acce
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
-        initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+        
+        initialiseHouseKeeping();
+        initialiseStatistics();
     }
 
-	private void initialiseHouseKeeping(long period)
+	private void initialiseHouseKeeping()
     {
+	    long period = _configuration.getHousekeepingExpiredMessageCheckPeriod();
+	    
         /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
         if (period > 0L)
         {
@@ -244,7 +261,7 @@ public class VirtualHost implements Acce
                     CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
                         public String getLogMessage()
                         {
-                            return "[" + Thread.currentThread().getName() + "]";
+                            return "[" + Thread.currentThread().getName() + "] ";
                         }
                     });
                     _hkLogger.info("Starting the houseKeeping job");
@@ -472,6 +489,54 @@ public class VirtualHost implements Acce
     {
         return _virtualHostMBean;
     }
+    
+    public void registerMessageDelivery(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messageStats.registerEvent(1L, timestamp);
+            _dataStats.registerEvent(messageSize, timestamp);
+        }
+        _registry.registerMessageDelivery(messageSize, timestamp);
+    }
+    
+    public StatisticsCounter getMessageStatistics()
+    {
+        return _messageStats;
+    }
+    
+    public StatisticsCounter getDataStatistics()
+    {
+        return _dataStats;
+    }
+    
+    public void resetStatistics()
+    {
+        _messageStats.reset();
+        _dataStats.reset();
+        
+        for (AMQProtocolSession session : _connectionRegistry.getConnections())
+        {
+            ((AMQMinaProtocolSession) session).resetStatistics();
+        }
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
+        _messageStats = new StatisticsCounter("messages-" + getName());
+        _dataStats = new StatisticsCounter("bytes-" + getName());
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 
     /**
      * Temporary Startup RT class to record the creation of persistent queues / exchanges.

Added: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java?rev=1032643&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java Mon Nov  8 17:05:08 2010
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.stats;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for the {@link StatisticsCounter} class.
+ */
+public class StatisticsCounterTest extends TestCase
+{
+    /**
+     * Check that statistics counters are created correctly.
+     */
+    public void testCreate()
+    {
+        long before = System.currentTimeMillis();
+        StatisticsCounter counter = new StatisticsCounter("name", 1234L);
+        long after = System.currentTimeMillis();
+        
+        assertTrue(before <= counter.getStart());
+        assertTrue(after >= counter.getStart());
+        assertTrue(counter.getName().startsWith("name-"));
+        assertEquals(1234L, counter.getPeriod());
+    }
+ 
+    /**
+     * Check that totals add up correctly.
+     */
+    public void testTotal()
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        long start = counter.getStart();
+        for (int i = 0; i < 100; i++)
+        {
+            counter.registerEvent(i, start + i);
+        }
+        assertEquals(99 * 50, counter.getTotal()); // cf. Gauss
+    }
+ 
+    /**
+     * Test totals add up correctly even when messages are delivered
+     * out-of-order.
+     */
+    public void testTotalOutOfOrder()
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        long start = counter.getStart();
+        assertEquals(0, counter.getTotal());
+        counter.registerEvent(10, start + 2500);
+        assertEquals(10, counter.getTotal());
+        counter.registerEvent(20, start + 1500);
+        assertEquals(30, counter.getTotal());
+        counter.registerEvent(10, start + 500);
+        assertEquals(40, counter.getTotal());
+    }
+ 
+    /**
+     * Test that the peak rate is reported correctly.
+     */
+    public void testPeak()
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        long start = counter.getStart();
+        assertEquals(0.0, counter.getPeak());
+        counter.registerEvent(1000, start + 500);
+        assertEquals(1000.0, counter.getPeak());
+        counter.registerEvent(2000, start + 1500);
+        assertEquals(2000.0, counter.getPeak());
+        counter.registerEvent(1000, start + 2500);
+        assertEquals(2000.0, counter.getPeak());
+    }
+ 
+    /**
+     * Test that peak rate is reported correctly even when messages are
+     * delivered out-of-order.
+     */
+    public void testPeakOutOfOrder() throws Exception
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        long start = counter.getStart();
+        assertEquals(0.0, counter.getPeak());
+        counter.registerEvent(1000, start + 2500);
+        assertEquals(1000.0, counter.getPeak());
+        counter.registerEvent(2000, start + 1500);
+        assertEquals(3000.0, counter.getPeak());
+        counter.registerEvent(1000, start + 500);
+        assertEquals(4000.0, counter.getPeak());
+        Thread.sleep(2000);
+        assertEquals(4000.0, counter.getPeak());
+        counter.registerEvent(1000, start + 500);
+        assertEquals(5000.0, counter.getPeak());
+        Thread.sleep(2000);
+        counter.registerEvent(1000);
+        assertEquals(5000.0, counter.getPeak());
+    }
+ 
+    /**
+     * Test the current rate is generated correctly.
+     */
+    public void testRate() throws Exception
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        assertEquals(0.0, counter.getRate());
+        Thread.sleep(100);
+        counter.registerEvent(1000);
+        assertEquals(1000.0, counter.getRate());
+        Thread.sleep(1000);
+        counter.registerEvent(2000);
+        assertEquals(2000.0, counter.getRate());
+        Thread.sleep(1000);
+        counter.registerEvent(1000);
+        assertEquals(1000.0, counter.getRate());
+        Thread.sleep(1000);
+        assertEquals(0.0, counter.getRate());
+    }
+}

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java Mon Nov  8 17:05:08 2010
@@ -71,6 +71,9 @@ public class NullApplicationRegistry ext
         _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager, AllowAll.FACTORY);
 
         _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
+        
+        initialiseStatistics();
+        initialiseStatisticsReporting();
 
         _managedObjectRegistry = new NoopManagedObjectRegistry();
         _virtualHostRegistry = new VirtualHostRegistry(this);

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Mon Nov  8 17:05:08 2010
@@ -94,6 +94,9 @@ public class TestApplicationRegistry ext
         _managedObjectRegistry = new NoopManagedObjectRegistry();
 
         _messageStore = new TestableMemoryMessageStore();
+        
+        initialiseStatistics();
+        initialiseStatisticsReporting();
 
         _virtualHostRegistry = new VirtualHostRegistry(this);
         

Modified: qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java Mon Nov  8 17:05:08 2010
@@ -34,9 +34,8 @@ import org.apache.qpid.management.common
 /**
  * The ManagedBroker is the management interface to expose management
  * features of the Broker.
- *
- * @author   Bhupendra Bhardwaj
- * @version  0.1
+ * 
+ * @since Qpid JMX API 1.9
  */
 public interface ManagedBroker
 {
@@ -122,4 +121,62 @@ public interface ManagedBroker
                          impact= MBeanOperationInfo.ACTION)
     void deleteQueue(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue Name")String queueName)
         throws IOException, JMException;
+    
+    /**
+     * Resets all message and data statistics for the virtual host.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanOperation(name="resetStatistics",
+                    description="Resets all message and data statistics for the virtual host",
+                    impact= MBeanOperationInfo.ACTION)
+    void resetStatistics() throws Exception;
+
+    /**
+     * Peak rate of messages per second for the virtual host.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate")
+    double getPeakMessageRate();
+
+    /**
+     * Peak rate of bytes per second for the virtual host.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate")
+    double getPeakDataRate();
+
+    /**
+     * Rate of messages per second for the virtual host.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate")
+    double getMessageRate();
+
+    /**
+     * Rate of bytes per second for the virtual host.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate")
+    double getDataRate();
+
+    /**
+     * Total count of messages for the virtual host.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count")
+    long getTotalMessages();
+
+    /**
+     * Total count of bytes for the virtual host.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes")
+    long getTotalData();
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java Mon Nov  8 17:05:08 2010
@@ -35,8 +35,8 @@ import org.apache.qpid.management.common
 
 /**
  * The management interface exposed to allow management of Connections.
- * @author   Bhupendra Bhardwaj
- * @version  0.1
+ * 
+ * @since Qpid JMX API 1.9
  */
 public interface ManagedConnection
 {
@@ -139,4 +139,72 @@ public interface ManagedConnection
                     description="Closes this connection and all related channels",
                     impact= MBeanOperationInfo.ACTION)
     void closeConnection() throws Exception;
+ 
+    /**
+     * Resets message and data statistics for this connection.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanOperation(name="resetStatistics",
+                    description="Resets message and data statistics for this connection",
+                    impact= MBeanOperationInfo.ACTION)
+    void resetStatistics() throws Exception;
+
+    /**
+     * Peak rate of messages per second on this connection.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate")
+    double getPeakMessageRate();
+
+    /**
+     * Peak rate of bytes per second on this connection.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate")
+    double getPeakDataRate();
+
+    /**
+     * Rate of messages per second on this connection.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate")
+    double getMessageRate();
+
+    /**
+     * Rate of bytes per second on this connection.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate")
+    double getDataRate();
+
+    /**
+     * Total count of messages on this connection.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count")
+    long getTotalMessages();
+
+    /**
+     * Total count of bytes on this connection.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes")
+    long getTotalData();
+
+    /**
+     * Is statistics collection enabled for this connection.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="statisticsEnabled", description=TYPE + " Statistics Enabled")
+    boolean isStatisticsEnabled();
+    
+    void setStatisticsEnabled(boolean enabled);
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java?rev=1032643&r1=1032642&r2=1032643&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java Mon Nov  8 17:05:08 2010
@@ -22,11 +22,15 @@ package org.apache.qpid.management.commo
 
 import java.io.IOException;
 
+import javax.management.MBeanOperationInfo;
+
 import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
 
 /**
  * Interface for the ServerInformation MBean
- * @since Qpid JMX API 1.3
+ * 
+ * @since Qpid JMX API 1.9
  */
 public interface ServerInformation
 {
@@ -80,4 +84,62 @@ public interface ServerInformation
     @MBeanAttribute(name="ProductVersion", 
                     description = "The product version string")
     String getProductVersion() throws IOException;
+    
+    /**
+     * Resets all message and data statistics for the broker.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanOperation(name="resetStatistics",
+                    description="Resets all message and data statistics for the broker",
+                    impact= MBeanOperationInfo.ACTION)
+    void resetStatistics() throws Exception;
+
+    /**
+     * Peak rate of messages per second for the broker.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate")
+    double getPeakMessageRate();
+
+    /**
+     * Peak rate of bytes per second for the broker.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate")
+    double getPeakDataRate();
+
+    /**
+     * Rate of messages per second for the broker.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate")
+    double getMessageRate();
+
+    /**
+     * Rate of bytes per second for the broker.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate")
+    double getDataRate();
+
+    /**
+     * Total count of messages for the broker.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count")
+    long getTotalMessages();
+
+    /**
+     * Total count of bytes for the broker.
+     * 
+     * @since Qpid JMX API 1.9
+     */
+    @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes")
+    long getTotalData();
 }

Added: qpid/branches/0.5.x-dev/qpid/java/systests/derby.log
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/derby.log?rev=1032643&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/derby.log (added)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/derby.log Mon Nov  8 17:05:08 2010
@@ -0,0 +1,6 @@
+----------------------------------------------------------------
+2010-11-05 16:45:02.145 GMT:
+ Booting Derby version The Apache Software Foundation - Apache Derby - 10.3.2.1 - (599110): instance c013800d-012c-1ced-4a3a-000004b61c38
+on database directory /home/kennedya/workspaces/qpid-0.5/qpid/java/build/work/test/test  
+
+Database Class Loader started - derby.database.classpath=''

Added: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java?rev=1032643&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java Mon Nov  8 17:05:08 2010
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test enabling generation of message statistics on a per-connection basis.
+ */
+public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase
+{
+    public void configureStatistics() throws Exception
+    {
+        // no statistics generation configured
+    }
+
+    /**
+     * Test statistics on a single connection
+     */
+    public void testEnablingStatisticsPerConnection() throws Exception
+    {
+        ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+        
+        sendUsing(_test, 5, 200);
+        
+        List<String> addresses = new ArrayList<String>();
+        for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+        {
+	        assertEquals("Incorrect connection total", 0,  mc.getTotalMessages());
+	        assertEquals("Incorrect connection data", 0, mc.getTotalData());
+            
+            addresses.add(mc.getRemoteAddress());
+        }
+        assertEquals("Incorrect active connection data", 0, vhost.getTotalData());
+        assertEquals("Incorrect active connection data", 0, vhost.getTotalMessages());
+        
+        Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+        test.start();
+        for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+        {
+            if (addresses.contains(mc.getRemoteAddress()))
+            {
+                continue;
+            }
+            mc.setStatisticsEnabled(true);
+	        assertEquals("Incorrect connection total", 0,  mc.getTotalMessages());
+	        assertEquals("Incorrect connection data", 0, mc.getTotalData());
+        }
+        
+        sendUsing(test, 5, 200);
+        sendUsing(_test, 5, 200);
+        
+        for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+        {
+            if (addresses.contains(mc.getRemoteAddress()))
+            {
+		        assertEquals("Incorrect connection total", 0,  mc.getTotalMessages());
+		        assertEquals("Incorrect connection data", 0, mc.getTotalData());
+            }
+            else
+            {
+		        assertEquals("Incorrect connection total", 5,  mc.getTotalMessages());
+		        assertEquals("Incorrect connection data", 1000, mc.getTotalData());
+            }
+        }
+        assertEquals("Incorrect active connection data", 0, vhost.getTotalData());
+        assertEquals("Incorrect active connection data", 0, vhost.getTotalMessages());
+    }
+}

Added: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java?rev=1032643&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java Mon Nov  8 17:05:08 2010
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test enabling generation of message statistics on a per-connection basis.
+ */
+public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCase
+{
+    public void configureStatistics() throws Exception
+    {
+        setConfigurationProperty("statistics.generation.broker", Boolean.toString(getName().contains("Broker")));
+        setConfigurationProperty("statistics.generation.virtualhosts", Boolean.toString(getName().contains("Virtualhost")));
+        setConfigurationProperty("statistics.generation.connections", Boolean.toString(getName().contains("Connection")));
+    }
+
+    /**
+     * Just broker statistics.
+     */
+    public void testGenerateBrokerStatistics() throws Exception
+    {
+        sendUsing(_test, 5, 200);
+        Thread.sleep(1000);
+        
+        for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+        {
+	        assertEquals("Incorrect connection total", 0,  mc.getTotalMessages());
+	        assertEquals("Incorrect connection data", 0, mc.getTotalData());
+        }
+        
+        ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+        assertEquals("Incorrect vhost data", 0, vhost.getTotalMessages());
+        assertEquals("Incorrect vhost data", 0, vhost.getTotalData());
+
+        if (!_broker.equals(VM))
+        {
+            assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages());
+            assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData());
+        }
+    }
+
+    /**
+     * Just virtualhost statistics.
+     */
+    public void testGenerateVirtualhostStatistics() throws Exception
+    {
+        sendUsing(_test, 5, 200);
+        Thread.sleep(1000);
+        
+        for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+        {
+            assertEquals("Incorrect connection total", 0,  mc.getTotalMessages());
+            assertEquals("Incorrect connection data", 0, mc.getTotalData());
+        }
+        
+        ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+        assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages());
+        assertEquals("Incorrect vhost data", 1000, vhost.getTotalData());
+
+        if (!_broker.equals(VM))
+        {
+            assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessages());
+            assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalData());
+        }
+    }
+
+    /**
+     * Just connection statistics.
+     */
+    public void testGenerateConnectionStatistics() throws Exception
+    {
+        sendUsing(_test, 5, 200);
+        Thread.sleep(1000);
+        
+        for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+        {
+            assertEquals("Incorrect connection total", 5,  mc.getTotalMessages());
+            assertEquals("Incorrect connection data", 1000, mc.getTotalData());
+        }
+        
+        ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+        assertEquals("Incorrect vhost data", 0, vhost.getTotalMessages());
+        assertEquals("Incorrect vhost data", 0, vhost.getTotalData());
+
+        if (!_broker.equals(VM))
+        {
+            assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessages());
+            assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalData());
+        }
+    }
+
+    /**
+     * Both broker and virtualhost statistics.
+     */
+    public void testGenerateBrokerAndVirtualhostStatistics() throws Exception
+    {
+        sendUsing(_test, 5, 200);
+        Thread.sleep(1000);
+        
+        for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+        {
+            assertEquals("Incorrect connection total", 0,  mc.getTotalMessages());
+            assertEquals("Incorrect connection data", 0, mc.getTotalData());
+        }
+        
+        ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+        assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages());
+        assertEquals("Incorrect vhost data", 1000, vhost.getTotalData());
+
+        if (!_broker.equals(VM))
+        {
+            assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages());
+            assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData());
+        }
+    }
+
+    /**
+     * Broker, virtualhost and connection statistics.
+     */
+    public void testGenerateBrokerVirtualhostAndConnectionStatistics() throws Exception
+    {
+        sendUsing(_test, 5, 200);
+        Thread.sleep(1000);
+        
+        for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+        {
+            assertEquals("Incorrect connection total", 5,  mc.getTotalMessages());
+            assertEquals("Incorrect connection data", 1000, mc.getTotalData());
+        }
+        
+        ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+        assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages());
+        assertEquals("Incorrect vhost data", 1000, vhost.getTotalData());
+
+        if (!_broker.equals(VM))
+        {
+            assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages());
+            assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData());
+        }
+    }
+}

Added: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java?rev=1032643&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java Mon Nov  8 17:05:08 2010
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import java.util.List;
+
+import org.apache.qpid.util.LogMonitor;
+
+/**
+ * Test generation of message statistics reporting.
+ */
+public class MessageStatisticsReportingTest extends MessageStatisticsTestCase
+{
+    protected LogMonitor _monitor;
+    
+    public void configureStatistics() throws Exception
+    {
+        setConfigurationProperty("statistics.generation.broker", "true");
+        setConfigurationProperty("statistics.generation.virtualhosts", "true");
+        
+        if (getName().equals("testEnabledStatisticsReporting"))
+        {
+            setConfigurationProperty("statistics.reporting.period", "10");
+        }
+        
+        _monitor = new LogMonitor(_outputFile);
+    }
+
+    /**
+     * Test enabling reporting.
+     */
+    public void testEnabledStatisticsReporting() throws Exception
+    {
+        sendUsing(_test, 10, 100);
+        sendUsing(_dev, 20, 100);
+        sendUsing(_local, 15, 100);
+        
+        Thread.sleep(10 * 1000); // 15s
+        
+        List<String> brokerStatsData = _monitor.findMatches("BRK-1008");
+        List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009");
+        List<String> vhostStatsData = _monitor.findMatches("VHT-1003");
+        List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004");
+        
+        assertEquals("Incorrect number of broker data stats log messages", 1, brokerStatsData.size());
+        assertEquals("Incorrect number of broker message stats log messages", 1, brokerStatsMessages.size());
+        assertEquals("Incorrect number of virtualhost data stats log messages", 3, vhostStatsData.size());
+        assertEquals("Incorrect number of virtualhost message stats log messages", 3, vhostStatsMessages.size());
+    }
+
+    /**
+     * Test not enabling reporting.
+     */
+    public void testNotEnabledStatisticsReporting() throws Exception
+    {
+        sendUsing(_test, 10, 100);
+        sendUsing(_dev, 20, 100);
+        sendUsing(_local, 15, 100);
+        
+        Thread.sleep(10 * 1000); // 15s
+        
+        List<String> brokerStatsData = _monitor.findMatches("BRK-1008");
+        List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009");
+        List<String> vhostStatsData = _monitor.findMatches("VHT-1003");
+        List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004");
+        
+        assertEquals("Incorrect number of broker data stats log messages", 0, brokerStatsData.size());
+        assertEquals("Incorrect number of broker message stats log messages", 0, brokerStatsMessages.size());
+        assertEquals("Incorrect number of virtualhost data stats log messages", 0, vhostStatsData.size());
+        assertEquals("Incorrect number of virtualhost message stats log messages", 0, vhostStatsMessages.size());
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org