You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/03/15 02:54:18 UTC

svn commit: r1081634 [6/9] - in /qpid/branches/qpid-2920/qpid: ./ bin/ cpp/ cpp/bindings/qpid/ cpp/bindings/qpid/perl/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/examples/ cpp/examples/direct/ cpp/examples/failover/ cpp/examples/fanout/ cpp/...

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Mar 15 01:54:07 2011
@@ -30,7 +30,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -92,6 +91,7 @@ import org.apache.qpid.server.output.Pro
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.transport.NetworkDriver;
@@ -172,6 +172,10 @@ public class AMQProtocolEngine implement
     private final UUID _id;
     private final ConfigStore _configStore;
     private long _createTime = System.currentTimeMillis();
+    
+    private ApplicationRegistry _registry;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     public ManagedObject getManagedObject()
     {
@@ -195,9 +199,10 @@ public class AMQProtocolEngine implement
         _configStore = virtualHostRegistry.getConfigStore();
         _id = _configStore.createId();
 
-
         _actor.message(ConnectionMessages.OPEN(null, null, false, false));
 
+        _registry = virtualHostRegistry.getApplicationRegistry();
+        initialiseStatistics();
     }
 
     private AMQProtocolSessionMBean createMBean() throws JMException
@@ -1078,19 +1083,6 @@ public class AMQProtocolEngine implement
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
 
-    public void closeIfLingeringClosedChannels()
-    {
-        for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
-        {
-            if (id.getValue() + 30000 > System.currentTimeMillis())
-            {
-                // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
-                _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
-                closeProtocolSession();
-            }
-        }
-    }
-
     public Boolean isIncoming()
     {
         return true;
@@ -1263,7 +1255,6 @@ public class AMQProtocolEngine implement
 
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
     {
-
         closeChannel((Integer)session.getID());
 
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1274,5 +1265,97 @@ public class AMQProtocolEngine implement
                         0,0);
 
         writeFrame(responseBody.generateFrame((Integer)session.getID()));       
-    }       
+    }
+
+    public void close(AMQConstant cause, String message) throws AMQException
+    {
+        closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+		                getProtocolOutputConverter().getProtocolMajorVersion(),
+		                getProtocolOutputConverter().getProtocolMinorVersion(),
+		                (Throwable) null), true);
+    }
+
+    public List<AMQSessionModel> getSessionModels()
+    {
+		List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); 
+		for (AMQChannel channel : getChannels())
+		{
+		    sessions.add((AMQSessionModel) channel);
+		}
+		return sessions;
+    }
+
+    public LogSubject getLogSubject()
+    {
+        return _logSubject;
+    }
+
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+        _virtualHost.registerMessageDelivered(messageSize);
+    }
+
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+        _virtualHost.registerMessageReceived(messageSize, timestamp);
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+    
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+        
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
+        _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
+        _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
+        _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 }

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Mar 15 01:54:07 2011
@@ -231,7 +231,5 @@ public interface AMQProtocolSession exte
 
     List<AMQChannel> getChannels();
 
-    void closeIfLingeringClosedChannels();
-
     void mgmtCloseChannel(int channelId);
 }

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Tue Mar 15 01:54:07 2011
@@ -37,25 +37,15 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.NotCompliantMBeanException;
 import javax.management.Notification;
