You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/21 11:31:25 UTC

svn commit: r613830 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/policy/ command/

Author: rajdavies
Date: Mon Jan 21 02:31:22 2008
New Revision: 613830

URL: http://svn.apache.org/viewvc?rev=613830&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1510

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Mon Jan 21 02:31:22 2008
@@ -104,6 +104,32 @@
     public void setMemoryLimit(long limit) {
         brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
     }
+    
+    public long getStoreLimit() {
+        return brokerService.getSystemUsage().getStoreUsage().getLimit();
+    }
+
+    public int getStorePercentageUsed() {
+        return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
+    }
+
+ 
+    public long getTmpLimit() {
+       return brokerService.getSystemUsage().getTempUsage().getLimit();
+    }
+
+    public int getTmpPercentageUsed() {
+       return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
+    }
+
+    public void setStoreLimit(long limit) {
+        brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
+    }
+
+    public void setTmpLimit(long limit) {
+        brokerService.getSystemUsage().getTempUsage().setLimit(limit);
+    }
+    
 
     public void resetStatistics() {
         broker.getDestinationStatistics().reset();
@@ -289,5 +315,4 @@
             throw e.getTargetException();
         }
     }
-    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Mon Jan 21 02:31:22 2008
@@ -20,6 +20,7 @@
 
 import org.apache.activemq.Service;
 
+
 /**
  * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (for the reloadLog4jProperties method)
  * @version $Revision$
@@ -65,6 +66,18 @@
     long getMemoryLimit();
 
     void setMemoryLimit(long limit);
+        
+    int getStorePercentageUsed();
+
+    long getStoreLimit();
+
+    void setStoreLimit(long limit);
+    
+    int getTmpPercentageUsed();
+
+    long getTmpLimit();
+
+    void setTmpLimit(long limit);
     
     boolean isPersistent();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Jan 21 02:31:22 2008
@@ -94,15 +94,15 @@
     }
 
     public int getMemoryPercentageUsed() {
-        return destination.getBrokerMemoryUsage().getPercentUsage();
+        return destination.getMemoryUsage().getPercentUsage();
     }
 
     public long getMemoryLimit() {
-        return destination.getBrokerMemoryUsage().getLimit();
+        return destination.getMemoryUsage().getLimit();
     }
 
     public void setMemoryLimit(long limit) {
-        destination.getBrokerMemoryUsage().setLimit(limit);
+        destination.getMemoryUsage().setLimit(limit);
     }
 
     public double getAverageEnqueueTime() {
@@ -267,4 +267,51 @@
         }
 
     }
+
+    public int getMaxAuditDepth() {
+        return destination.getMaxAuditDepth();
+     }
+
+     public int getMaxProducersToAudit() {
+         return destination.getMaxProducersToAudit();
+     }
+
+     public boolean isEnableAudit() {
+         return destination.isEnableAudit();
+     }
+
+     
+     public void setEnableAudit(boolean enableAudit) {
+         destination.setEnableAudit(enableAudit);
+     }
+
+     public void setMaxAuditDepth(int maxAuditDepth) {
+         destination.setMaxAuditDepth(maxAuditDepth);
+     }
+ 
+     public void setMaxProducersToAudit(int maxProducersToAudit) {
+         destination.setMaxProducersToAudit(maxProducersToAudit);
+     }
+
+    
+    public float getMemoryLimitPortion() {
+        return destination.getMemoryUsage().getUsagePortion();
+    }
+
+    public long getProducerCount() {
+        return destination.getDestinationStatistics().getProducers().getCount();
+    }
+
+    public boolean isProducerFlowControl() {
+       return destination.isProducerFlowControl();
+    }
+    
+    public void setMemoryLimitPortion(float value) {
+        destination.getMemoryUsage().setUsagePortion(value);
+    }
+
+    public void setProducerFlowControl(boolean producerFlowControl) {
+      destination.setProducerFlowControl(producerFlowControl);      
+    }
+  
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Mon Jan 21 02:31:22 2008
@@ -67,6 +67,11 @@
      * @return The number of consumers subscribed this destination.
      */
     long getConsumerCount();
