You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/12/18 16:19:57 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4930 - fix reference count and limit expriy/browse to memory + 10%

Updated Branches:
  refs/heads/trunk c387e842e -> a64976a37


https://issues.apache.org/jira/browse/AMQ-4930 - fix reference count and limit expriy/browse to memory + 10%


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a64976a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a64976a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a64976a3

Branch: refs/heads/trunk
Commit: a64976a3774eeecc2830bdfc3bf70499f9cfccb1
Parents: c387e84
Author: gtully <ga...@gmail.com>
Authored: Tue Dec 17 14:42:19 2013 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Dec 18 12:21:02 2013 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  66 ++++------
 .../apache/activemq/broker/region/Topic.java    |   3 +-
 .../cursors/FilePendingMessageCursor.java       |   4 -
 .../org/apache/activemq/bugs/AMQ4930Test.java   | 132 +++++++++++++++++++
 .../activemq/bugs/TempStoreDataCleanupTest.java |   4 +-
 .../KahaDBFilePendingMessageCursorTest.java     |   1 +
 6 files changed, 163 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 29b65b2..9b32500 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1029,6 +1029,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             messages.stop();
         }
 
+        for (MessageReference messageReference : pagedInMessages.values()) {
+            messageReference.decrementReferenceCount();
+        }
+        pagedInMessages.clear();
+
         systemUsage.getMemoryUsage().removeUsageListener(this);
         if (memoryUsage != null) {
             memoryUsage.stop();
@@ -1145,7 +1150,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     public void doBrowse(List<Message> browseList, int max) {
         final ConnectionContext connectionContext = createConnectionContext();
         try {
-            pageInMessages(true);
+            // allow some page in even if we are full and producers are blocked on pfc
+            pageInMessages(!memoryUsage.isFull(110));
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
 
             pagedInPendingDispatchLock.writeLock().lock();
@@ -1156,6 +1162,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
                     if (broker.isExpired(ref)) {
                         LOG.debug("expiring from pagedInPending: {}", ref);
                         messageExpired(connectionContext, ref);
+                    } else {
+                        ref.decrementReferenceCount();
                     }
                 }
             } finally {
@@ -1179,45 +1187,20 @@ public class Queue extends BaseDestination implements Task, UsageListener {
                     } finally {
                         pagedInMessagesLock.writeLock().unlock();
                     }
+                    ref.decrementReferenceCount();
                 }
             }
 
-            if (browseList.size() < getMaxBrowsePageSize()) {
-                messagesLock.writeLock().lock();
-                try {
-                    try {
-                        messages.reset();
-                        while (messages.hasNext() && browseList.size() < max) {
-                            MessageReference node = messages.next();
-                            if (node.isExpired()) {
-                                if (broker.isExpired(node)) {
-                                    LOG.debug("expiring from messages: {}", node);
-                                    messageExpired(connectionContext, createMessageReference(node.getMessage()));
-                                }
-                                messages.remove();
-                            } else {
-                                messages.rollback(node.getMessageId());
-                                if (browseList.contains(node.getMessage()) == false) {
-                                    browseList.add(node.getMessage());
-                                }
-                            }
-                            node.decrementReferenceCount();
-                        }
-                    } finally {
-                        messages.release();
-                    }
-                } finally {
-                    messagesLock.writeLock().unlock();
-                }
-            }
+            // we need a store iterator to walk messages on disk, independent of the cursor which is tracking
+            // the next message batch
         } catch (Exception e) {
             LOG.error("Problem retrieving message for browse", e);
         }
     }
 
-    private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize,
+    private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,
             List<MessageReference> toExpire) throws Exception {
-        for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
+        for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) {
             QueueMessageReference ref = (QueueMessageReference) i.next();
             if (ref.isExpired()) {
                 toExpire.add(ref);
@@ -1896,27 +1879,30 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         PendingList resultList = null;
 
         int toPageIn = Math.min(getMaxPageSize(), messages.size());
-        LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, enqueueCount: {}, dequeueCount: {}",
+        int pagedInPendingSize = 0;
+        pagedInPendingDispatchLock.readLock().lock();
+        try {
+            pagedInPendingSize = pagedInPendingDispatch.size();
+        } finally {
+            pagedInPendingDispatchLock.readLock().unlock();
+        }
+
+        LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}",
                 new Object[]{
                         destination.getPhysicalName(),
                         toPageIn,
                         destinationStatistics.getInflight().getCount(),
                         pagedInMessages.size(),
+                        pagedInPendingSize,
                         destinationStatistics.getEnqueues().getCount(),
-                        destinationStatistics.getDequeues().getCount()
+                        destinationStatistics.getDequeues().getCount(),
+                        getMemoryUsage().getUsage()
                 });
         if (isLazyDispatch() && !force) {
             // Only page in the minimum number of messages which can be
             // dispatched immediately.
             toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
         }