+import javax.management.ObjectName;
 import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -66,8 +56,20 @@ import javax.management.openmbean.Simple
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
-import java.util.Date;
-import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
 
 /**
  * This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -94,8 +96,7 @@ public class AMQProtocolSessionMBean ext
         super(ManagedConnection.class, ManagedConnection.TYPE);
         _protocolSession = amqProtocolSession;
         String remote = getRemoteAddress();
-        remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
-        _name = jmxEncode(new StringBuffer(remote), 0).toString();
+        _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
         init();
     }
 
@@ -175,7 +176,7 @@ public class AMQProtocolSessionMBean ext
 
     public String getObjectInstanceName()
     {
-        return _name;
+        return ObjectName.quote(_name);
     }
 
     /**
@@ -339,4 +340,78 @@ public class AMQProtocolSessionMBean ext
         _broadcaster.sendNotification(n);
     }
 
-} // End of MBean class
+    public void resetStatistics() throws Exception
+    {
+        _protocolSession.resetStatistics();
+    }
+
+    public double getPeakMessageDeliveryRate()
+    {
+        return _protocolSession.getMessageDeliveryStatistics().getPeak();
+    }
+
+    public double getPeakDataDeliveryRate()
+    {
+        return _protocolSession.getDataDeliveryStatistics().getPeak();
+    }
+
+    public double getMessageDeliveryRate()
+    {
+        return _protocolSession.getMessageDeliveryStatistics().getRate();
+    }
+
+    public double getDataDeliveryRate()
+    {
+        return _protocolSession.getDataDeliveryStatistics().getRate();
+    }
+
+    public long getTotalMessagesDelivered()
+    {
+        return _protocolSession.getMessageDeliveryStatistics().getTotal();
+    }
+
+    public long getTotalDataDelivered()
+    {
+        return _protocolSession.getDataDeliveryStatistics().getTotal();
+    }
+
+    public double getPeakMessageReceiptRate()
+    {
+        return _protocolSession.getMessageReceiptStatistics().getPeak();
+    }
+
+    public double getPeakDataReceiptRate()
+    {
+        return _protocolSession.getDataReceiptStatistics().getPeak();
+    }
+
+    public double getMessageReceiptRate()
+    {
+        return _protocolSession.getMessageReceiptStatistics().getRate();
+    }
+
+    public double getDataReceiptRate()
+    {
+        return _protocolSession.getDataReceiptStatistics().getRate();
+    }
+
+    public long getTotalMessagesReceived()
+    {
+        return _protocolSession.getMessageReceiptStatistics().getTotal();
+    }
+
+    public long getTotalDataReceived()
+    {
+        return _protocolSession.getDataReceiptStatistics().getTotal();
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _protocolSession.isStatisticsEnabled();
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _protocolSession.setStatisticsEnabled(enabled);
+    }
+}

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Tue Mar 15 01:54:07 2011
@@ -20,15 +20,35 @@
  */
 package org.apache.qpid.server.protocol;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogSubject;
 
 public interface AMQSessionModel
 {
-    Object getID();
+    public Object getID();
 
-    AMQConnectionModel getConnectionModel();
+    public AMQConnectionModel getConnectionModel();
 
-    String getClientID();
+    public String getClientID();
+    
+    public void close() throws AMQException;
 
-    LogSubject getLogSubject();
+    public LogSubject getLogSubject();
+    
+    /**
+     * This method is called from the housekeeping thread to check the status of
+     * transactions on this session and react appropriately.
+     * 
+     * If a transaction is open for too long or idle for too long then a warning
+     * is logged or the connection is closed, depending on the configuration. An open
+     * transaction is one that has recent activity. The transaction age is counted
+     * from the time the transaction was started. An idle transaction is one that 
+     * has had no activity, such as publishing or acknowledging messages.
+     * 
+     * @param openWarn time in milliseconds before alerting on open transaction
+     * @param openClose time in milliseconds before closing connection with open transaction
+     * @param idleWarn time in milliseconds before alerting on idle transaction
+     * @param idleClose time in milliseconds before closing connection with idle transaction
+     */
+    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
 }

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Mar 15 01:54:07 2011
@@ -43,6 +43,7 @@ import javax.management.JMException;
 import javax.management.MBeanException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.Notification;
+import javax.management.ObjectName;
 import javax.management.OperationsException;
 import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.ArrayType;
@@ -97,7 +98,7 @@ public class AMQQueueMBean extends AMQMa
     {
         super(ManagedQueue.class, ManagedQueue.TYPE);
         _queue = queue;
-        _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
+        _queueName = queue.getName();
     }
 
     public ManagedObject getParentObject()
@@ -147,7 +148,7 @@ public class AMQQueueMBean extends AMQMa
 
     public String getObjectInstanceName()
     {
-        return _queueName;
+        return ObjectName.quote(_queueName);
     }
 
     public String getName()

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Tue Mar 15 01:54:07 2011
@@ -23,6 +23,8 @@ package org.apache.qpid.server.registry;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.UUID;
 
 import org.apache.commons.configuration.ConfigurationException;
@@ -41,11 +43,12 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
 import org.apache.qpid.server.logging.Log4jMessageLogger;
 import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.AbstractRootMessageLogger;
 import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
@@ -54,6 +57,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -104,6 +108,10 @@ public abstract class ApplicationRegistr
     private ConfigStore _configStore;
 
     protected String _registryName;
+    
+    private Timer _reportingTimer;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     static
     {
@@ -294,6 +302,8 @@ public abstract class ApplicationRegistr
         try
         {
             initialiseVirtualHosts();
+            initialiseStatistics();
+            initialiseStatisticsReporting();
         }
         finally
         {
@@ -320,6 +330,72 @@ public abstract class ApplicationRegistr
     {
         _managedObjectRegistry = new NoopManagedObjectRegistry();
     }
+    
+    public void initialiseStatisticsReporting()
+    {
+        long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
+        final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
+        final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
+        final boolean reset = _configuration.isStatisticsReportResetEnabled();
+        
+        /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
+        if (report > 0L && (broker || virtualhost))
+        {
+            _reportingTimer = new Timer("Statistics-Reporting", true);
+            
+            class StatisticsReportingTask extends TimerTask
+            {
+                private final int DELIVERED = 0;
+                private final int RECEIVED = 1;
+                
+                public void run()
+                {
+                    CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+                        public String getLogMessage()
+                        {
+                            return "[" + Thread.currentThread().getName() + "] ";
+                        }
+                    });
+                    
+                    if (broker)
+                    {
+                        CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
+                    }
+                    
+                    if (virtualhost)
+                    {
+                        for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+                        {
+                            String name = vhost.getName();
+                            StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
+                            StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
+                            StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
+                            StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
+                            
+                            CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+                        }
+                    }
+                    
+                    if (reset)
+                    {
+                        resetStatistics();
+                    }
+
+                    CurrentActor.remove();
+                }
+            }
+
+            _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
+                                                report / 2,
+                                                report);
+        }
+    }
 
     public static IApplicationRegistry getInstance()
     {
@@ -369,6 +445,12 @@ public abstract class ApplicationRegistr
         {
             _logger.info("Shutting down ApplicationRegistry:" + this);
         }
+        
+        //Stop Statistics Reporting
+        if (_reportingTimer != null)
+        {
+            _reportingTimer.cancel();
+        }
 
         //Stop incoming connections
         unbind();
@@ -498,4 +580,76 @@ public abstract class ApplicationRegistr
         getBroker().addVirtualHost(virtualHost);
         return virtualHost;
     }