+    
+    /**
+     * @return the number of producers publishing to the destination
+     */
+    long getProducerCount();
 
     /**
      * Returns the number of messages in this destination which are yet to be
@@ -119,11 +124,32 @@
      */
     String sendTextMessage(Map headers, String body) throws Exception;
 
+    /**
+     * @return the percentage of amount of memory used
+     */
     int getMemoryPercentageUsed();
 
+    /**
+     * @return the amount of memory allocated to this destination
+     */
     long getMemoryLimit();
 
+    /**
+     * set the amount of memory allocated to this destination
+     * @param limit
+     */
     void setMemoryLimit(long limit);
+    
+    /**
+     * @return the portion of memory from the broker memory limit for this destination
+     */
+    float getMemoryLimitPortion();
+    
+    /**
+     * set the portion of memory from the broker memory limit for this destination
+     * @param value
+     */
+    void setMemoryLimitPortion(float value);
 
     /**
      * Browses the current destination returning a list of messages
@@ -150,5 +176,34 @@
      * @return average time a message is held by a destination
      */
     double getAverageEnqueueTime();
+    
+    /**
+     * @return the producerFlowControl
+     */
+    boolean isProducerFlowControl();
+    /**
+     * @param producerFlowControl the producerFlowControl to set
+     */
+    public void setProducerFlowControl(boolean producerFlowControl);
+    
+    /**
+     * @return the maxProducersToAudit
+     */
+    public int getMaxProducersToAudit();
+    
+    /**
+     * @param maxProducersToAudit the maxProducersToAudit to set
+     */
+    public void setMaxProducersToAudit(int maxProducersToAudit);
+    
+    /**
+     * @return the maxAuditDepth
+     */
+    public int getMaxAuditDepth();
+    
+    /**
+     * @param maxAuditDepth the maxAuditDepth to set
+     */
+    public void setMaxAuditDepth(int maxAuditDepth);
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Jan 21 02:31:22 2008
@@ -64,6 +64,12 @@
         this.destinationStatistics.setEnabled(parentStats.isEnabled());
         this.destinationStatistics.setParent(parentStats);        
     }
+    
+    /**
+     * initialize the destination
+     * @throws Exception
+     */
+    public abstract void initialize() throws Exception;
     /**
      * @return the producerFlowControl
      */
@@ -121,7 +127,7 @@
         destinationStatistics.getProducers().decrement();
     }
     
-    public final MemoryUsage getBrokerMemoryUsage() {
+    public final MemoryUsage getMemoryUsage() {
         return memoryUsage;
     }
 
@@ -143,6 +149,11 @@
     
     public final MessageStore getMessageStore() {
         return store;
+    }
+    
+    public final boolean isActive() {
+        return destinationStatistics.getConsumers().getCount() != 0 ||
+            destinationStatistics.getProducers().getCount() != 0;
     }
 
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Jan 21 02:31:22 2008
@@ -53,7 +53,7 @@
 
     ActiveMQDestination getActiveMQDestination();
 
-    MemoryUsage getBrokerMemoryUsage();
+    MemoryUsage getMemoryUsage();
 
     void dispose(ConnectionContext context) throws IOException;
 
@@ -70,4 +70,20 @@
     boolean isProducerFlowControl();
     
     void setProducerFlowControl(boolean value);
+    
+    int getMaxProducersToAudit();
+    
+    void setMaxProducersToAudit(int maxProducersToAudit);
+   
+    int getMaxAuditDepth();
+   
+    void setMaxAuditDepth(int maxAuditDepth);
+  
+    boolean isEnableAudit();
+    
+    void setEnableAudit(boolean enableAudit);
+    
+    boolean isActive();
+    
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Mon Jan 21 02:31:22 2008
@@ -115,6 +115,7 @@
             }
             Topic topic = new Topic(broker.getRoot(), destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
             configureTopic(topic, destination);
+            topic.initialize();
             return topic;
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Jan 21 02:31:22 2008
@@ -81,8 +81,8 @@
         return next.getName();
     }
 
