You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/12/18 16:19:57 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-4930 - fix
reference count and limit expiry/browse to memory + 10%
Updated Branches:
refs/heads/trunk c387e842e -> a64976a37
https://issues.apache.org/jira/browse/AMQ-4930 - fix reference count and limit expiry/browse to memory + 10%
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a64976a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a64976a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a64976a3
Branch: refs/heads/trunk
Commit: a64976a3774eeecc2830bdfc3bf70499f9cfccb1
Parents: c387e84
Author: gtully <ga...@gmail.com>
Authored: Tue Dec 17 14:42:19 2013 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Dec 18 12:21:02 2013 +0000
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 66 ++++------
.../apache/activemq/broker/region/Topic.java | 3 +-
.../cursors/FilePendingMessageCursor.java | 4 -
.../org/apache/activemq/bugs/AMQ4930Test.java | 132 +++++++++++++++++++
.../activemq/bugs/TempStoreDataCleanupTest.java | 4 +-
.../KahaDBFilePendingMessageCursorTest.java | 1 +
6 files changed, 163 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 29b65b2..9b32500 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1029,6 +1029,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
messages.stop();
}
+ for (MessageReference messageReference : pagedInMessages.values()) {
+ messageReference.decrementReferenceCount();
+ }
+ pagedInMessages.clear();
+
systemUsage.getMemoryUsage().removeUsageListener(this);
if (memoryUsage != null) {
memoryUsage.stop();
@@ -1145,7 +1150,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public void doBrowse(List<Message> browseList, int max) {
final ConnectionContext connectionContext = createConnectionContext();
try {
- pageInMessages(true);
+ // allow some page in even if we are full and producers are blocked on pfc
+ pageInMessages(!memoryUsage.isFull(110));
List<MessageReference> toExpire = new ArrayList<MessageReference>();
pagedInPendingDispatchLock.writeLock().lock();
@@ -1156,6 +1162,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInPending: {}", ref);
messageExpired(connectionContext, ref);
+ } else {
+ ref.decrementReferenceCount();
}
}
} finally {
@@ -1179,45 +1187,20 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally {
pagedInMessagesLock.writeLock().unlock();
}
+ ref.decrementReferenceCount();
}
}
- if (browseList.size() < getMaxBrowsePageSize()) {
- messagesLock.writeLock().lock();
- try {
- try {
- messages.reset();
- while (messages.hasNext() && browseList.size() < max) {
- MessageReference node = messages.next();
- if (node.isExpired()) {
- if (broker.isExpired(node)) {
- LOG.debug("expiring from messages: {}", node);
- messageExpired(connectionContext, createMessageReference(node.getMessage()));
- }
- messages.remove();
- } else {
- messages.rollback(node.getMessageId());
- if (browseList.contains(node.getMessage()) == false) {
- browseList.add(node.getMessage());
- }
- }
- node.decrementReferenceCount();
- }
- } finally {
- messages.release();
- }
- } finally {
- messagesLock.writeLock().unlock();
- }
- }
+ // we need a store iterator to walk messages on disk, independent of the cursor which is tracking
+ // the next message batch
} catch (Exception e) {
LOG.error("Problem retrieving message for browse", e);
}
}
- private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize,
+ private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,
List<MessageReference> toExpire) throws Exception {
- for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
+ for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) {
QueueMessageReference ref = (QueueMessageReference) i.next();
if (ref.isExpired()) {
toExpire.add(ref);
@@ -1896,27 +1879,30 @@ public class Queue extends BaseDestination implements Task, UsageListener {
PendingList resultList = null;
int toPageIn = Math.min(getMaxPageSize(), messages.size());
- LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, enqueueCount: {}, dequeueCount: {}",
+ int pagedInPendingSize = 0;
+ pagedInPendingDispatchLock.readLock().lock();
+ try {
+ pagedInPendingSize = pagedInPendingDispatch.size();
+ } finally {
+ pagedInPendingDispatchLock.readLock().unlock();
+ }
+
+ LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}",
new Object[]{
destination.getPhysicalName(),
toPageIn,
destinationStatistics.getInflight().getCount(),
pagedInMessages.size(),
+ pagedInPendingSize,
destinationStatistics.getEnqueues().getCount(),
- destinationStatistics.getDequeues().getCount()
+ destinationStatistics.getDequeues().getCount(),
+ getMemoryUsage().getUsage()
});
if (isLazyDispatch() && !force) {
// Only page in the minimum number of messages which can be
// dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
- int pagedInPendingSize = 0;
- pagedInPendingDispatchLock.readLock().lock();
- try {
- pagedInPendingSize = pagedInPendingDispatch.size();
- } finally {
- pagedInPendingDispatchLock.readLock().unlock();
- }
if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
int count = 0;
result = new ArrayList<QueueMessageReference>(toPageIn);
http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 41eef60..ab0f8ce 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -340,7 +340,8 @@ public class Topic extends BaseDestination implements Task {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
- LOG.info("{}, Usage Manager memory limit reached for {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", getActiveMQDestination().getQualifiedName());
+ LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
+ getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
}
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index b1767e3..2769e68 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -411,10 +411,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return true;
}
- protected boolean isSpaceInMemoryList() {
- return hasSpace() && isDiskListEmpty();
- }
-
protected synchronized void expireOldMessages() {
if (!memoryList.isEmpty()) {
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
new file mode 100644
index 0000000..f75eae3
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4930Test extends TestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class);
+ final int messageCount = 150;
+ final int messageSize = 1024*1024;
+ final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG");
+ BrokerService broker;
+ ActiveMQConnectionFactory factory;
+
+ protected void configureBroker() throws Exception {
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setAdvisorySupport(false);
+ broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024);
+
+ PolicyMap pMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ // disable expiry processing as this will call browse in parallel
+ policy.setExpireMessagesPeriod(0);
+ policy.setMaxPageSize(50);
+ policy.setMaxBrowsePageSize(50);
+ pMap.setDefaultEntry(policy);
+
+ broker.setDestinationPolicy(pMap);
+ }
+
+ public void testBrowsePendingNonPersistent() throws Exception {
+ doTestBrowsePending(DeliveryMode.NON_PERSISTENT);
+ }
+
+ public void testBrowsePendingPersistent() throws Exception {
+ doTestBrowsePending(DeliveryMode.PERSISTENT);
+ }
+
+ public void doTestBrowsePending(int deliveryMode) throws Exception {
+
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(bigQueue);
+ producer.setDeliveryMode(deliveryMode);
+ BytesMessage bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(new byte[messageSize]);
+
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(bigQueue, bytesMessage);
+ LOG.info("Sent: " + i);
+ }
+
+ final QueueViewMBean queueViewMBean = (QueueViewMBean)
+ broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+
+ LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount());
+
+ connection.close();
+
+ assertFalse("Cache disabled on q", queueViewMBean.isCacheEnabled());
+
+ // ensure repeated browse does not blow mem
+
+ final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue);
+
+ // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit
+ underTest.browse();
+ underTest.browse();
+ Runtime.getRuntime().gc();
+ long free = Runtime.getRuntime().freeMemory()/1024;
+ LOG.info("free at start of check: " + free);
+ // check for memory growth
+ for (int i=0; i<10; i++) {
+ LOG.info("free: " + Runtime.getRuntime().freeMemory()/1024);
+ underTest.browse();
+ Runtime.getRuntime().gc();
+ Runtime.getRuntime().gc();
+ assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024, Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.1)));
+ }
+ }
+
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ broker = new BrokerService();
+ broker.setBrokerName("thisOne");
+ configureBroker();
+ broker.start();
+ factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+ factory.setWatchTopicAdvisories(false);
+
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (broker != null) {
+ broker.stop();
+ broker = null;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
index 36dafaf..34df4a3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
@@ -115,7 +115,7 @@ public class TempStoreDataCleanupTest {
public void testIt() throws Exception {
int startPercentage = broker.getAdminView().getMemoryPercentUsage();
- LOG.info("MemoryUseage at test start = " + startPercentage);
+ LOG.info("MemoryUsage at test start = " + startPercentage);
for (int i = 0; i < 2; i++) {
LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
@@ -146,7 +146,7 @@ public class TempStoreDataCleanupTest {
TimeUnit.SECONDS.sleep(2);
}
- LOG.info("MemoryUseage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
+ LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore();
assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(),
http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
index f0338ba..5a3b318 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
@@ -74,6 +74,7 @@ public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursor
while(underTest.hasNext()) {
MessageReference ref = underTest.next();
underTest.remove();
+ ref.decrementReferenceCount();
assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId());
}
assertEquals("got all messages back", receivedCount, numMessages);