+    
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+    }
+    
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+    
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+        
+        for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
+        {
+            vhost.resetStatistics();
+        }
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                getConfiguration().isStatisticsGenerationBrokerEnabled());
+        
+        _messagesDelivered = new StatisticsCounter("messages-delivered");
+        _dataDelivered = new StatisticsCounter("bytes-delivered");
+        _messagesReceived = new StatisticsCounter("messages-received");
+        _dataReceived = new StatisticsCounter("bytes-received");
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 }

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Tue Mar 15 01:54:07 2011
@@ -35,11 +35,12 @@ import org.apache.qpid.server.plugins.Pl
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
-public interface IApplicationRegistry
+public interface IApplicationRegistry extends StatisticsGatherer
 {
     /**
      * Initialise the application registry. All initialisation must be done in this method so that any components
@@ -97,4 +98,6 @@ public interface IApplicationRegistry
     ConfigStore getConfigStore();
 
     void setConfigStore(ConfigStore store);
+    
+    void initialiseStatisticsReporting();
 }

Added: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java?rev=1081634&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java (added)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java Tue Mar 15 01:54:07 2011
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class collects statistics and counts the total, rate per second and
+ * peak rate per second values for the events that are registered with it. 
+ */
+public class StatisticsCounter
+{
+    private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
+    
+    public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
+    public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable");
+    
+    private static final String COUNTER = "counter";
+    private static final AtomicLong _counterIds = new AtomicLong(0L);
+    
+    private long _peak = 0L;
+    private long _total = 0L;
+    private long _temp = 0L;
+    private long _last = 0L;
+    private long _rate = 0L;
+
+    private long _start;
+    
+    private final long _period;
+    private final String _name;
+
+    public StatisticsCounter()
+    {
+        this(COUNTER);
+    }
+    
+    public StatisticsCounter(String name)
+    {
+        this(name, DEFAULT_SAMPLE_PERIOD);
+    }
+
+    public StatisticsCounter(String name, long period)
+    {
+        _period = period;
+        _name = name + "-" + + _counterIds.incrementAndGet();
+        reset();
+    }
+    
+    public void registerEvent()
+    {
+        registerEvent(1L);
+    }
+
+    public void registerEvent(long value)
+    {
+        registerEvent(value, System.currentTimeMillis());
+    }
+
+    public void registerEvent(long value, long timestamp)
+    {
+        if (DISABLE_STATISTICS)
+        {
+            return;
+        }
+        
+        long thisSample = (timestamp / _period);
+        synchronized (this)
+        {
+            if (thisSample > _last)
+            {
+                _last = thisSample;
+                _rate = _temp;
+                _temp = 0L;
+                if (_rate > _peak)
+                {
+                    _peak = _rate;
+                }
+            }
+            
+            _total += value;
+            _temp += value;
+        }
+    }
+    
+    /**
+     * Update the current rate and peak - may reset rate to zero if a new
+     * sample period has started.
+     */
+    private void update()
+    {
+        registerEvent(0L, System.currentTimeMillis());
+    }
+
+    /**
+     * Reset the peak, rate and total values and restart the sample clock from the current time.
+     */
+    public void reset()
+    {
+        _log.info("Resetting statistics for counter: " + _name);
+        _peak = 0L;
+        _rate = 0L;
+        _total = 0L;
+        _start = System.currentTimeMillis();
+        _last = _start / _period;
+    }
+
+    public double getPeak()
+    {
+        update();
+        return (double) _peak / ((double) _period / 1000.0d);
+    }
+
+    public double getRate()
+    {
+        update();
+        return (double) _rate / ((double) _period / 1000.0d);
+    }
+
+    public long getTotal()
+    {
+        return _total;
+    }
+
+    public long getStart()
+    {
+        return _start;
+    }
+
+    public Date getStartTime()
+    {
+        return new Date(_start);
+    }
+    
+    public String getName()
+    {
+        return _name;
+    }
+    
+    public long getPeriod()
+    {
+        return _period;
+    }
+}

Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java?rev=1081634&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java (added)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java Tue Mar 15 01:54:07 2011
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+/**
+ * This interface is to be implemented by any broker business object that
+ * wishes to gather statistics about messages delivered through it.
+ * 
+ * These statistics are exposed using a separate JMX MBean interface, which
+ * calls these methods to retrieve the underlying {@link StatisticsCounter}s
+ * and return their attributes. This interface gives a standard way for
+ * parts of the broker to set up and configure statistics generation.
+ * <p>
+ * When creating these objects, there should be a parent/child relationship
+ * between them, such that the lowest level gatherer can record statistics if
+ * enabled, and pass on the notification to the parent object to allow higher
+ * level aggregation. When resetting statistics, this works in the opposite
+ * direction, with higher level gatherers also resetting all of their children.
+ */
+public interface StatisticsGatherer
+{
+    /**
+     * Initialise the statistics gathering for this object.
+     * 
+     * This method is responsible for creating any {@link StatisticsCounter}
+     * objects and for determining whether statistics generation should be
+     * enabled, by checking broker and system configuration.
+     * 
+     * @see StatisticsCounter#DISABLE_STATISTICS
+     */
+    void initialiseStatistics();
+    
+    /**
+     * This method is responsible for registering the receipt of a message
+     * with the counters, and also for passing this notification to any parent
+     * {@link StatisticsGatherer}s. If statistics generation is not enabled,
+     * then this method should simply delegate to the parent gatherer.
+     * 
+     * @param messageSize the size in bytes of the received message
+     * @param timestamp the time the message was received
+     */
+    void registerMessageReceived(long messageSize, long timestamp);
+    
+    /**
+     * This method is responsible for registering the delivery of a message
+     * with the counters. Message delivery is recorded by the counter using
+     * the current system time, as opposed to the message timestamp.
+     * 
+     * @param messageSize the size in bytes of the delivered message
+     * @see #registerMessageReceived(long, long)
+     */
+    void registerMessageDelivered(long messageSize);
+    
+    /**
+     * Gives access to the {@link StatisticsCounter} that is used to count
+     * delivered message statistics.
+     * 
+     * @return the {@link StatisticsCounter} that counts delivered messages
+     */
+    StatisticsCounter getMessageDeliveryStatistics();
+    
+    /**
+     * Gives access to the {@link StatisticsCounter} that is used to count
+     * received message statistics.
+     * 
+     * @return the {@link StatisticsCounter} that counts received messages
+     */
+    StatisticsCounter getMessageReceiptStatistics();
+    
+    /**
+     * Gives access to the {@link StatisticsCounter} that is used to count
+     * delivered message size statistics.
+     * 
+     * @return the {@link StatisticsCounter} that counts delivered bytes
+     */
+    StatisticsCounter getDataDeliveryStatistics();
+    
+    /**
+     * Gives access to the {@link StatisticsCounter} that is used to count
+     * received message size statistics.
+     * 
+     * @return the {@link StatisticsCounter} that counts received bytes
+     */
+    StatisticsCounter getDataReceiptStatistics();
+    
+    /**
+     * Reset the counters for this, and any child {@link StatisticsGatherer}s.
+     */
+    void resetStatistics();
+    
+    /**
+     * Check if this object has statistics generation enabled.
+     * 
+     * @return true if statistics generation is enabled
+     */
+    boolean isStatisticsEnabled();
+    
+    /**
+     * Enable or disable statistics generation for this object.
+     */
+    void setStatisticsEnabled(boolean enabled);
+}

Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Tue Mar 15 01:54:07 2011
@@ -97,7 +97,6 @@ public class Subscription_0_10 implement
 
     private FlowCreditManager_0_10 _creditManager;
 
-
     private StateListener _stateListener = new StateListener()
                                             {
 

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Tue Mar 15 01:54:07 2011
@@ -24,6 +24,9 @@ import static org.apache.qpid.server.log
 
 import java.text.MessageFormat;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -35,12 +38,16 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
 import org.apache.qpid.transport.ExecutionErrorCode;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.Session;
 
 public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
 {
@@ -49,11 +56,20 @@ public class ServerConnection extends Co
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
     private LogActor _actor = GenericActor.getInstance(this);
 
+    private ApplicationRegistry _registry;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    
     public ServerConnection()
     {
 
     }
 
+    public UUID getId()
+    {
+        return _config.getId();
+    }
+
     @Override
     protected void invoke(Method method)
     {
@@ -110,6 +126,9 @@ public class ServerConnection extends Co
     public void setVirtualHost(VirtualHost virtualHost)
     {
         _virtualHost = virtualHost;
+        _virtualHost.getConnectionRegistry().registerConnection(this);
+        
+        initialiseStatistics();
     }
 
     public void setConnectionConfig(final ConnectionConfig config)
@@ -145,6 +164,11 @@ public class ServerConnection extends Co
 
         ((ServerSession)session).close();
     }
+    
+    public LogSubject getLogSubject()
+    {
+        return (LogSubject) this;
+    }
 
     @Override
     public void received(ProtocolEvent event)
@@ -215,4 +239,100 @@ public class ServerConnection extends Co
     {
         return _actor;
     }
+
+    @Override
+    public void close(AMQConstant cause, String message) throws AMQException
+    {
+        ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+        try
+        {
+	        replyCode = ConnectionCloseCode.get(cause.getCode());
+        }
+        catch (IllegalArgumentException iae)
+        {
+            // Presumably the cause code has no ConnectionCloseCode equivalent; keep the NORMAL default
+        }
+        close(replyCode, message);
+        getVirtualHost().getConnectionRegistry().deregisterConnection(this);
+    }
+
+    @Override
+    public List<AMQSessionModel> getSessionModels()
+    {
+        List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+        for (Session ssn : getChannels())
+        {
+            sessions.add((AMQSessionModel) ssn);
+        }
+        return sessions;
+    }
+
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+        _virtualHost.registerMessageDelivered(messageSize);
+    }
+
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+        _virtualHost.registerMessageReceived(messageSize, timestamp);
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+    
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
+        
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
+        _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+        _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+        _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 }

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Tue Mar 15 01:54:07 2011
@@ -20,26 +20,28 @@
  */
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.transport.*;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.common.ClientProperties;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
 import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import java.util.*;
+import org.apache.qpid.transport.*;
 
 public class ServerConnectionDelegate extends ServerDelegate
 {
     private String _localFQDN;
     private final IApplicationRegistry _appRegistry;
 
-
     public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
     {
         this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
@@ -138,6 +140,7 @@ public class ServerConnectionDelegate ex
             sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
             sconn.setState(Connection.State.CLOSING);
         }
+        
     }
     
     @Override

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Tue Mar 15 01:54:07 2011
@@ -20,12 +20,26 @@
  */
 package org.apache.qpid.server.transport;
 
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static org.apache.qpid.util.Serial.gt;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.util.Serial.*;
 
-import com.sun.security.auth.UserPrincipal;
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -38,6 +52,8 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -48,8 +64,6 @@ import org.apache.qpid.server.txn.AutoCo
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.MessageTransfer;
@@ -58,24 +72,15 @@ import org.apache.qpid.transport.Range;
 import org.apache.qpid.transport.RangeSet;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
+import com.sun.security.auth.UserPrincipal;
 
 public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
 {
+    private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
+    
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
 
     private final UUID _id;
@@ -111,6 +116,7 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
+    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
 
     private Principal _principal;
 
@@ -141,7 +147,7 @@ public class ServerSession extends Sessi
         _connectionConfig = connConfig;        
         _transaction = new AutoCommitTransaction(this.getMessageStore());
         _principal = new UserPrincipal(connection.getAuthorizationID());
-        _reference = new WeakReference(this);
+        _reference = new WeakReference<Session>(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
     }
@@ -160,8 +166,8 @@ public class ServerSession extends Sessi
 
     public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
     {
-
-            _transaction.enqueue(queues,message, new ServerTransaction.Action()
+        getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+        _transaction.enqueue(queues,message, new ServerTransaction.Action()
             {
 
                 BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -189,6 +195,7 @@ public class ServerSession extends Sessi
             });
 
             incrementOutstandingTxnsIfNecessary();
+            updateTransactionalActivity();
     }
 
 
@@ -196,6 +203,7 @@ public class ServerSession extends Sessi
                             Runnable postIdSettingAction)
     {
         invoke(xfr, postIdSettingAction);
+        getConnectionModel().registerMessageDelivered(xfr.getBodySize());
     }
 
     public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -377,6 +385,7 @@ public class ServerSession extends Sessi
                                      entry.release();
                                  }
                              });