-        int pagedInPendingSize = 0;
-        pagedInPendingDispatchLock.readLock().lock();
-        try {
-            pagedInPendingSize = pagedInPendingDispatch.size();
-        } finally {
-            pagedInPendingDispatchLock.readLock().unlock();
-        }
         if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
             int count = 0;
             result = new ArrayList<QueueMessageReference>(toPageIn);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 41eef60..ab0f8ce 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -340,7 +340,8 @@ public class Topic extends BaseDestination implements Task {
 
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("{}, Usage Manager memory limit reached for {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", getActiveMQDestination().getQualifiedName());
+                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
+                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
                 }
 
                 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index b1767e3..2769e68 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -411,10 +411,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
         return true;
     }
 
-    protected boolean isSpaceInMemoryList() {
-        return hasSpace() && isDiskListEmpty();
-    }
-
     protected synchronized void expireOldMessages() {
         if (!memoryList.isEmpty()) {
             for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
new file mode 100644
index 0000000..f75eae3
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4930Test extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class);
+    final int messageCount = 150;
+    final int messageSize = 1024*1024;
+    final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG");
+    BrokerService broker;
+    ActiveMQConnectionFactory factory;
+
+    protected void configureBroker() throws Exception {
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setAdvisorySupport(false);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024);
+
+        PolicyMap pMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        // disable expriy processing as this will call browse in parallel
+        policy.setExpireMessagesPeriod(0);
+        policy.setMaxPageSize(50);
+        policy.setMaxBrowsePageSize(50);
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+    }
+
+    public void testBrowsePendingNonPersistent() throws Exception {
+        doTestBrowsePending(DeliveryMode.NON_PERSISTENT);
+    }
+
+    public void testBrowsePendingPersistent() throws Exception {
+        doTestBrowsePending(DeliveryMode.PERSISTENT);
+    }
+
+    public void doTestBrowsePending(int deliveryMode) throws Exception {
+
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(bigQueue);
+        producer.setDeliveryMode(deliveryMode);
+        BytesMessage bytesMessage = session.createBytesMessage();
+        bytesMessage.writeBytes(new byte[messageSize]);
+
+        for (int i = 0; i < messageCount; i++) {
+            producer.send(bigQueue, bytesMessage);
+            LOG.info("Sent: " + i);
+        }
+
+        final QueueViewMBean queueViewMBean = (QueueViewMBean)
+                broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+
+        LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount());
+
+        connection.close();
+
+        assertFalse("Cache disabled on q", queueViewMBean.isCacheEnabled());
+
+        // ensure repeated browse does now blow mem
+
+        final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue);
+
+        // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit
+        underTest.browse();
+        underTest.browse();
+        Runtime.getRuntime().gc();
+        long free = Runtime.getRuntime().freeMemory()/1024;
+        LOG.info("free at start of check: " + free);
+        // check for memory growth
+        for (int i=0; i<10; i++) {
+            LOG.info("free: " + Runtime.getRuntime().freeMemory()/1024);
+            underTest.browse();
+            Runtime.getRuntime().gc();
+            Runtime.getRuntime().gc();
+            assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024, Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.1)));
+        }
+    }
+
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        broker.setBrokerName("thisOne");
+        configureBroker();
+        broker.start();
+        factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
+
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+            broker = null;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
index 36dafaf..34df4a3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
@@ -115,7 +115,7 @@ public class TempStoreDataCleanupTest {
     public void testIt() throws Exception {
 
         int startPercentage = broker.getAdminView().getMemoryPercentUsage();
-        LOG.info("MemoryUseage at test start = " + startPercentage);
+        LOG.info("MemoryUsage at test start = " + startPercentage);
 
         for (int i = 0; i < 2; i++) {
             LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
@@ -146,7 +146,7 @@ public class TempStoreDataCleanupTest {
             TimeUnit.SECONDS.sleep(2);
         }
 
-        LOG.info("MemoryUseage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
+        LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
 
         final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore();
         assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(),

http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
index f0338ba..5a3b318 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
@@ -74,6 +74,7 @@ public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursor
             while(underTest.hasNext()) {
                 MessageReference ref = underTest.next();
                 underTest.remove();
+                ref.decrementReferenceCount();
                 assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId());
             }
             assertEquals("got all messages back", receivedCount, numMessages);