-    public MemoryUsage getBrokerMemoryUsage() {
-        return next.getBrokerMemoryUsage();
+    public MemoryUsage getMemoryUsage() {
+        return next.getMemoryUsage();
     }
 
     public boolean lock(MessageReference node, LockOwner lockOwner) {
@@ -141,6 +141,34 @@
             throws Exception {
        next.removeProducer(context, info);
     }
+
+    public int getMaxAuditDepth() {
+       return next.getMaxAuditDepth();
+    }
+
+    public int getMaxProducersToAudit() {
+        return next.getMaxProducersToAudit();
+    }
+
+    public boolean isEnableAudit() {
+        return next.isEnableAudit();
+    }
+
     
+    public void setEnableAudit(boolean enableAudit) {
+        next.setEnableAudit(enableAudit);
+    }
+
+    public void setMaxAuditDepth(int maxAuditDepth) {
+       next.setMaxAuditDepth(maxAuditDepth);
+    }
+
     
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+       next.setMaxProducersToAudit(maxProducersToAudit);
+    }
+    
+    public boolean isActive() {
+        return next.isActive();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Jan 21 02:31:22 2008
@@ -17,11 +17,12 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+
 import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@@ -31,9 +32,10 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,13 +49,23 @@
     private final boolean keepDurableSubsActive;
     private boolean active;
 
-    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
-        throws InvalidSelectorException {
+    public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
+        throws JMSException {
         super(broker,usageManager, context, info);
         this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
         this.pending.setSystemUsage(usageManager);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+        if (dest != null && dest.getMessageStore() != null) {
+            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
+            try {
+                this.enqueueCounter=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
+            } catch (IOException e) {
+                JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+        }
     }
 
     public boolean isActive() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Jan 21 02:31:22 2008
@@ -150,6 +150,9 @@
                         return true;
                     }
                 });
+            }else {
+                int messageCount = store.getMessageCount();
+                destinationStatistics.getMessages().setCount(messageCount);
             }
         }
     }