+	    updateTransactionalActivity();
     }
 
     public Collection<Subscription_0_10> getSubscriptions()
@@ -425,6 +434,11 @@ public class ServerSession extends Sessi
         // theory
         return !(_transaction instanceof AutoCommitTransaction);
     }
+    
+    public boolean inTransaction()
+    {
+        return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
+    }
 
     public void selectTx()
     {
@@ -471,6 +485,17 @@ public class ServerSession extends Sessi
         }
     }
 
+    /**
+     * Update last transaction activity timestamp
+     */
+    public void updateTransactionalActivity()
+    {
+        if (isTransactional())
+        {
+            _txnUpdateTime.set(System.currentTimeMillis());
+        }
+    }
+
     public Long getTxnStarts()
     {
         return _txnStarts.get();
@@ -606,6 +631,38 @@ public class ServerSession extends Sessi
         return (LogSubject) this;
     }
 
+    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+    {
+        if (inTransaction())
+        {
+            long currentTime = System.currentTimeMillis();
+            long openTime = currentTime - _transaction.getTransactionStartTime();
+            long idleTime = currentTime - _txnUpdateTime.get();
+
+            // Warn at most once per check, idle taking precedence over open; a threshold <= 0 disables that warning
+            if (idleWarn > 0L && idleTime > idleWarn)
+            {
+                CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
+                _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
+            }
+            else if (openWarn > 0L && openTime > openWarn)
+            {
+                CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
+                _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
+            }
+
+            // Close this session (idle checked first) once a close threshold is exceeded; a threshold <= 0 disables it
+            if (idleClose > 0L && idleTime > idleClose)
+            {
+                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+            }
+            else if (openClose > 0L && openTime > openClose)
+            {
+                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+            }
+        }
+    }
+
     @Override
     public String toLogString()
     {
@@ -617,7 +674,5 @@ public class ServerSession extends Sessi
                                    getVirtualHost().getName(),
                                    getChannel())
             + "] ";
