You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/08/06 16:21:43 UTC
[3/3] git commit: https://issues.apache.org/jira/browse/AMQ-4930 -
ensure we page in messages for browse/expire when destination stats are
disabled via config
https://issues.apache.org/jira/browse/AMQ-4930 - ensure we page in messages for browse/expire when destination stats are disabled via config
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/41659725
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/41659725
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/41659725
Branch: refs/heads/trunk
Commit: 41659725f4c4fa027386148077aa76c31d8853af
Parents: 6bdce73
Author: gtully <ga...@gmail.com>
Authored: Tue Aug 5 16:32:44 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Aug 6 15:21:19 2014 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 12 ++++++++--
.../org/apache/activemq/bugs/AMQ4930Test.java | 24 +++++++++++++++-----
2 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/41659725/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 647ba68..3cdd91c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1232,9 +1232,17 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally {
pagedInMessagesLock.readLock().unlock();
}
- LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, destinationStatistics.getMessages().getCount(), memoryUsage.getPercentUsage()});
+ int messagesInQueue = 0;
+ messagesLock.readLock().lock();
+ try {
+ messagesInQueue = messages.size();
+ } finally {
+ messagesLock.readLock().unlock();
+ }
+
+ LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
return (alreadyPagedIn < max)
- && (alreadyPagedIn < destinationStatistics.getMessages().getCount())
+ && (alreadyPagedIn < messagesInQueue)
&& messages.hasSpace();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/41659725/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
index f75eae3..e6bea2a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -30,6 +30,7 @@ import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@ public class AMQ4930Test extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class);
final int messageCount = 150;
final int messageSize = 1024*1024;
+ final int maxBrowsePageSize = 50;
final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG");
BrokerService broker;
ActiveMQConnectionFactory factory;
@@ -50,8 +52,8 @@ public class AMQ4930Test extends TestCase {
PolicyEntry policy = new PolicyEntry();
// disable expiry processing as this will call browse in parallel
policy.setExpireMessagesPeriod(0);
- policy.setMaxPageSize(50);
- policy.setMaxBrowsePageSize(50);
+ policy.setMaxPageSize(maxBrowsePageSize);
+ policy.setMaxBrowsePageSize(maxBrowsePageSize);
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
@@ -65,6 +67,11 @@ public class AMQ4930Test extends TestCase {
doTestBrowsePending(DeliveryMode.PERSISTENT);
}
+ public void testWithStatsDisabled() throws Exception {
+ ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().setEnabled(false);
+ doTestBrowsePending(DeliveryMode.PERSISTENT);
+ }
+
public void doTestBrowsePending(int deliveryMode) throws Exception {
Connection connection = factory.createConnection();
@@ -77,7 +84,6 @@ public class AMQ4930Test extends TestCase {
for (int i = 0; i < messageCount; i++) {
producer.send(bigQueue, bytesMessage);
- LOG.info("Sent: " + i);
}
final QueueViewMBean queueViewMBean = (QueueViewMBean)
@@ -94,15 +100,21 @@ public class AMQ4930Test extends TestCase {
final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue);
// do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit
- underTest.browse();
- underTest.browse();
+ Message[] browsed = underTest.browse();
+ LOG.info("Browsed: " + browsed.length);
+ assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
+ browsed = underTest.browse();
+ LOG.info("Browsed: " + browsed.length);
+ assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
Runtime.getRuntime().gc();
long free = Runtime.getRuntime().freeMemory()/1024;
LOG.info("free at start of check: " + free);
// check for memory growth
for (int i=0; i<10; i++) {
LOG.info("free: " + Runtime.getRuntime().freeMemory()/1024);
- underTest.browse();
+ browsed = underTest.browse();
+ LOG.info("Browsed: " + browsed.length);
+ assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
Runtime.getRuntime().gc();
Runtime.getRuntime().gc();
assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024, Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.1)));