You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/11/19 11:34:10 UTC
svn commit: r882096 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/util/ test/java/org/apache/activemq/broker/jmx/
Author: gtully
Date: Thu Nov 19 10:33:41 2009
New Revision: 882096
URL: http://svn.apache.org/viewvc?rev=882096&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2487 - usage management of the store cursor iterator was broken in that a browse would decrement the usage. Memory management across move and retry operations is now correct. Modified some tests to validate memory usage.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Nov 19 10:33:41 2009
@@ -723,11 +723,7 @@
.getDestination());
if (context.getBroker()==null) {
context.setBroker(getRoot());
- }
-
- // Clear out the memory usage for the old queue.
- // We'll reset it to the DLQ below:
- message.setMemoryUsage(null);
+ }
BrokerSupport.resendNoCopy(context,message,
deadLetterDestination);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Thu Nov 19 10:33:41 2009
@@ -43,6 +43,7 @@
protected ActiveMQMessageAudit audit;
protected boolean useCache=true;
private boolean started=false;
+ protected MessageReference last = null;
public synchronized void start() throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Nov 19 10:33:41 2009
@@ -113,6 +113,7 @@
private synchronized void clearIterator(boolean ensureIterator) {
boolean haveIterator = this.iterator != null;
this.iterator=null;
+ last = null;
if(haveIterator&&ensureIterator) {
ensureIterator();
}
@@ -142,11 +143,11 @@
}
public final synchronized MessageReference next() {
- Message result = null;
+ MessageReference result = null;
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
result = this.iterator.next().getValue();
- result.decrementReferenceCount();
}
+ last = result;
return result;
}
@@ -182,6 +183,9 @@
if (iterator!=null) {
iterator.remove();
}
+ if (last != null) {
+ last.decrementReferenceCount();
+ }
if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Nov 19 10:33:41 2009
@@ -58,8 +58,6 @@
private boolean iterating;
private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean();
- private MessageReference last = null;
-
/**
* @param name
* @param store
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Thu Nov 19 10:33:41 2009
@@ -35,8 +35,6 @@
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
private Iterator<MessageReference> iter;
- private MessageReference last;
-
public VMPendingMessageCursor(){
this.useCache=false;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Thu Nov 19 10:33:41 2009
@@ -52,6 +52,7 @@
message.setOriginalTransactionId(message.getTransactionId());
message.setDestination(deadLetterDestination);
message.setTransactionId(null);
+ message.setMemoryUsage(null);
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Thu Nov 19 10:33:41 2009
@@ -40,6 +40,8 @@
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
@@ -120,6 +122,7 @@
messageIDs[i] = messageID;
}
+ assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
echo("About to move " + messageCount + " messages");
@@ -138,11 +141,15 @@
echo("Now browsing the second queue");
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
- queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- long newQueuesize = queue.getQueueSize();
+ long newQueuesize = queueNew.getQueueSize();
echo("Second queue size: " + newQueuesize);
assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
+
+ // check memory usage migration
+ assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
+ assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testRetryMessages() throws Exception {
@@ -164,7 +171,7 @@
long initialQueueSize = queue.getQueueSize();
echo("current queue size: " + initialQueueSize);
-
+ assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
// lets create a duff consumer which keeps rolling back...
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -203,6 +210,10 @@
messageIDs[i] = messageID;
}
+ int dlqMemUsage = dlq.getMemoryPercentUsage();
+ assertTrue("dlq has some memory usage", dlqMemUsage > 0);
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+
echo("About to retry " + messageCount + " messages");
@@ -223,6 +234,10 @@
assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
assertEquals("queue size", initialQueueSize, queueSize);
assertEquals("browse queue size", initialQueueSize, actualCount);
+
+ assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
+ assertEquals("dlq still has memory usage", dlqMemUsage, dlq.getMemoryPercentUsage());
+
}
public void testMoveMessagesBySelector() throws Exception {
@@ -246,6 +261,7 @@
queue.removeMatchingMessages("counter > 2");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testCopyMessagesBySelector() throws Exception {
@@ -272,6 +288,7 @@
queue.removeMatchingMessages("counter > 2");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
@@ -528,7 +545,14 @@
answer.setPersistent(false);
answer.setDeleteAllMessagesOnStartup(true);
answer.setUseJmx(true);
- //answer.setEnableStatistics(true);
+
+ // apply memory limit so that %usage is visible
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setMemoryLimit(1024*1024*4);
+ policyMap.setDefaultEntry(defaultEntry);
+ answer.setDestinationPolicy(policyMap);
+
answer.addConnector(bindAddress);
return answer;
}