-
     }
-
 }

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Tue Mar 15 01:54:07 2011
@@ -209,26 +209,27 @@ public class ServerSessionDelegate exten
                 }
                 else
                 {
-
                     if(queue.isExclusive())
                     {
+                        ServerSession s = (ServerSession) session;
+                        queue.setExclusiveOwningSession(s);
                         if(queue.getPrincipalHolder() == null)
                         {
-                            queue.setPrincipalHolder((ServerSession)session);
+                            queue.setPrincipalHolder(s);
+                            queue.setExclusiveOwningSession(s);
                             ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
                             {
-
                                 public void doTask(ServerSession session)
                                 {
                                     if(queue.getPrincipalHolder() == session)
                                     {
                                         queue.setPrincipalHolder(null);
+                                        queue.setExclusiveOwningSession(null);
                                     }
                                 }
                             });
                         }
 
-
                     }
 
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -369,7 +370,6 @@ public class ServerSessionDelegate exten
         }
 
         ssn.processed(xfr);
-
     }
 
     @Override
@@ -969,10 +969,10 @@ public class ServerSessionDelegate exten
 
                         }
 
-                        if(method.hasAutoDelete()
-                           && method.getAutoDelete()
-                           && method.hasExclusive()
-                           && method.getExclusive())
+                        if (method.hasAutoDelete()
+                            && method.getAutoDelete()
+                            && method.hasExclusive()
+                            && method.getExclusive())
                         {
                             final AMQQueue q = queue;
                             final ServerSession.Task deleteQueueTask = new ServerSession.Task()
@@ -999,12 +999,12 @@ public class ServerSessionDelegate exten
                                     }
                                 });
                         }