@@ -320,7 +323,8 @@
         final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
         if (message.isExpired()) {
             broker.messageExpired(context, message);
-            destinationStatistics.getMessages().decrement();
+            //message not added to stats yet
+            //destinationStatistics.getMessages().decrement();
             if (sendProducerAck) {
                 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                 context.getConnection().dispatchAsync(ack);
@@ -346,7 +350,8 @@
                                 // message may have expired.
                                 if (broker.isExpired(message)) {
                                     broker.messageExpired(context, message);
-                                    destinationStatistics.getMessages().decrement();
+                                    //message not added to stats yet
+                                    //destinationStatistics.getMessages().decrement();
                                 } else {
                                     doMessageSend(producerExchange, message);
                                 }
@@ -436,7 +441,8 @@
                         // op, by that time the message could have expired..
                         if (broker.isExpired(message)) {
                             broker.messageExpired(context, message);
-                            destinationStatistics.getMessages().decrement();
+                            //message not added to stats yet
+                            //destinationStatistics.getMessages().decrement();
                             return;
                         }
                         sendMessage(context, message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Jan 21 02:31:22 2008
@@ -56,8 +56,6 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jan 21 02:31:22 2008
@@ -99,6 +99,13 @@
         } 
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
     }
+    
+    public void initialize() throws Exception{
+        if (store != null) {
+            int messageCount = store.getMessageCount();
+            destinationStatistics.getMessages().setCount(messageCount);
+        }
+    }
 
     public boolean lock(MessageReference node, LockOwner sub) {
         return true;
@@ -288,7 +295,8 @@
                                 // message may have expired.
                                 if (broker.isExpired(message)) {
                                     broker.messageExpired(context, message);
-                                    destinationStatistics.getMessages().decrement();
+                                    //destinationStatistics.getEnqueues().increment();
+                                    //destinationStatistics.getMessages().decrement();
                                 } else {
                                     doMessageSend(producerExchange, message);
                                 }
@@ -394,7 +402,8 @@
                     if (broker.isExpired(message)) {
                         broker.messageExpired(context, message);
                         message.decrementReferenceCount();
-                        destinationStatistics.getMessages().decrement();
+                        //destinationStatistics.getEnqueues().increment();
+                        //destinationStatistics.getMessages().decrement();
                         return;
                     }
                     try {
@@ -543,6 +552,7 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     protected void dispatch(final ConnectionContext context, Message message) throws Exception {
+        destinationStatistics.getMessages().increment();
         destinationStatistics.getEnqueues().increment();
         dispatchValve.increment();
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Jan 21 02:31:22 2008
@@ -229,9 +229,17 @@
             }
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
             DurableTopicSubscription sub = durableSubscriptions.get(key);
+            ActiveMQDestination destination = info.getDestination();
             if (sub == null) {
-                sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
-                ActiveMQDestination destination = info.getDestination();
+                Destination dest=null;
+                try {
+                    dest = lookup(context, destination);
+                } catch (Exception e) {
+                    JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
+                    jmsEx.setLinkedException(e);
+                    throw jmsEx;
+                }
+                sub = new DurableTopicSubscription(broker,dest, usageManager, context, info, keepDurableSubsActive);
                 if (destination != null && broker.getDestinationPolicy() != null) {
                     PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                     if (entry != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Jan 21 02:31:22 2008
@@ -67,7 +67,7 @@
         }
         queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
         if (memoryLimit > 0) {
-            queue.getBrokerMemoryUsage().setLimit(memoryLimit);
+            queue.getMemoryUsage().setLimit(memoryLimit);
         }
         if (pendingQueuePolicy != null) {
             PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
@@ -91,7 +91,7 @@
         }
         topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
         if (memoryLimit > 0) {
-            topic.getBrokerMemoryUsage().setLimit(memoryLimit);
+            topic.getMemoryUsage().setLimit(memoryLimit);
         }
         topic.setProducerFlowControl(isProducerFlowControl());
         topic.setEnableAudit(isEnableAudit());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon Jan 21 02:31:22 2008
@@ -571,7 +571,7 @@
     public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
         this.regionDestination = destination;
         if(this.memoryUsage==null) {
-            this.memoryUsage=regionDestination.getBrokerMemoryUsage();
+            this.memoryUsage=regionDestination.getMemoryUsage();
         }
     }
     



Re: svn commit: r613830 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/policy/ command/

Posted by Rob Davies <ra...@gmail.com>.
Thanks Glen - will apply in my next commit!

cheers,

Rob

On Jan 22, 2008, at 2:51 AM, Glen Mazza wrote:

>
> Am Montag, den 21.01.2008, 10:31 +0000 schrieb rajdavies@apache.org:
>> Author: rajdavies
>> Date: Mon Jan 21 02:31:22 2008
>> New Revision: 613830
>>
>> URL: http://svn.apache.org/viewvc?rev=613830&view=rev
>> Log:
>> Fix for https://issues.apache.org/activemq/browse/AMQ-1510
>>
>> Modified:
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/BrokerView.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/BrokerViewMBean.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/DestinationView.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/DestinationViewMBean.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/BaseDestination.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Destination.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/DestinationFactoryImpl.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/DestinationFilter.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/DurableTopicSubscription.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Queue.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/RegionBroker.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Topic.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/TopicRegion.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/policy/PolicyEntry.java
>>    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> command/Message.java
>>
>> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ 
>> activemq/broker/jmx/BrokerView.java
>> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=613830&r1=613829&r2=613830&view=diff
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> =====================================================================
>> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/BrokerView.java (original)
>> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/BrokerView.java Mon Jan 21 02:31:22 2008
>> @@ -104,6 +104,32 @@
>
> Something seems wrong with this class.  Looking at the four methods
> here:
>
>    public ObjectName[] getQueueSubscribers() {
>        return broker.getQueueSubscribers();
>    }
>
>    public ObjectName[] getTemporaryQueueSubscribers() {
>        return broker.getTemporaryQueueSubscribers();
>    }
>
>    public ObjectName[] getTopicSubscribers() {
>        return broker.getTemporaryTopicSubscribers();
>    }
>
>    public ObjectName[] getTemporaryTopicSubscribers() {
>        return broker.getTemporaryTopicSubscribers();
>    }
>
> I think the third needs to be:
> "return broker.getTopicSubscribers()"
>
>
>
>> +
>> +    public int getStorePercentageUsed() {
>> +        return  
>> brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
>> +    }
>> +
>> +    public int getTmpPercentageUsed() {
>> +       return  
>> brokerService.getSystemUsage().getTempUsage().getPercentUsage();
>> +    }
>>
>
> There is a switch between "Usage"/"Used" naming here.  I can't  
> recommend
> a preference, but if those brokerServer.getXXXXUsage() methods are
> externally available you may wish to standardize on one or the other.
>
>
>> -
>> }
>>
>> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ 
>> activemq/broker/jmx/BrokerViewMBean.java
>> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=613830&r1=613829&r2=613830&view=diff
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> =====================================================================
>> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/BrokerViewMBean.java (original)
>> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/BrokerViewMBean.java Mon Jan 21 02:31:22 2008
>> @@ -20,6 +20,7 @@
>>
>> import org.apache.activemq.Service;
>>
>> +
>> /**
>>  * @author David Martin Clavo  
>> david(dot)martin(dot)clavo(at)gmail.com (for the  
>> reloadLog4jProperties method)
>>  * @version $Revision$
>> @@ -65,6 +66,18 @@
>>     long getMemoryLimit();
>>
>>     void setMemoryLimit(long limit);
>> +
>> +    int getStorePercentageUsed();
>> +
>> +    long getStoreLimit();
>> +
>> +    void setStoreLimit(long limit);
>> +
>> +    int getTmpPercentageUsed();
>> +
>> +    long getTmpLimit();
>> +
>> +    void setTmpLimit(long limit);
>>
>
> Normally, "Temp" is the abbreviation--are you sure you wish to go with
> "Tmp" here?  No big deal, just wish to confirm.
>
>
>
>>     boolean isPersistent();
>>
>>
>> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ 
>> activemq/broker/jmx/DestinationView.java
>> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=613830&r1=613829&r2=613830&view=diff
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> =====================================================================
>> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/DestinationView.java (original)
>> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/jmx/DestinationView.java Mon Jan 21 02:31:22 2008
>> @@ -94,15 +94,15 @@
>>     }
>>
>>     public int getMemoryPercentageUsed() {
>> -        return destination.getBrokerMemoryUsage().getPercentUsage();
>> +        return destination.getMemoryUsage().getPercentUsage();
>>     }
>>
>
> Same usage/used comment as above.
>
>
>>     public long getMemoryLimit() {
>> -        return destination.getBrokerMemoryUsage().getLimit();
>> +        return destination.getMemoryUsage().getLimit();
>>     }
>>
>>     public void setMemoryLimit(long limit) {
>> -        destination.getBrokerMemoryUsage().setLimit(limit);
>> +        destination.getMemoryUsage().setLimit(limit);
>>     }
>>
>
> There were two other methods added to this class that might cause
> confusion with the above two methods:
>
>> +    public float getMemoryLimitPortion() {
>> +        return destination.getMemoryUsage().getUsagePortion();
>> +    }
>> +
>> +
>> +    public void setMemoryLimitPortion(float value) {
>> +        destination.getMemoryUsage().setUsagePortion(value);
>> +    }
>> +
>>
>
> Might you have meant "getMemoryUsagePortion()"?
>
>
>
>> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/ 
>> activemq/broker/region/DurableTopicSubscription.java
>> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=613830&r1=613829&r2=613830&view=diff
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> =====================================================================
>> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/DurableTopicSubscription.java (original)
>> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/DurableTopicSubscription.java Mon Jan 21 02:31:22 2008
>> @@ -17,11 +17,12 @@
>
> DurableTopicSubscription exposes both the SubscriptionKey and methods
> that access the two main values of SubscriptionKey:
>
>    public SubscriptionKey getSubscriptionKey() {
>        return subscriptionKey;
>    }
>
>    public String getClientId() {
>        return subscriptionKey.getClientId();
>    }
>
>    public String getSubscriptionName() {
>        return subscriptionKey.getSubscriptionName();
>    }
>
> Searching is showing that these methods are very seldom called, I  
> would
> guess the latter two can be removed, as that information is already
> available from the first method.  (Alternatively, if it is desired to
> encapsulate the key, to just remove the first method.)
>
> Regards,
> Glen
>


Re: svn commit: r613830 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/policy/ command/

Posted by Glen Mazza <gl...@verizon.net>.
Am Montag, den 21.01.2008, 10:31 +0000 schrieb rajdavies@apache.org:
> Author: rajdavies
> Date: Mon Jan 21 02:31:22 2008
> New Revision: 613830
> 
> URL: http://svn.apache.org/viewvc?rev=613830&view=rev
> Log:
> Fix for https://issues.apache.org/activemq/browse/AMQ-1510
> 
> Modified:
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
>     activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
> 
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=613830&r1=613829&r2=613830&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Mon Jan 21 02:31:22 2008
> @@ -104,6 +104,32 @@

Something seems wrong with this class.  Looking at the four methods
here:

    public ObjectName[] getQueueSubscribers() {
        return broker.getQueueSubscribers();
    }

    public ObjectName[] getTemporaryQueueSubscribers() {
        return broker.getTemporaryQueueSubscribers();
    }

    public ObjectName[] getTopicSubscribers() {
        return broker.getTemporaryTopicSubscribers();
    }

    public ObjectName[] getTemporaryTopicSubscribers() {
        return broker.getTemporaryTopicSubscribers();
    }

I think the third needs to be:
"return broker.getTopicSubscribers()"



> +
> +    public int getStorePercentageUsed() {
> +        return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
> +    }
> +
> +    public int getTmpPercentageUsed() {
> +       return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
> +    }
> 

There is a switch between "Usage"/"Used" naming here.  I can't recommend
a preference, but if those brokerServer.getXXXXUsage() methods are
externally available you may wish to standardize on one or the other.


> -    
>  }
> 
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=613830&r1=613829&r2=613830&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Mon Jan 21 02:31:22 2008
> @@ -20,6 +20,7 @@
>  
>  import org.apache.activemq.Service;
>  
> +
>  /**
>   * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (for the reloadLog4jProperties method)
>   * @version $Revision$
> @@ -65,6 +66,18 @@
>      long getMemoryLimit();
>  
>      void setMemoryLimit(long limit);
> +        
> +    int getStorePercentageUsed();
> +
> +    long getStoreLimit();
> +
> +    void setStoreLimit(long limit);
> +    
> +    int getTmpPercentageUsed();
> +
> +    long getTmpLimit();
> +
> +    void setTmpLimit(long limit);
>      

Normally, "Temp" is the abbreviation--are you sure you wish to go with
"Tmp" here?  No big deal, just wish to confirm.



>      boolean isPersistent();
>  
> 
> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=613830&r1=613829&r2=613830&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Jan 21 02:31:22 2008
> @@ -94,15 +94,15 @@
>      }
>  
>      public int getMemoryPercentageUsed() {
> -        return destination.getBrokerMemoryUsage().getPercentUsage();
> +        return destination.getMemoryUsage().getPercentUsage();
>      }
>  

Same usage/used comment as above.


>      public long getMemoryLimit() {
> -        return destination.getBrokerMemoryUsage().getLimit();
> +        return destination.getMemoryUsage().getLimit();
>      }
>  
>      public void setMemoryLimit(long limit) {
> -        destination.getBrokerMemoryUsage().setLimit(limit);
> +        destination.getMemoryUsage().setLimit(limit);
>      }
>  

There were two other methods added to this class that might cause
confusion with the above two methods:

> +    public float getMemoryLimitPortion() {
> +        return destination.getMemoryUsage().getUsagePortion();
> +    }
> +
> +    
> +    public void setMemoryLimitPortion(float value) {
> +        destination.getMemoryUsage().setUsagePortion(value);
> +    }
> +
> 

Might you have meant "getMemoryUsagePortion()"?



> Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=613830&r1=613829&r2=613830&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Jan 21 02:31:22 2008
> @@ -17,11 +17,12 @@

DurableTopicSubscription exposes both the SubscriptionKey and methods
that access the two main values of SubscriptionKey:

    public SubscriptionKey getSubscriptionKey() {
        return subscriptionKey;
    }

    public String getClientId() {
        return subscriptionKey.getClientId();
    }

    public String getSubscriptionName() {
        return subscriptionKey.getSubscriptionName();
    }

Searching is showing that these methods are very seldom called, I would
guess the latter two can be removed, as that information is already
available from the first method.  (Alternatively, if it is desired to
encapsulate the key, to just remove the first method.)

Regards,
Glen


Re: svn commit: r613830 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/policy/ command/

Posted by Rob Davies <ra...@gmail.com>.
On Jan 22, 2008, at 1:38 AM, Glen Mazza wrote:

> Am Montag, den 21.01.2008, 10:31 +0000 schrieb rajdavies@apache.org:
>> Author: rajdavies
>> Date: Mon Jan 21 02:31:22 2008
>> New Revision: 613830
>>
>> URL: http://svn.apache.org/viewvc?rev=613830&view=rev
>> Log:
>> Fix for https://issues.apache.org/activemq/browse/AMQ-1510
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Queue.java
>> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=613830&r1=613829&r2=613830&view=diff
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> = 
>> =====================================================================
>> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Queue.java (original)
>> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ 
>> broker/region/Queue.java Mon Jan 21 02:31:22 2008
>> @@ -150,6 +150,9 @@
>
> Studying Queue.java, I think I see a minor problem with its
> implementation--the business logic under
> "if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {" on line 680 is
> equivalent to that of "protected void  
> removeMessage(ConnectionContext c,
> IndirectMessageReference r)" on line 922.  I think the code in the
> former can be simply replaced with a call to the latter.
>
> Glen
>
>

Thanks Glen - will change in my next commit




Rob Davies
'Go further faster with Apache Camel!'
http://rajdavies.blogspot.com/




Re: svn commit: r613830 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/policy/ command/

Posted by Glen Mazza <gl...@verizon.net>.
Am Montag, den 21.01.2008, 10:31 +0000 schrieb rajdavies@apache.org:
> Author: rajdavies
> Date: Mon Jan 21 02:31:22 2008
> New Revision: 613830
> 
> URL: http://svn.apache.org/viewvc?rev=613830&view=rev
> Log:
> Fix for https://issues.apache.org/activemq/browse/AMQ-1510
> 
> Modified:
> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=613830&r1=613829&r2=613830&view=diff
> ==============================================================================
> --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
> +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Jan 21 02:31:22 2008
> @@ -150,6 +150,9 @@

Studying Queue.java, I think I see a minor problem with its
implementation--the business logic under 
"if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {" on line 680 is
equivalent to that of "protected void removeMessage(ConnectionContext c,
IndirectMessageReference r)" on line 922.  I think the code in the
former can be simply replaced with a call to the latter.

Glen