You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/11/19 11:34:10 UTC

svn commit: r882096 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/broker/jmx/

Author: gtully
Date: Thu Nov 19 10:33:41 2009
New Revision: 882096

URL: http://svn.apache.org/viewvc?rev=882096&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2487 - usage management of the store cursor iterator was broken in that a browse would decrement the usage. Memory management across move and retry operations is now correct. Modified some tests to validate memory usage.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Nov 19 10:33:41 2009
@@ -723,11 +723,7 @@
 							                .getDestination());
 							if (context.getBroker()==null) {
 								context.setBroker(getRoot());
-							}                        
-							                           
-							// Clear out the memory usage for the old queue. 
-							// We'll reset it to the DLQ below:
-							message.setMemoryUsage(null);
+							}
 							BrokerSupport.resendNoCopy(context,message,
 							        deadLetterDestination);
 						}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Thu Nov 19 10:33:41 2009
@@ -43,6 +43,7 @@
     protected ActiveMQMessageAudit audit;
     protected boolean useCache=true;
     private boolean started=false;
+    protected MessageReference last = null;
   
 
     public synchronized void start() throws Exception  {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Nov 19 10:33:41 2009
@@ -113,6 +113,7 @@
     private synchronized void clearIterator(boolean ensureIterator) {
         boolean haveIterator = this.iterator != null;
         this.iterator=null;
+        last = null;
         if(haveIterator&&ensureIterator) {
             ensureIterator();
         }
@@ -142,11 +143,11 @@
     }
     
     public final synchronized MessageReference next() {
-        Message result = null;
+        MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
             result = this.iterator.next().getValue();
-            result.decrementReferenceCount();
         }
+        last = result;
         return result;
     }
     
@@ -182,6 +183,9 @@
         if (iterator!=null) {
             iterator.remove();
         }
+        if (last != null) {
+            last.decrementReferenceCount();
+        }
         if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Nov 19 10:33:41 2009
@@ -58,8 +58,6 @@
     private boolean iterating;
     private boolean flushRequired;
     private AtomicBoolean started = new AtomicBoolean();
-    private MessageReference last = null;
-
     /**
      * @param name
      * @param store

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Thu Nov 19 10:33:41 2009
@@ -35,8 +35,6 @@
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
     private Iterator<MessageReference> iter;
-    private MessageReference last;
-    
     public VMPendingMessageCursor(){
         this.useCache=false;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Thu Nov 19 10:33:41 2009
@@ -52,6 +52,7 @@
         message.setOriginalTransactionId(message.getTransactionId());
         message.setDestination(deadLetterDestination);
         message.setTransactionId(null);
+        message.setMemoryUsage(null);
         boolean originalFlowControl = context.isProducerFlowControl();
         try {
             context.setProducerFlowControl(false);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Thu Nov 19 10:33:41 2009
@@ -40,6 +40,8 @@
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.commons.logging.Log;
@@ -120,6 +122,7 @@
             messageIDs[i] = messageID;
         }
 
+        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
 
         echo("About to move " + messageCount + " messages");
 
@@ -138,11 +141,15 @@
         echo("Now browsing the second queue");
 
         queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
 
-        long newQueuesize = queue.getQueueSize();
+        long newQueuesize = queueNew.getQueueSize();
         echo("Second queue size: " + newQueuesize);
         assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
+        
+        // check memory usage migration
+        assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
+        assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
     }
 
     public void testRetryMessages() throws Exception {
@@ -164,7 +171,7 @@
 
         long initialQueueSize = queue.getQueueSize();
         echo("current queue size: " + initialQueueSize);
-
+        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
 
         // lets create a duff consumer which keeps rolling back...
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -203,6 +210,10 @@
             messageIDs[i] = messageID;
         }
 
+        int dlqMemUsage = dlq.getMemoryPercentUsage();
+        assertTrue("dlq has some memory usage", dlqMemUsage > 0);
+        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+        
 
         echo("About to retry " + messageCount + " messages");
 
@@ -223,6 +234,10 @@
         assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
         assertEquals("queue size", initialQueueSize, queueSize);
         assertEquals("browse queue size", initialQueueSize, actualCount);
+        
+        assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
+        assertEquals("dlq still has memory usage", dlqMemUsage, dlq.getMemoryPercentUsage());
+        
     }
 
     public void testMoveMessagesBySelector() throws Exception {
@@ -246,6 +261,7 @@
         queue.removeMatchingMessages("counter > 2");
 
         assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
     }
 
     public void testCopyMessagesBySelector() throws Exception {
@@ -272,6 +288,7 @@
         queue.removeMatchingMessages("counter > 2");
 
         assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
     }
 
 
@@ -528,7 +545,14 @@
         answer.setPersistent(false);
         answer.setDeleteAllMessagesOnStartup(true);
         answer.setUseJmx(true);
-        //answer.setEnableStatistics(true);
+       
+        // apply memory limit so that %usage is visible
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setMemoryLimit(1024*1024*4);
+        policyMap.setDefaultEntry(defaultEntry);
+        answer.setDestinationPolicy(policyMap);
+        
         answer.addConnector(bindAddress);
         return answer;
     }