-                        else if(method.getExclusive())
+                        if (method.hasExclusive()
+                            && method.getExclusive())
                         {
                             final AMQQueue q = queue;
                             final ServerSession.Task removeExclusive = new ServerSession.Task()
                             {
-
                                 public void doTask(ServerSession session)
                                 {
                                     q.setPrincipalHolder(null);
@@ -1012,10 +1012,10 @@ public class ServerSessionDelegate exten
                                 }
                             };
                             final ServerSession s = (ServerSession) session;
+                            q.setExclusiveOwningSession(s);
                             s.addSessionCloseTask(removeExclusive);
                             queue.addQueueDeleteTask(new AMQQueue.Task()
                             {
-
                                 public void doTask(AMQQueue queue) throws AMQException
                                 {
                                     s.removeSessionCloseTask(removeExclusive);
@@ -1029,7 +1029,7 @@ public class ServerSessionDelegate exten
                     }
                 }
             }
-            else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
+            else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
             {
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Tue Mar 15 01:54:07 2011
@@ -50,6 +50,11 @@ public class AutoCommitTransaction imple
         _transactionLog = transactionLog;
     }
 
+    public long getTransactionStartTime()
+    {
+        return 0L;
+    }
+
     /**
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Tue Mar 15 01:54:07 2011
@@ -20,18 +20,23 @@ package org.apache.qpid.server.txn;
  * 
  */
 
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -41,17 +46,28 @@ import org.apache.qpid.server.store.Tran
  */
 public class LocalTransaction implements ServerTransaction
 {
-    protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
+    protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
 
     private final List<Action> _postTransactionActions = new ArrayList<Action>();
 
     private volatile TransactionLog.Transaction _transaction;
     private TransactionLog _transactionLog;
+    private long _txnStartTime = 0L;
 
     public LocalTransaction(TransactionLog transactionLog)
     {
         _transactionLog = transactionLog;
     }
+    
+    public boolean inTransaction()
+    {
+        return _transaction != null;
+    }
+    
+    public long getTransactionStartTime()
+    {
+        return _txnStartTime;
+    }
 
     public void addPostTransactionAction(Action postTransactionAction)
     {
@@ -89,7 +105,6 @@ public class LocalTransaction implements
 
         try
         {
-
             for(QueueEntry entry : queueEntries)
             {
                 ServerMessage message = entry.getMessage();
@@ -113,7 +128,6 @@ public class LocalTransaction implements
             _logger.error("Error during message dequeues", e);
             tidyUpOnError(e);
         }
-
     }
 
     private void tidyUpOnError(Exception e)
@@ -140,8 +154,7 @@ public class LocalTransaction implements
             }
             finally
             {
-                _transaction = null;
-                _postTransactionActions.clear();
+		resetDetails();
             }
         }
 
@@ -193,8 +206,25 @@ public class LocalTransaction implements
     {
         _postTransactionActions.add(postTransactionAction);
 
+        if (_txnStartTime == 0L)
+        {
+            _txnStartTime = System.currentTimeMillis();
+        }
+
         if(message.isPersistent())
         {
+            if(_transaction == null)
+            {
+                for(BaseQueue queue : queues)
+                {
+                    if(queue.isDurable())
+                    {
+                        beginTranIfNecessary();
+                        break;
+                    }
+                }
+            }
+
             try
             {
                 for(BaseQueue queue : queues)
@@ -248,17 +278,14 @@ public class LocalTransaction implements
         }
         finally
         {
-            _transaction = null;
-            _postTransactionActions.clear();
+            resetDetails();
         }
-
     }
 
     public void rollback()
     {
         try
         {
-
             if(_transaction != null)
             {
                 _transaction.abortTran();
@@ -280,9 +307,15 @@ public class LocalTransaction implements
             }
             finally
             {
-                _transaction = null;
-                _postTransactionActions.clear();
+                resetDetails();
             }
         }
     }
+    
+    private void resetDetails()
+    {
+        _transaction = null;
+	_postTransactionActions.clear();
+        _txnStartTime = 0L;
+    }
 }

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Tue Mar 15 01:54:07 2011
@@ -52,6 +52,13 @@ public interface ServerTransaction
         public void onRollback();
     }
 
