You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/30 20:45:50 UTC
svn commit: r561088 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/ main/java/org/apache/active...
Author: rajdavies
Date: Mon Jul 30 11:45:49 2007
New Revision: 561088
URL: http://svn.apache.org/viewvc?view=rev&rev=561088
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-567
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Mon Jul 30 11:45:49 2007
@@ -270,25 +270,27 @@
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
- protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
- advisoryMessage.setDataStructure(command);
- advisoryMessage.setPersistent(false);
- advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
- advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
- advisoryMessage.setTargetConsumerId(targetConsumerId);
-
- advisoryMessage.setDestination(topic);
- advisoryMessage.setResponseRequired(false);
- advisoryMessage.setProducerId(advisoryProducerId);
- boolean originalFlowControl = context.isProducerFlowControl();
- final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
- producerExchange.setConnectionContext(context);
- producerExchange.setMutable(true);
- try {
- context.setProducerFlowControl(false);
- next.send(producerExchange, advisoryMessage);
- } finally {
- context.setProducerFlowControl(originalFlowControl);
+ protected void fireAdvisory(ConnectionContext context,ActiveMQTopic topic,Command command,
+ ConsumerId targetConsumerId,ActiveMQMessage advisoryMessage) throws Exception{
+ if(getBrokerService().isStarted()){
+ advisoryMessage.setDataStructure(command);
+ advisoryMessage.setPersistent(false);
+ advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+ advisoryMessage.setMessageId(new MessageId(advisoryProducerId,messageIdGenerator.getNextSequenceId()));
+ advisoryMessage.setTargetConsumerId(targetConsumerId);
+ advisoryMessage.setDestination(topic);
+ advisoryMessage.setResponseRequired(false);
+ advisoryMessage.setProducerId(advisoryProducerId);
+ boolean originalFlowControl=context.isProducerFlowControl();
+ final ProducerBrokerExchange producerExchange=new ProducerBrokerExchange();
+ producerExchange.setConnectionContext(context);
+ producerExchange.setMutable(true);
+ try{
+ context.setProducerFlowControl(false);
+ next.send(producerExchange,advisoryMessage);
+ }finally{
+ context.setProducerFlowControl(originalFlowControl);
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Jul 30 11:45:49 2007
@@ -295,35 +295,35 @@
public Response service(Command command){
Response response=null;
- boolean responseRequired=command.isResponseRequired();
- int commandId=command.getCommandId();
- try{
- response=command.visit(this);
- }catch(Throwable e){
+ if(broker.getBrokerService().isStarted()){
+ boolean responseRequired=command.isResponseRequired();
+ int commandId=command.getCommandId();
+ try{
+ response=command.visit(this);
+ }catch(Throwable e){
+ if(responseRequired){
+ if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class)
+ serviceLog.debug("Error occured while processing sync command: "+e,e);
+ response=new ExceptionResponse(e);
+ }else{
+ serviceException(e);
+ }
+ }
if(responseRequired){
- if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class)
- serviceLog.debug("Error occured while processing sync command: "+e,e);
- response=new ExceptionResponse(e);
- }else{
- serviceException(e);
+ if(response==null){
+ response=new Response();
+ }
+ response.setCorrelationId(commandId);
}
- }
- if(responseRequired){
- if(response==null){
- response=new Response();
+ // The context may have been flagged so that the response is not sent.
+ if(context!=null){
+ if(context.isDontSendReponse()){
+ context.setDontSendReponse(false);
+ response=null;
+ }
+ context=null;
}
- response.setCorrelationId(commandId);
- }
-
- // The context may have been flagged so that the response is not sent.
- if( context!=null ) {
- if( context.isDontSendReponse() ) {
- context.setDontSendReponse(false);
- response=null;
- }
- context=null;
}
-
return response;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Jul 30 11:45:49 2007
@@ -107,6 +107,19 @@
public void setMemoryLimit(long limit) {
destination.getUsageManager().setLimit(limit);
}
+
+ public double getAverageEnqueueTime(){
+ return destination.getDestinationStatistics().getProcessTime().getAverageTime();
+ }
+
+ public long getMaxEnqueueTime(){
+ return destination.getDestinationStatistics().getProcessTime().getMaxTime();
+ }
+
+ public long getMinEnqueueTime(){
+ return destination.getDestinationStatistics().getProcessTime().getMinTime();
+ }
+
public CompositeData[] browse() throws OpenDataException{
try {
@@ -260,5 +273,4 @@
}
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Mon Jul 30 11:45:49 2007
@@ -125,5 +125,21 @@
* Browses the current destination with the given selector returning a list of messages
*/
public List browseMessages(String selector) throws InvalidSelectorException;
+
+
+ /**
+ * @return longest time a message is held by a destination
+ */
+ public long getMaxEnqueueTime();
+
+ /**
+ * @return shortest time a message is held by a destination
+ */
+ public long getMinEnqueueTime();
+
+ /**
+ * @return average time a message is held by a destination
+ */
+ public double getAverageEnqueueTime();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java Mon Jul 30 11:45:49 2007
@@ -22,6 +22,7 @@
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.management.TimeStatisticImpl;
/**
* The J2EE Statistics for the a Destination.
@@ -36,6 +37,7 @@
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
+ protected TimeStatisticImpl processTime;
public DestinationStatistics() {
@@ -45,13 +47,14 @@
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
-
+ processTime = new TimeStatisticImpl("processTime","information around length of time messages are held by a destination");
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
addStatistic("consumers", consumers);
addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached);
+ addStatistic("processTime",processTime);
}
public CountStatisticImpl getEnqueues() {
@@ -73,6 +76,18 @@
public CountStatisticImpl getMessages() {
return messages;
}
+
+ public void setMessagesCached(PollCountStatisticImpl messagesCached) {
+ this.messagesCached = messagesCached;
+ }
+
+ public CountStatisticImpl getDispatched() {
+ return dispatched;
+ }
+
+ public TimeStatisticImpl getProcessTime(){
+ return this.processTime;
+ }
public void reset() {
super.reset();
@@ -89,6 +104,7 @@
consumers.setEnabled(enabled);
messages.setEnabled(enabled);
messagesCached.setEnabled(enabled);
+ processTime.setEnabled(enabled);
}
@@ -100,6 +116,7 @@
consumers.setParent(parent.consumers);
messagesCached.setParent(parent.messagesCached);
messages.setParent(parent.messages);
+ processTime.setParent(parent.processTime);
}
else {
enqueues.setParent(null);
@@ -108,14 +125,9 @@
consumers.setParent(null);
messagesCached.setParent(null);
messages.setParent(null);
+ processTime.setParent(null);
}
}
- public void setMessagesCached(PollCountStatisticImpl messagesCached) {
- this.messagesCached = messagesCached;
- }
-
- public CountStatisticImpl getDispatched() {
- return dispatched;
- }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Jul 30 11:45:49 2007
@@ -539,10 +539,15 @@
return result;
}
- public void preProcessDispatch(MessageDispatch messageDispatch){
- Message message = messageDispatch.getMessage();
- if(message != null) {
- message.setBrokerOutTime(System.currentTimeMillis());
+ public void preProcessDispatch(MessageDispatch messageDispatch){
+ Message message=messageDispatch.getMessage();
+ if(message!=null){
+ long endTime=System.currentTimeMillis();
+ message.setBrokerOutTime(endTime);
+ if(getBrokerService().isEnableStatistics()){
+ long totalTime = endTime - message.getBrokerInTime();
+ message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java Mon Jul 30 11:45:49 2007
@@ -137,6 +137,16 @@
return new Integer(txId.toString());
}
});
+ JMS_PROPERTY_EXPRESSIONS.put("JMS_ActiveMQBrokerInTime", new SubExpression() {
+ public Object evaluate(Message message) {
+ return Long.valueOf(message.getBrokerInTime());
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMS_ActiveMQBrokerOutTime", new SubExpression() {
+ public Object evaluate(Message message) {
+ return Long.valueOf(message.getBrokerOutTime());
+ }
+ });
}
private final String name;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Mon Jul 30 11:45:49 2007
@@ -153,8 +153,11 @@
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
return 0;
}
+
+ public void preProcessDispatch(MessageDispatch messageDispatch) {
+ }
- public void processDispatch(MessageDispatch messageDispatch) {
+ public void postProcessDispatch(MessageDispatch messageDispatch) {
}
public void removeBroker(Connection connection, BrokerInfo info) {
@@ -244,6 +247,10 @@
public BrokerService getBrokerService(){
return null;
+ }
+
+ public boolean isExpired(MessageReference messageReference) {
+ return false;
}
public void messageExpired(ConnectionContext context,MessageReference messageReference){