You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/03/15 02:54:18 UTC
svn commit: r1081634 [6/9] - in /qpid/branches/qpid-2920/qpid: ./ bin/ cpp/
cpp/bindings/qpid/ cpp/bindings/qpid/perl/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/examples/ cpp/examples/direct/
cpp/examples/failover/ cpp/examples/fanout/ cpp/...
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Mar 15 01:54:07 2011
@@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -92,6 +91,7 @@ import org.apache.qpid.server.output.Pro
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.NetworkDriver;
@@ -172,6 +172,10 @@ public class AMQProtocolEngine implement
private final UUID _id;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
public ManagedObject getManagedObject()
{
@@ -195,9 +199,10 @@ public class AMQProtocolEngine implement
_configStore = virtualHostRegistry.getConfigStore();
_id = _configStore.createId();
-
_actor.message(ConnectionMessages.OPEN(null, null, false, false));
+ _registry = virtualHostRegistry.getApplicationRegistry();
+ initialiseStatistics();
}
private AMQProtocolSessionMBean createMBean() throws JMException
@@ -1078,19 +1083,6 @@ public class AMQProtocolEngine implement
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- public void closeIfLingeringClosedChannels()
- {
- for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
- {
- if (id.getValue() + 30000 > System.currentTimeMillis())
- {
- // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
- _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
- closeProtocolSession();
- }
- }
- }
-
public Boolean isIncoming()
{
return true;
@@ -1263,7 +1255,6 @@ public class AMQProtocolEngine implement
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
-
closeChannel((Integer)session.getID());
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1274,5 +1265,97 @@ public class AMQProtocolEngine implement
0,0);
writeFrame(responseBody.generateFrame((Integer)session.getID()));
- }
+ }
+
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+ getProtocolOutputConverter().getProtocolMajorVersion(),
+ getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null), true);
+ }
+
+ public List<AMQSessionModel> getSessionModels()
+ {
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ for (AMQChannel channel : getChannels())
+ {
+ sessions.add((AMQSessionModel) channel);
+ }
+ return sessions;
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _virtualHost.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
+ _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Mar 15 01:54:07 2011
@@ -231,7 +231,5 @@ public interface AMQProtocolSession exte
List<AMQChannel> getChannels();
- void closeIfLingeringClosedChannels();
-
void mgmtCloseChannel(int channelId);
}
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Tue Mar 15 01:54:07 2011
@@ -37,25 +37,15 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.Notification;
+import javax.management.ObjectName;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -66,8 +56,20 @@ import javax.management.openmbean.Simple
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import java.util.Date;
-import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
/**
* This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -94,8 +96,7 @@ public class AMQProtocolSessionMBean ext
super(ManagedConnection.class, ManagedConnection.TYPE);
_protocolSession = amqProtocolSession;
String remote = getRemoteAddress();
- remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
- _name = jmxEncode(new StringBuffer(remote), 0).toString();
+ _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
init();
}
@@ -175,7 +176,7 @@ public class AMQProtocolSessionMBean ext
public String getObjectInstanceName()
{
- return _name;
+ return ObjectName.quote(_name);
}
/**
@@ -339,4 +340,78 @@ public class AMQProtocolSessionMBean ext
_broadcaster.sendNotification(n);
}
-} // End of MBean class
+ public void resetStatistics() throws Exception
+ {
+ _protocolSession.resetStatistics();
+ }
+
+ public double getPeakMessageDeliveryRate()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getPeak();
+ }
+
+ public double getPeakDataDeliveryRate()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getPeak();
+ }
+
+ public double getMessageDeliveryRate()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getRate();
+ }
+
+ public double getDataDeliveryRate()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getRate();
+ }
+
+ public long getTotalMessagesDelivered()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getTotal();
+ }
+
+ public long getTotalDataDelivered()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getTotal();
+ }
+
+ public double getPeakMessageReceiptRate()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getPeak();
+ }
+
+ public double getPeakDataReceiptRate()
+ {
+ return _protocolSession.getDataReceiptStatistics().getPeak();
+ }
+
+ public double getMessageReceiptRate()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getRate();
+ }
+
+ public double getDataReceiptRate()
+ {
+ return _protocolSession.getDataReceiptStatistics().getRate();
+ }
+
+ public long getTotalMessagesReceived()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getTotal();
+ }
+
+ public long getTotalDataReceived()
+ {
+ return _protocolSession.getDataReceiptStatistics().getTotal();
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _protocolSession.isStatisticsEnabled();
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _protocolSession.setStatisticsEnabled(enabled);
+ }
+}
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Tue Mar 15 01:54:07 2011
@@ -20,15 +20,35 @@
*/
package org.apache.qpid.server.protocol;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
public interface AMQSessionModel
{
- Object getID();
+ public Object getID();
- AMQConnectionModel getConnectionModel();
+ public AMQConnectionModel getConnectionModel();
- String getClientID();
+ public String getClientID();
+
+ public void close() throws AMQException;
- LogSubject getLogSubject();
+ public LogSubject getLogSubject();
+
+ /**
+ * This method is called from the housekeeping thread to check the status of
+ * transactions on this session and react appropriately.
+ *
+ * If a transaction is open for too long or idle for too long then a warning
+ * is logged or the connection is closed, depending on the configuration. An open
+ * transaction is one that has recent activity. The transaction age is counted
+ * from the time the transaction was started. An idle transaction is one that
+ * has had no activity, such as publishing or acknowledgeing messages.
+ *
+ * @param openWarn time in milliseconds before alerting on open transaction
+ * @param openClose time in milliseconds before closing connection with open transaction
+ * @param idleWarn time in milliseconds before alerting on idle transaction
+ * @param idleClose time in milliseconds before closing connection with idle transaction
+ */
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
}
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Mar 15 01:54:07 2011
@@ -43,6 +43,7 @@ import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
+import javax.management.ObjectName;
import javax.management.OperationsException;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.ArrayType;
@@ -97,7 +98,7 @@ public class AMQQueueMBean extends AMQMa
{
super(ManagedQueue.class, ManagedQueue.TYPE);
_queue = queue;
- _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
+ _queueName = queue.getName();
}
public ManagedObject getParentObject()
@@ -147,7 +148,7 @@ public class AMQQueueMBean extends AMQMa
public String getObjectInstanceName()
{
- return _queueName;
+ return ObjectName.quote(_queueName);
}
public String getName()
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Tue Mar 15 01:54:07 2011
@@ -23,6 +23,8 @@ package org.apache.qpid.server.registry;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.UUID;
import org.apache.commons.configuration.ConfigurationException;
@@ -41,11 +43,12 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
import org.apache.qpid.server.logging.Log4jMessageLogger;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.AbstractRootMessageLogger;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
@@ -54,6 +57,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -104,6 +108,10 @@ public abstract class ApplicationRegistr
private ConfigStore _configStore;
protected String _registryName;
+
+ private Timer _reportingTimer;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
static
{
@@ -294,6 +302,8 @@ public abstract class ApplicationRegistr
try
{
initialiseVirtualHosts();
+ initialiseStatistics();
+ initialiseStatisticsReporting();
}
finally
{
@@ -320,6 +330,72 @@ public abstract class ApplicationRegistr
{
_managedObjectRegistry = new NoopManagedObjectRegistry();
}
+
+ public void initialiseStatisticsReporting()
+ {
+ long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
+ final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
+ final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
+ final boolean reset = _configuration.isStatisticsReportResetEnabled();
+
+ /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
+ if (report > 0L && (broker || virtualhost))
+ {
+ _reportingTimer = new Timer("Statistics-Reporting", true);
+
+ class StatisticsReportingTask extends TimerTask
+ {
+ private final int DELIVERED = 0;
+ private final int RECEIVED = 1;
+
+ public void run()
+ {
+ CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+ public String getLogMessage()
+ {
+ return "[" + Thread.currentThread().getName() + "] ";
+ }
+ });
+
+ if (broker)
+ {
+ CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
+ }
+
+ if (virtualhost)
+ {
+ for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+ {
+ String name = vhost.getName();
+ StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
+ StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
+ StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
+ StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
+
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+ }
+ }
+
+ if (reset)
+ {
+ resetStatistics();
+ }
+
+ CurrentActor.remove();
+ }
+ }
+
+ _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
+ report / 2,
+ report);
+ }
+ }
public static IApplicationRegistry getInstance()
{
@@ -369,6 +445,12 @@ public abstract class ApplicationRegistr
{
_logger.info("Shutting down ApplicationRegistry:" + this);
}
+
+ //Stop Statistics Reporting
+ if (_reportingTimer != null)
+ {
+ _reportingTimer.cancel();
+ }
//Stop incoming connections
unbind();
@@ -498,4 +580,76 @@ public abstract class ApplicationRegistr
getBroker().addVirtualHost(virtualHost);
return virtualHost;
}
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+
+ for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
+ {
+ vhost.resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ getConfiguration().isStatisticsGenerationBrokerEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered");
+ _dataDelivered = new StatisticsCounter("bytes-delivered");
+ _messagesReceived = new StatisticsCounter("messages-received");
+ _dataReceived = new StatisticsCounter("bytes-received");
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Tue Mar 15 01:54:07 2011
@@ -35,11 +35,12 @@ import org.apache.qpid.server.plugins.Pl
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public interface IApplicationRegistry
+public interface IApplicationRegistry extends StatisticsGatherer
{
/**
* Initialise the application registry. All initialisation must be done in this method so that any components
@@ -97,4 +98,6 @@ public interface IApplicationRegistry
ConfigStore getConfigStore();
void setConfigStore(ConfigStore store);
+
+ void initialiseStatisticsReporting();
}
Added: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java?rev=1081634&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java (added)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java Tue Mar 15 01:54:07 2011
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class collects statistics and counts the total, rate per second and
+ * peak rate per second values for the events that are registered with it.
+ */
+public class StatisticsCounter
+{
+ private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
+
+ public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
+ public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable");
+
+ private static final String COUNTER = "counter";
+ private static final AtomicLong _counterIds = new AtomicLong(0L);
+
+ private long _peak = 0L;
+ private long _total = 0L;
+ private long _temp = 0L;
+ private long _last = 0L;
+ private long _rate = 0L;
+
+ private long _start;
+
+ private final long _period;
+ private final String _name;
+
+ public StatisticsCounter()
+ {
+ this(COUNTER);
+ }
+
+ public StatisticsCounter(String name)
+ {
+ this(name, DEFAULT_SAMPLE_PERIOD);
+ }
+
+ public StatisticsCounter(String name, long period)
+ {
+ _period = period;
+ _name = name + "-" + + _counterIds.incrementAndGet();
+ reset();
+ }
+
+ public void registerEvent()
+ {
+ registerEvent(1L);
+ }
+
+ public void registerEvent(long value)
+ {
+ registerEvent(value, System.currentTimeMillis());
+ }
+
+ public void registerEvent(long value, long timestamp)
+ {
+ if (DISABLE_STATISTICS)
+ {
+ return;
+ }
+
+ long thisSample = (timestamp / _period);
+ synchronized (this)
+ {
+ if (thisSample > _last)
+ {
+ _last = thisSample;
+ _rate = _temp;
+ _temp = 0L;
+ if (_rate > _peak)
+ {
+ _peak = _rate;
+ }
+ }
+
+ _total += value;
+ _temp += value;
+ }
+ }
+
+ /**
+ * Update the current rate and peak - may reset rate to zero if a new
+ * sample period has started.
+ */
+ private void update()
+ {
+ registerEvent(0L, System.currentTimeMillis());
+ }
+
+ /**
+ * Reset
+ */
+ public void reset()
+ {
+ _log.info("Resetting statistics for counter: " + _name);
+ _peak = 0L;
+ _rate = 0L;
+ _total = 0L;
+ _start = System.currentTimeMillis();
+ _last = _start / _period;
+ }
+
+ public double getPeak()
+ {
+ update();
+ return (double) _peak / ((double) _period / 1000.0d);
+ }
+
+ public double getRate()
+ {
+ update();
+ return (double) _rate / ((double) _period / 1000.0d);
+ }
+
+ public long getTotal()
+ {
+ return _total;
+ }
+
+ public long getStart()
+ {
+ return _start;
+ }
+
+ public Date getStartTime()
+ {
+ return new Date(_start);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public long getPeriod()
+ {
+ return _period;
+ }
+}
Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java?rev=1081634&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java (added)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java Tue Mar 15 01:54:07 2011
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+/**
+ * This interface is to be implemented by any broker business object that
+ * wishes to gather statistics about messages delivered through it.
+ *
+ * These statistics are exposed using a separate JMX Mbean interface, which
+ * calls these methods to retrieve the underlying {@link StatisticsCounter}s
+ * and return their attributes. This interface gives a standard way for
+ * parts of the broker to set up and configure statistics generation.
+ * <p>
+ * When creating these objects, there should be a parent/child relationship
+ * between them, such that the lowest level gatherer can record staticics if
+ * enabled, and pass on the notification to the parent object to allow higher
+ * level aggregation. When resetting statistics, this works in the opposite
+ * direction, with higher level gatherers also resetting all of their children.
+ */
+public interface StatisticsGatherer
+{
+ /**
+ * Initialise the statistics gathering for this object.
+ *
+ * This method is responsible for creating any {@link StatisticsCounter}
+ * objects and for determining whether statistics generation should be
+ * enabled, by checking broker and system configuration.
+ *
+ * @see StatisticsCounter#DISABLE_STATISTICS
+ */
+ void initialiseStatistics();
+
+ /**
+ * This method is responsible for registering the receipt of a message
+ * with the counters, and also for passing this notification to any parent
+ * {@link StatisticsGatherer}s. If statistics generation is not enabled,
+ * then this method should simple delegate to the parent gatherer.
+ *
+ * @param messageSize the size in bytes of the delivered message
+ * @param timestamp the time the message was delivered
+ */
+ void registerMessageReceived(long messageSize, long timestamp);
+
+ /**
+ * This method is responsible for registering the delivery of a message
+ * with the counters. Message delivery is recorded by the counter using
+ * the current system time, as opposed to the message timestamp.
+ *
+ * @param messageSize the size in bytes of the delivered message
+ * @see #registerMessageReceived(long, long)
+ */
+ void registerMessageDelivered(long messageSize);
+
+ /**
+ * Gives access to the {@link StatisticsCounter} that is used to count
+ * delivered message statistics.
+ *
+ * @return the {@link StatisticsCounter} that counts delivered messages
+ */
+ StatisticsCounter getMessageDeliveryStatistics();
+
+ /**
+ * Gives access to the {@link StatisticsCounter} that is used to count
+ * received message statistics.
+ *
+ * @return the {@link StatisticsCounter} that counts received messages
+ */
+ StatisticsCounter getMessageReceiptStatistics();
+
+ /**
+ * Gives access to the {@link StatisticsCounter} that is used to count
+ * delivered message size statistics.
+ *
+ * @return the {@link StatisticsCounter} that counts delivered bytes
+ */
+ StatisticsCounter getDataDeliveryStatistics();
+
+ /**
+ * Gives access to the {@link StatisticsCounter} that is used to count
+ * received message size statistics.
+ *
+ * @return the {@link StatisticsCounter} that counts received bytes
+ */
+ StatisticsCounter getDataReceiptStatistics();
+
+ /**
+ * Reset the counters for this, and any child {@link StatisticsGatherer}s.
+ */
+ void resetStatistics();
+
+ /**
+ * Check if this object has statistics generation enabled.
+ *
+ * @return true if statistics generation is enabled
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Enable or disable statistics generation for this object.
+ */
+ void setStatisticsEnabled(boolean enabled);
+}
Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Tue Mar 15 01:54:07 2011
@@ -97,7 +97,6 @@ public class Subscription_0_10 implement
private FlowCreditManager_0_10 _creditManager;
-
private StateListener _stateListener = new StateListener()
{
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Tue Mar 15 01:54:07 2011
@@ -24,6 +24,9 @@ import static org.apache.qpid.server.log
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -35,12 +38,16 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.Session;
public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
{
@@ -49,11 +56,20 @@ public class ServerConnection extends Co
private AtomicBoolean _logClosed = new AtomicBoolean(false);
private LogActor _actor = GenericActor.getInstance(this);
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
public ServerConnection()
{
}
+ public UUID getId()
+ {
+ return _config.getId();
+ }
+
@Override
protected void invoke(Method method)
{
@@ -110,6 +126,9 @@ public class ServerConnection extends Co
public void setVirtualHost(VirtualHost virtualHost)
{
_virtualHost = virtualHost;
+ _virtualHost.getConnectionRegistry().registerConnection(this);
+
+ initialiseStatistics();
}
public void setConnectionConfig(final ConnectionConfig config)
@@ -145,6 +164,11 @@ public class ServerConnection extends Co
((ServerSession)session).close();
}
+
+ public LogSubject getLogSubject()
+ {
+ return (LogSubject) this;
+ }
@Override
public void received(ProtocolEvent event)
@@ -215,4 +239,100 @@ public class ServerConnection extends Co
{
return _actor;
}
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+ try
+ {
+ replyCode = ConnectionCloseCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore
+ }
+ close(replyCode, message);
+ getVirtualHost().getConnectionRegistry().deregisterConnection(this);
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ for (Session ssn : getChannels())
+ {
+ sessions.add((AMQSessionModel) ssn);
+ }
+ return sessions;
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _virtualHost.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+ _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Tue Mar 15 01:54:07 2011
@@ -20,26 +20,28 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.common.ClientProperties;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import java.util.*;
+import org.apache.qpid.transport.*;
public class ServerConnectionDelegate extends ServerDelegate
{
private String _localFQDN;
private final IApplicationRegistry _appRegistry;
-
public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
{
this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
@@ -138,6 +140,7 @@ public class ServerConnectionDelegate ex
sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
sconn.setState(Connection.State.CLOSING);
}
+
}
@Override
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Tue Mar 15 01:54:07 2011
@@ -20,12 +20,26 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static org.apache.qpid.util.Serial.gt;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.util.Serial.*;
-import com.sun.security.auth.UserPrincipal;
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -38,6 +52,8 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -48,8 +64,6 @@ import org.apache.qpid.server.txn.AutoCo
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageTransfer;
@@ -58,24 +72,15 @@ import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
+import com.sun.security.auth.UserPrincipal;
public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
{
+ private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
+
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private final UUID _id;
@@ -111,6 +116,7 @@ public class ServerSession extends Sessi
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
+ private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private Principal _principal;
@@ -141,7 +147,7 @@ public class ServerSession extends Sessi
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
_principal = new UserPrincipal(connection.getAuthorizationID());
- _reference = new WeakReference(this);
+ _reference = new WeakReference<Session>(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -160,8 +166,8 @@ public class ServerSession extends Sessi
public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
{
-
- _transaction.enqueue(queues,message, new ServerTransaction.Action()
+ getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+ _transaction.enqueue(queues,message, new ServerTransaction.Action()
{
BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -189,6 +195,7 @@ public class ServerSession extends Sessi
});
incrementOutstandingTxnsIfNecessary();
+ updateTransactionalActivity();
}
@@ -196,6 +203,7 @@ public class ServerSession extends Sessi
Runnable postIdSettingAction)
{
invoke(xfr, postIdSettingAction);
+ getConnectionModel().registerMessageDelivered(xfr.getBodySize());
}
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -377,6 +385,7 @@ public class ServerSession extends Sessi
entry.release();
}
});
+ updateTransactionalActivity();
}
public Collection<Subscription_0_10> getSubscriptions()
@@ -425,6 +434,11 @@ public class ServerSession extends Sessi
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
+
+ public boolean inTransaction()
+ {
+ return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
+ }
public void selectTx()
{
@@ -471,6 +485,17 @@ public class ServerSession extends Sessi
}
}
+ /**
+ * Update last transaction activity timestamp
+ */
+ public void updateTransactionalActivity()
+ {
+ if (isTransactional())
+ {
+ _txnUpdateTime.set(System.currentTimeMillis());
+ }
+ }
+
public Long getTxnStarts()
{
return _txnStarts.get();
@@ -606,6 +631,38 @@ public class ServerSession extends Sessi
return (LogSubject) this;
}
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ if (inTransaction())
+ {
+ long currentTime = System.currentTimeMillis();
+ long openTime = currentTime - _transaction.getTransactionStartTime();
+ long idleTime = currentTime - _txnUpdateTime.get();
+
+ // Log a warning on idle or open transactions
+ if (idleWarn > 0L && idleTime > idleWarn)
+ {
+ CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime));
+ _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
+ }
+ else if (openWarn > 0L && openTime > openWarn)
+ {
+ CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
+ _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
+ }
+
+ // Close connection for idle or open transactions that have timed out
+ if (idleClose > 0L && idleTime > idleClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ }
+ else if (openClose > 0L && openTime > openClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ }
+ }
+ }
+
@Override
public String toLogString()
{
@@ -617,7 +674,5 @@ public class ServerSession extends Sessi
getVirtualHost().getName(),
getChannel())
+ "] ";
-
}
-
}
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Tue Mar 15 01:54:07 2011
@@ -209,26 +209,27 @@ public class ServerSessionDelegate exten
}
else
{
-
if(queue.isExclusive())
{
+ ServerSession s = (ServerSession) session;
+ queue.setExclusiveOwningSession(s);
if(queue.getPrincipalHolder() == null)
{
- queue.setPrincipalHolder((ServerSession)session);
+ queue.setPrincipalHolder(s);
+ queue.setExclusiveOwningSession(s);
((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
{
-
public void doTask(ServerSession session)
{
if(queue.getPrincipalHolder() == session)
{
queue.setPrincipalHolder(null);
+ queue.setExclusiveOwningSession(null);
}
}
});
}
-
}
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -369,7 +370,6 @@ public class ServerSessionDelegate exten
}
ssn.processed(xfr);
-
}
@Override
@@ -969,10 +969,10 @@ public class ServerSessionDelegate exten
}
- if(method.hasAutoDelete()
- && method.getAutoDelete()
- && method.hasExclusive()
- && method.getExclusive())
+ if (method.hasAutoDelete()
+ && method.getAutoDelete()
+ && method.hasExclusive()
+ && method.getExclusive())
{
final AMQQueue q = queue;
final ServerSession.Task deleteQueueTask = new ServerSession.Task()
@@ -999,12 +999,12 @@ public class ServerSessionDelegate exten
}
});
}
- else if(method.getExclusive())
+ if (method.hasExclusive()
+ && method.getExclusive())
{
final AMQQueue q = queue;
final ServerSession.Task removeExclusive = new ServerSession.Task()
{
-
public void doTask(ServerSession session)
{
q.setPrincipalHolder(null);
@@ -1012,10 +1012,10 @@ public class ServerSessionDelegate exten
}
};
final ServerSession s = (ServerSession) session;
+ q.setExclusiveOwningSession(s);
s.addSessionCloseTask(removeExclusive);
queue.addQueueDeleteTask(new AMQQueue.Task()
{
-
public void doTask(AMQQueue queue) throws AMQException
{
s.removeSessionCloseTask(removeExclusive);
@@ -1029,7 +1029,7 @@ public class ServerSessionDelegate exten
}
}
}
- else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
+ else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Tue Mar 15 01:54:07 2011
@@ -50,6 +50,11 @@ public class AutoCommitTransaction imple
_transactionLog = transactionLog;
}
+ public long getTransactionStartTime()
+ {
+ return 0L;
+ }
+
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Tue Mar 15 01:54:07 2011
@@ -20,18 +20,23 @@ package org.apache.qpid.server.txn;
*
*/
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -41,17 +46,28 @@ import org.apache.qpid.server.store.Tran
*/
public class LocalTransaction implements ServerTransaction
{
- protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
+ protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile TransactionLog.Transaction _transaction;
private TransactionLog _transactionLog;
+ private long _txnStartTime = 0L;
public LocalTransaction(TransactionLog transactionLog)
{
_transactionLog = transactionLog;
}
+
+ public boolean inTransaction()
+ {
+ return _transaction != null;
+ }
+
+ public long getTransactionStartTime()
+ {
+ return _txnStartTime;
+ }
public void addPostTransactionAction(Action postTransactionAction)
{
@@ -89,7 +105,6 @@ public class LocalTransaction implements
try
{
-
for(QueueEntry entry : queueEntries)
{
ServerMessage message = entry.getMessage();
@@ -113,7 +128,6 @@ public class LocalTransaction implements
_logger.error("Error during message dequeues", e);
tidyUpOnError(e);
}
-
}
private void tidyUpOnError(Exception e)
@@ -140,8 +154,7 @@ public class LocalTransaction implements
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
}
@@ -193,8 +206,25 @@ public class LocalTransaction implements
{
_postTransactionActions.add(postTransactionAction);
+ if (_txnStartTime == 0L)
+ {
+ _txnStartTime = System.currentTimeMillis();
+ }
+
if(message.isPersistent())
{
+ if(_transaction == null)
+ {
+ for(BaseQueue queue : queues)
+ {
+ if(queue.isDurable())
+ {
+ beginTranIfNecessary();
+ break;
+ }
+ }
+ }
+
try
{
for(BaseQueue queue : queues)
@@ -248,17 +278,14 @@ public class LocalTransaction implements
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
-
}
public void rollback()
{
try
{
-
if(_transaction != null)
{
_transaction.abortTran();
@@ -280,9 +307,15 @@ public class LocalTransaction implements
}
finally
{
- _transaction = null;
- _postTransactionActions.clear();
+ resetDetails();
}
}
}
+
+ private void resetDetails()
+ {
+ _transaction = null;
+ _postTransactionActions.clear();
+ _txnStartTime = 0L;
+ }
}
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Tue Mar 15 01:54:07 2011
@@ -52,6 +52,13 @@ public interface ServerTransaction
public void onRollback();
}
+ /**
+ * Return the time the current transaction started.
+ *
+ * @return the time this transaction started or 0 if not in a transaction
+ */
+ long getTransactionStartTime();
+
/**
* Register an Action for execution after transaction commit or rollback. Actions
* will be executed in the order in which they are registered.
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Tue Mar 15 01:54:07 2011
@@ -20,30 +20,28 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.UUID;
+
import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.binding.BindingFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.TimerTask;
-import java.util.concurrent.FutureTask;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
{
IConnectionRegistry getConnectionRegistry();
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Tue Mar 15 01:54:07 2011
@@ -24,19 +24,18 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -63,6 +62,8 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -72,6 +73,7 @@ import org.apache.qpid.server.registry.I
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
@@ -111,6 +113,8 @@ public class VirtualHostImpl implements
private BrokerConfig _broker;
private UUID _id;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _createTime = System.currentTimeMillis();
private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -161,12 +165,12 @@ public class VirtualHostImpl implements
public String getObjectInstanceName()
{
- return _name.toString();
+ return ObjectName.quote(_name);
}
public String getName()
{
- return _name.toString();
+ return _name;
}
public VirtualHostImpl getVirtualHost()
@@ -249,6 +253,8 @@ public class VirtualHostImpl implements
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+
+ initialiseStatistics();
}
private void initialiseHouseKeeping(long period)
@@ -281,19 +287,30 @@ public class VirtualHostImpl implements
// house keeping task from running.
}
}
+ for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ {
+ _logger.debug("Checking for long running open transactions on connection " + connection);
+ for (AMQSessionModel session : connection.getSessionModels())
+ {
+ _logger.debug("Checking for long running open transactions on session " + session);
+ try
+ {
+ session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
+ _configuration.getTransactionTimeoutOpenClose(),
+ _configuration.getTransactionTimeoutIdleWarn(),
+ _configuration.getTransactionTimeoutIdleClose());
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+ }
+ }
+ }
}
}
scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
- class ForceChannelClosuresTask extends TimerTask
- {
- public void run()
- {
- _connectionRegistry.expireClosedChannels();
- }
- }
-
Map<String, VirtualHostPluginFactory> plugins =
ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
@@ -627,6 +644,80 @@ public class VirtualHostImpl implements
{
return _bindingFactory;
}
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _appRegistry.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _appRegistry.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+
+ for (AMQConnectionModel connection : _connectionRegistry.getConnections())
+ {
+ connection.resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
+ _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getName());
+ _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
public void createBrokerConnection(final String transport,
final String host,
Added: qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java?rev=1081634&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java (added)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java Tue Mar 15 01:54:07 2011
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.stats;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for the {@link StatisticsCounter} class.
+ */
+public class StatisticsCounterTest extends TestCase
+{
+ /**
+ * Check that statistics counters are created correctly.
+ */
+ public void testCreate()
+ {
+ long before = System.currentTimeMillis();
+ StatisticsCounter counter = new StatisticsCounter("name", 1234L);
+ long after = System.currentTimeMillis();
+
+ assertTrue(before <= counter.getStart());
+ assertTrue(after >= counter.getStart());
+ assertTrue(counter.getName().startsWith("name-"));
+ assertEquals(1234L, counter.getPeriod());
+ }
+
+ /**
+ * Check that totals add up correctly.
+ */
+ public void testTotal()
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ for (int i = 0; i < 100; i++)
+ {
+ counter.registerEvent(i, start + i);
+ }
+ assertEquals(99 * 50, counter.getTotal()); // cf. Gauss
+ }
+
+ /**
+ * Test totals add up correctly even when messages are delivered
+ * out-of-order.
+ */
+ public void testTotalOutOfOrder()
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0, counter.getTotal());
+ counter.registerEvent(10, start + 2500);
+ assertEquals(10, counter.getTotal());
+ counter.registerEvent(20, start + 1500);
+ assertEquals(30, counter.getTotal());
+ counter.registerEvent(10, start + 500);
+ assertEquals(40, counter.getTotal());
+ }
+
+ /**
+ * Test that the peak rate is reported correctly.
+ */
+ public void testPeak() throws Exception
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0.0, counter.getPeak());
+ Thread.sleep(500);
+ counter.registerEvent(1000, start + 500);
+ Thread.sleep(1000);
+ assertEquals(1000.0, counter.getPeak());
+ counter.registerEvent(2000, start + 1500);
+ Thread.sleep(1000);
+ assertEquals(2000.0, counter.getPeak());
+ counter.registerEvent(1000, start + 2500);
+ Thread.sleep(1000);
+ assertEquals(2000.0, counter.getPeak());
+ }
+
+ /**
+ * Test that peak rate is reported correctly for out-of-order messages,
+ * and the total is also unaffected.
+ */
+ public void testPeakOutOfOrder() throws Exception
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0.0, counter.getPeak());
+ counter.registerEvent(1000, start + 2500);
+ Thread.sleep(1500);
+ assertEquals(0.0, counter.getPeak());
+ counter.registerEvent(2000, start + 1500);
+ Thread.sleep(1000L);
+ assertEquals(0.0, counter.getPeak());
+ counter.registerEvent(1000, start + 500);
+ Thread.sleep(1500);
+ assertEquals(4000.0, counter.getPeak());
+ Thread.sleep(2000);
+ assertEquals(4000.0, counter.getPeak());
+ counter.registerEvent(1000, start + 500);
+ assertEquals(4000.0, counter.getPeak());
+ Thread.sleep(2000);
+ counter.registerEvent(1000);
+ assertEquals(4000.0, counter.getPeak());
+ assertEquals(6000, counter.getTotal());
+ }
+
+ /**
+ * Test the current rate is generated correctly.
+ */
+ public void testRate() throws Exception
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ assertEquals(0.0, counter.getRate());
+ Thread.sleep(500);
+ counter.registerEvent(1000);
+ Thread.sleep(1000);
+ assertEquals(1000.0, counter.getRate());
+ counter.registerEvent(2000);
+ Thread.sleep(1000);
+ assertEquals(2000.0, counter.getRate());
+ counter.registerEvent(1000);
+ Thread.sleep(1000);
+ assertEquals(1000.0, counter.getRate());
+ Thread.sleep(1000);
+ assertEquals(0.0, counter.getRate());
+ }
+}
Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-2920/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java (original)
+++ qpid/branches/qpid-2920/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java Tue Mar 15 01:54:07 2011
@@ -481,7 +481,7 @@ public class GenerateLogMessages
// Only check the text inside the braces '{}'
int typeIndexEnd = parametersString[index].indexOf("}", typeIndex);
String typeString = parametersString[index].substring(typeIndex, typeIndexEnd);
- if (typeString.contains("number"))
+ if (typeString.contains("number") || typeString.contains("choice"))
{
type = "Number";
}
Modified: qpid/branches/qpid-2920/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/build.xml?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/build.xml (original)
+++ qpid/branches/qpid-2920/qpid/java/build.xml Tue Mar 15 01:54:07 2011
@@ -93,6 +93,10 @@
<fail if="failed" message="TEST SUITE FAILED"/>
</target>
+
+ <target name="report-module" description="generate junitreport for modules">
+ <iterate target="report-module"/>
+ </target>
<target name="jar" description="create module jars">
<iterate target="jar"/>
Modified: qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java (original)
+++ qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java Tue Mar 15 01:54:07 2011
@@ -50,7 +50,7 @@ public class ConnectionSetup
final static String QUEUE_NAME = "example.MyQueue";
public static final String TOPIC_JNDI_NAME = "topic";
- final static String TOPIC_NAME = "example.hierarical.topic";
+ final static String TOPIC_NAME = "usa.news";
private Context _ctx;
Modified: qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java?rev=1081634&r1=1081633&r2=1081634&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java (original)
+++ qpid/branches/qpid-2920/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java Tue Mar 15 01:54:07 2011
@@ -71,7 +71,7 @@ public class Publisher extends Client
public static void main(String[] args)
{
- String destination = args.length > 2 ? args[1] : null;
+ String destination = args.length > 2 ? args[1] : "usa.news";
int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org