You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/30 20:45:50 UTC

svn commit: r561088 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/filter/ test/java/org/apache/activemq/broker/

Author: rajdavies
Date: Mon Jul 30 11:45:49 2007
New Revision: 561088

URL: http://svn.apache.org/viewvc?view=rev&rev=561088
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-567

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Mon Jul 30 11:45:49 2007
@@ -270,25 +270,27 @@
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 
-    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
-        advisoryMessage.setDataStructure(command);
-        advisoryMessage.setPersistent(false);
-        advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
-        advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
-        advisoryMessage.setTargetConsumerId(targetConsumerId);
-
-        advisoryMessage.setDestination(topic);
-        advisoryMessage.setResponseRequired(false);
-        advisoryMessage.setProducerId(advisoryProducerId);
-        boolean originalFlowControl = context.isProducerFlowControl();
-        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
-        producerExchange.setConnectionContext(context);
-        producerExchange.setMutable(true);
-        try {
-            context.setProducerFlowControl(false);
-            next.send(producerExchange, advisoryMessage);
-        } finally {
-            context.setProducerFlowControl(originalFlowControl);
+    protected void fireAdvisory(ConnectionContext context,ActiveMQTopic topic,Command command,
+            ConsumerId targetConsumerId,ActiveMQMessage advisoryMessage) throws Exception{
+        if(getBrokerService().isStarted()){
+            advisoryMessage.setDataStructure(command);
+            advisoryMessage.setPersistent(false);
+            advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+            advisoryMessage.setMessageId(new MessageId(advisoryProducerId,messageIdGenerator.getNextSequenceId()));
+            advisoryMessage.setTargetConsumerId(targetConsumerId);
+            advisoryMessage.setDestination(topic);
+            advisoryMessage.setResponseRequired(false);
+            advisoryMessage.setProducerId(advisoryProducerId);
+            boolean originalFlowControl=context.isProducerFlowControl();
+            final ProducerBrokerExchange producerExchange=new ProducerBrokerExchange();
+            producerExchange.setConnectionContext(context);
+            producerExchange.setMutable(true);
+            try{
+                context.setProducerFlowControl(false);
+                next.send(producerExchange,advisoryMessage);
+            }finally{
+                context.setProducerFlowControl(originalFlowControl);
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Jul 30 11:45:49 2007
@@ -295,35 +295,35 @@
 
     public Response service(Command command){
         Response response=null;
-        boolean responseRequired=command.isResponseRequired();
-        int commandId=command.getCommandId();
-        try{
-            response=command.visit(this);
-        }catch(Throwable e){
+        if(broker.getBrokerService().isStarted()){
+            boolean responseRequired=command.isResponseRequired();
+            int commandId=command.getCommandId();
+            try{
+                response=command.visit(this);
+            }catch(Throwable e){
+                if(responseRequired){
+                    if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class)
+                        serviceLog.debug("Error occured while processing sync command: "+e,e);
+                    response=new ExceptionResponse(e);
+                }else{
+                    serviceException(e);
+                }
+            }
             if(responseRequired){
-                if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class)
-                    serviceLog.debug("Error occured while processing sync command: "+e,e);
-                response=new ExceptionResponse(e);
-            }else{
-                serviceException(e);
+                if(response==null){
+                    response=new Response();
+                }
+                response.setCorrelationId(commandId);
             }
-        }
-        if(responseRequired){
-            if(response==null){
-                response=new Response();
+            // The context may have been flagged so that the response is not sent.
+            if(context!=null){
+                if(context.isDontSendReponse()){
+                    context.setDontSendReponse(false);
+                    response=null;
+                }
+                context=null;
             }
-            response.setCorrelationId(commandId);
-        }
-        
-        // The context may have been flagged so that the response is not sent.
-        if( context!=null ) {
-        	if( context.isDontSendReponse() ) {
-        		context.setDontSendReponse(false);
-        		response=null;
-        	}
-            context=null;
         }
-        
         return response;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Jul 30 11:45:49 2007
@@ -107,6 +107,19 @@
     public void setMemoryLimit(long limit) {
        destination.getUsageManager().setLimit(limit);
     }
+    
+    public double getAverageEnqueueTime(){
+        return destination.getDestinationStatistics().getProcessTime().getAverageTime();
+    }
+
+    public long getMaxEnqueueTime(){
+        return destination.getDestinationStatistics().getProcessTime().getMaxTime();
+    }
+
+    public long getMinEnqueueTime(){
+        return destination.getDestinationStatistics().getProcessTime().getMinTime();
+    }
+
 
     public CompositeData[] browse() throws OpenDataException{
         try {
@@ -260,5 +273,4 @@
     	}
     	
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Mon Jul 30 11:45:49 2007
@@ -125,5 +125,21 @@
      * Browses the current destination with the given selector returning a list of messages
      */
     public List browseMessages(String selector) throws InvalidSelectorException;
+    
+       
+    /**
+     * @return longest time a message is held by a destination
+     */
+    public long getMaxEnqueueTime();
+    
+    /**
+     * @return shortest time a message is held by a destination
+     */
+    public long getMinEnqueueTime();
+    
+    /**
+     * @return average time a message is held by a destination
+     */
+    public double getAverageEnqueueTime();
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java Mon Jul 30 11:45:49 2007
@@ -22,6 +22,7 @@
 import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.management.PollCountStatisticImpl;
 import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.management.TimeStatisticImpl;
 
 /**
  * The J2EE Statistics for the a Destination.
@@ -36,6 +37,7 @@
     protected CountStatisticImpl messages;
     protected PollCountStatisticImpl messagesCached;
     protected CountStatisticImpl dispatched;
+    protected TimeStatisticImpl processTime;
 
     public DestinationStatistics() {
 
@@ -45,13 +47,14 @@
         consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
         messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
         messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
-
+        processTime = new TimeStatisticImpl("processTime","information around length of time messages are held by a destination");
         addStatistic("enqueues", enqueues);
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
         addStatistic("consumers", consumers);
         addStatistic("messages", messages);
         addStatistic("messagesCached", messagesCached);
+        addStatistic("processTime",processTime);
     }
 
     public CountStatisticImpl getEnqueues() {
@@ -73,6 +76,18 @@
     public CountStatisticImpl getMessages() {
         return messages;
     }
+    
+    public void setMessagesCached(PollCountStatisticImpl messagesCached) {
+        this.messagesCached = messagesCached;
+    }
+
+    public CountStatisticImpl getDispatched() {
+    return dispatched;
+    }
+
+    public TimeStatisticImpl getProcessTime(){
+        return this.processTime;
+    }
 
     public void reset() {
         super.reset();
@@ -89,6 +104,7 @@
         consumers.setEnabled(enabled);
         messages.setEnabled(enabled);
         messagesCached.setEnabled(enabled);
+        processTime.setEnabled(enabled);
 
     }    
 
@@ -100,6 +116,7 @@
             consumers.setParent(parent.consumers);
             messagesCached.setParent(parent.messagesCached);
             messages.setParent(parent.messages);
+            processTime.setParent(parent.processTime);
         }
         else {
             enqueues.setParent(null);
@@ -108,14 +125,9 @@
             consumers.setParent(null);
             messagesCached.setParent(null);
             messages.setParent(null);
+            processTime.setParent(null);
         }
     }
 
-    public void setMessagesCached(PollCountStatisticImpl messagesCached) {
-        this.messagesCached = messagesCached;
-    }
-
-    public CountStatisticImpl getDispatched() {
-	return dispatched;
-    }
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Jul 30 11:45:49 2007
@@ -539,10 +539,15 @@
         return result;
     }
     
-    public void preProcessDispatch(MessageDispatch messageDispatch){ 
-        Message message = messageDispatch.getMessage();
-        if(message != null) {
-            message.setBrokerOutTime(System.currentTimeMillis());
+    public void preProcessDispatch(MessageDispatch messageDispatch){
+        Message message=messageDispatch.getMessage();
+        if(message!=null){
+            long endTime=System.currentTimeMillis();
+            message.setBrokerOutTime(endTime);
+            if(getBrokerService().isEnableStatistics()){
+                long totalTime = endTime - message.getBrokerInTime();
+                message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
+            }
         }
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java Mon Jul 30 11:45:49 2007
@@ -137,6 +137,16 @@
                 return new Integer(txId.toString());
             }
         });
+        JMS_PROPERTY_EXPRESSIONS.put("JMS_ActiveMQBrokerInTime", new SubExpression() {
+            public Object evaluate(Message message) {
+                return Long.valueOf(message.getBrokerInTime());
+            }
+        });
+        JMS_PROPERTY_EXPRESSIONS.put("JMS_ActiveMQBrokerOutTime", new SubExpression() {
+            public Object evaluate(Message message) {
+                return Long.valueOf(message.getBrokerOutTime());
+            }
+        });
     }
     
     private final String name;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=561088&r1=561087&r2=561088
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Mon Jul 30 11:45:49 2007
@@ -153,8 +153,11 @@
     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         return 0;
     }
+    
+    public void preProcessDispatch(MessageDispatch messageDispatch) {
+    }
 
-    public void processDispatch(MessageDispatch messageDispatch) {
+    public void postProcessDispatch(MessageDispatch messageDispatch) {
     }
 
     public void removeBroker(Connection connection, BrokerInfo info) {
@@ -244,6 +247,10 @@
     
     public BrokerService getBrokerService(){
         return null;
+    }
+    
+    public boolean isExpired(MessageReference messageReference) {
+        return false;
     }
 
     public void messageExpired(ConnectionContext context,MessageReference messageReference){