+    /**
+     * Return the time the current transaction started.
+     * 
+     * @return the time this transaction started or 0 if not in a transaction
+     */
+    long getTransactionStartTime();
+
     /** 
      * Register an Action for execution after transaction commit or rollback.  Actions
      * will be executed in the order in which they are registered.

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Tue Mar 15 01:54:07 2011
@@ -20,30 +20,28 @@
 */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.UUID;
+
 import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.binding.BindingFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.TimerTask;
-import java.util.concurrent.FutureTask;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
 
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
 {
     IConnectionRegistry getConnectionRegistry();
 

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Tue Mar 15 01:54:07 2011
@@ -24,19 +24,18 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -63,6 +62,8 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -72,6 +73,7 @@ import org.apache.qpid.server.registry.I
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
@@ -111,6 +113,8 @@ public class VirtualHostImpl implements 
     private BrokerConfig _broker;
     private UUID _id;
 
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private final long _createTime = System.currentTimeMillis();
     private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -161,12 +165,12 @@ public class VirtualHostImpl implements 
 
         public String getObjectInstanceName()
         {
-            return _name.toString();
+            return ObjectName.quote(_name);
         }
 
         public String getName()
         {
-            return _name.toString();
+            return _name;
         }
 
         public VirtualHostImpl getVirtualHost()
@@ -249,6 +253,8 @@ public class VirtualHostImpl implements 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
         initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+        
+        initialiseStatistics();
     }
 
 	private void initialiseHouseKeeping(long period)
@@ -281,19 +287,30 @@ public class VirtualHostImpl implements 
                             // house keeping task from running.
                         }
                     }
+                    for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+                    {
+                        _logger.debug("Checking for long running open transactions on connection " + connection);
+                        for (AMQSessionModel session : connection.getSessionModels())
+                        {
+	                        _logger.debug("Checking for long running open transactions on session " + session);
+                            try
+                            {
+                                session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
+	                                                           _configuration.getTransactionTimeoutOpenClose(),
+	                                                           _configuration.getTransactionTimeoutIdleWarn(),
+	                                                           _configuration.getTransactionTimeoutIdleClose());
+	                            }
+                            catch (Exception e)
+                            {
+                                _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+                            }
+                        }
+                    }
                 }
             }
 
             scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
 
-            class ForceChannelClosuresTask extends TimerTask
-            {
-                public void run()
-                {
-                    _connectionRegistry.expireClosedChannels();
-                }
-            }
-
             Map<String, VirtualHostPluginFactory> plugins =
                 ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
 
@@ -627,6 +644,80 @@ public class VirtualHostImpl implements 
     {
         return _bindingFactory;
     }
+    
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+        _appRegistry.registerMessageDelivered(messageSize);
+    }
+    
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+        _appRegistry.registerMessageReceived(messageSize, timestamp);
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+    
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+        
+        for (AMQConnectionModel connection : _connectionRegistry.getConnections())
+        {
+            connection.resetStatistics();
+        }
+    }
+
+    public void initialiseStatistics()
+    {
+        // Create the counters before setting the enabled flag, so the flag is never true while a counter is still null.
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
+        _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
+        _messagesReceived = new StatisticsCounter("messages-received-" + getName());
+        _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 
     public void createBrokerConnection(final String transport,
                                        final String host,

Added: qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java?rev=1081634&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java (added)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java Tue Mar 15 01:54:07 2011
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.stats;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for the {@link StatisticsCounter} class; the peak and rate tests use real Thread.sleep delays.
+ */
+public class StatisticsCounterTest extends TestCase
+{
+    /**
+     * Check that a new counter reports a start time taken at construction, a name beginning with the given prefix, and the requested period.
+     */
+    public void testCreate()
+    {
+        long before = System.currentTimeMillis();
+        StatisticsCounter counter = new StatisticsCounter("name", 1234L);
+        long after = System.currentTimeMillis();
+        
+        assertTrue(before <= counter.getStart());
+        assertTrue(after >= counter.getStart());
+        assertTrue(counter.getName().startsWith("name-"));
+        assertEquals(1234L, counter.getPeriod());
+    }
+ 
+    /**
+     * Check that the total is the sum of every registered value (0 + 1 + ... + 99).
+     */
+    public void testTotal()
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        long start = counter.getStart();
+        for (int i = 0; i < 100; i++)
+        {
+            counter.registerEvent(i, start + i);
+        }
+        assertEquals(99 * 50, counter.getTotal()); // 0 + 1 + ... + 99 = 99 * 100 / 2
+    }
+ 
+    /**
+     * Test totals add up correctly even when events are registered with
+     * decreasing timestamps, i.e. out-of-order.
+     */
+    public void testTotalOutOfOrder()
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        long start = counter.getStart();
+        assertEquals(0, counter.getTotal());
+        counter.registerEvent(10, start + 2500);
+        assertEquals(10, counter.getTotal());
+        counter.registerEvent(20, start + 1500);
+        assertEquals(30, counter.getTotal());
+        counter.registerEvent(10, start + 500);
+        assertEquals(40, counter.getTotal());
+    }
+ 
+    /**
+     * Test that the reported peak rises with larger events and is not lowered by a later smaller one.
+     */
+    public void testPeak() throws Exception
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        long start = counter.getStart();
+        assertEquals(0.0, counter.getPeak());
+        Thread.sleep(500);
+        counter.registerEvent(1000, start + 500);
+        Thread.sleep(1000);
+        assertEquals(1000.0, counter.getPeak());
+        counter.registerEvent(2000, start + 1500);
+        Thread.sleep(1000);
+        assertEquals(2000.0, counter.getPeak());
+        counter.registerEvent(1000, start + 2500);
+        Thread.sleep(1000);
+        assertEquals(2000.0, counter.getPeak());
+    }
+ 
+    /**
+     * Test that peak rate is reported correctly when events are registered with
+     * decreasing timestamps, and that the total is also unaffected.
+     */
+    public void testPeakOutOfOrder() throws Exception
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        long start = counter.getStart();
+        assertEquals(0.0, counter.getPeak());
+        counter.registerEvent(1000, start + 2500);
+        Thread.sleep(1500);
+        assertEquals(0.0, counter.getPeak());
+        counter.registerEvent(2000, start + 1500);
+        Thread.sleep(1000L);
+        assertEquals(0.0, counter.getPeak());
+        counter.registerEvent(1000, start + 500);
+        Thread.sleep(1500);
+        assertEquals(4000.0, counter.getPeak());
+        Thread.sleep(2000);
+        assertEquals(4000.0, counter.getPeak());
+        counter.registerEvent(1000, start + 500);
+        assertEquals(4000.0, counter.getPeak());
+        Thread.sleep(2000);
+        counter.registerEvent(1000);
+        assertEquals(4000.0, counter.getPeak());
+        assertEquals(6000, counter.getTotal());
+    }
+ 
+    /**
+     * Test that the current rate follows the latest events and returns to zero once no more are registered.
+     */
+    public void testRate() throws Exception
+    {
+        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+        assertEquals(0.0, counter.getRate());
+        Thread.sleep(500);
+        counter.registerEvent(1000);
+        Thread.sleep(1000);
+        assertEquals(1000.0, counter.getRate());
+        counter.registerEvent(2000);
+        Thread.sleep(1000);
+        assertEquals(2000.0, counter.getRate());
+        counter.registerEvent(1000);
+        Thread.sleep(1000);
+        assertEquals(1000.0, counter.getRate());
+        Thread.sleep(1000);
+        assertEquals(0.0, counter.getRate());
+    }
+}

Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java Tue Mar 15 01:54:07 2011
@@ -481,7 +481,7 @@ public class GenerateLogMessages
                     // Only check the text inside the braces '{}'
                     int typeIndexEnd = parametersString[index].indexOf("}", typeIndex);
                     String typeString = parametersString[index].substring(typeIndex, typeIndexEnd);
-                    if (typeString.contains("number"))
+                    if (typeString.contains("number") || typeString.contains("choice"))
                     {
                         type = "Number";
                     }

Modified: qpid/branches/qpid-2920/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/build.xml?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/build.xml (original)
+++ qpid/branches/qpid-2920/qpid/java/build.xml Tue Mar 15 01:54:07 2011
@@ -93,6 +93,10 @@
     <fail if="failed" message="TEST SUITE FAILED"/>
 
   </target>
+  
+  <target name="report-module" description="generate junitreport for modules">
+    <iterate target="report-module"/>
+  </target>
 
   <target name="jar" description="create module jars">
     <iterate target="jar"/>

Modified: qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java (original)
+++ qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java Tue Mar 15 01:54:07 2011
@@ -50,7 +50,7 @@ public class ConnectionSetup
     final static String QUEUE_NAME = "example.MyQueue";
 
     public static final String TOPIC_JNDI_NAME = "topic";
-    final static String TOPIC_NAME = "example.hierarical.topic";
+    final static String TOPIC_NAME = "usa.news";
 
     private Context _ctx;
 

Modified: qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java (original)
+++ qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java Tue Mar 15 01:54:07 2011
@@ -71,7 +71,7 @@ public class Publisher extends Client
     public static void main(String[] args)
     {
 
-        String destination = args.length > 2 ? args[1] : null;
+        String destination = args.length > 2 ? args[1] : "usa.news";
 
         int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100;
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org