You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/08/08 20:00:08 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748

Repository: activemq
Updated Branches:
  refs/heads/master a3c8bee1f -> a49d46e3c


https://issues.apache.org/jira/browse/AMQ-5748

Updating MemoryTopicMessageStore to decrement store statistics on cache
eviction.  Updating KahaDBMessageStoreSizeStatTest to account for the
fact that a LRU cache is used so the last 100 messages are kept in
memroy.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a49d46e3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a49d46e3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a49d46e3

Branch: refs/heads/master
Commit: a49d46e3ca689af6f2cb721c457be97d654b2492
Parents: a3c8bee
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Sat Aug 8 17:55:41 2015 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Sat Aug 8 17:59:19 2015 +0000

----------------------------------------------------------------------
 .../store/memory/MemoryMessageStore.java        |  16 +-
 .../store/memory/MemoryTopicMessageStore.java   |  56 ++++++-
 .../store/AbstractMessageStoreSizeStatTest.java | 150 ++++++++++++++-----
 .../kahadb/KahaDBMessageStoreSizeStatTest.java  |   4 +-
 .../MultiKahaDBMessageStoreSizeStatTest.java    |  12 +-
 .../memory/MemoryMessageStoreSizeStatTest.java  |  57 ++++++-
 6 files changed, 242 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 51006c2..3989646 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -57,8 +57,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
     public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
         synchronized (messageTable) {
             messageTable.put(message.getMessageId(), message);
-            getMessageStoreStatistics().getMessageCount().increment();
-            getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
+            incMessageStoreStatistics(message);
         }
         message.incrementReferenceCount();
         message.getMessageId().setFutureOrSequenceLong(sequenceId++);
@@ -94,8 +93,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
             Message removed = messageTable.remove(msgId);
             if( removed !=null ) {
                 removed.decrementReferenceCount();
-                getMessageStoreStatistics().getMessageCount().decrement();
-                getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
+                decMessageStoreStatistics(removed);
             }
             if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
                 lastBatchId = null;
@@ -200,4 +198,14 @@ public class MemoryMessageStore extends AbstractMessageStore {
         }
     }
 
+    protected final void incMessageStoreStatistics(Message message) {
+        getMessageStoreStatistics().getMessageCount().increment();
+        getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
+    }
+
+    protected final void decMessageStoreStatistics(Message message) {
+        getMessageStoreStatistics().getMessageCount().decrement();
+        getMessageStoreStatistics().getMessageSize().addSize(-message.getSize());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 0debfe6..142547f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -29,36 +30,47 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStoreStatistics;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.SubscriptionKey;
 
 /**
- * 
+ *
  */
 public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
 
     private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
     private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
+    private final Map<MessageId, Message> originalMessageTable;
 
     public MemoryTopicMessageStore(ActiveMQDestination destination) {
-        this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
+        this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
+
+        //Set the messageStoreStatistics after the super class is initialized so that the stats can be
+        //properly updated on cache eviction
+        MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
+        cache.setMessageStoreStatistics(messageStoreStatistics);
     }
 
     public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
         super(destination, messageTable);
         this.subscriberDatabase = subscriberDatabase;
         this.topicSubMap = makeSubMap();
+        //this is only necessary so that messageStoreStatistics can be set if necessary
+        //We need the original reference since messageTable is wrapped in a synchronized map in the parent class
+        this.originalMessageTable = messageTable;
     }
 
     protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
         return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
     }
-    
+
     protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
         return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
     }
 
+    @Override
     public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
         super.addMessage(context, message);
         for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
@@ -67,6 +79,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
         }
     }
 
+    @Override
     public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                          MessageId messageId, MessageAck ack) throws IOException {
         SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
@@ -76,10 +89,12 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
         }
     }
 
+    @Override
     public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
     }
 
+    @Override
     public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
         SubscriptionKey key = new SubscriptionKey(info);
         MemoryTopicSub sub = new MemoryTopicSub();
@@ -93,12 +108,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
         subscriberDatabase.put(key, info);
     }
 
+    @Override
     public synchronized void deleteSubscription(String clientId, String subscriptionName) {
         org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
         subscriberDatabase.remove(key);
         topicSubMap.remove(key);
     }
 
+    @Override
     public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
         MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {
@@ -106,16 +123,19 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
         }
     }
 
+    @Override
     public synchronized void delete() {
         super.delete();
         subscriberDatabase.clear();
         topicSubMap.clear();
     }
 
+    @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
     }
 
+    @Override
     public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
         int result = 0;
         MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
@@ -125,6 +145,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
         return result;
     }
 
+    @Override
     public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
         MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {
@@ -132,10 +153,39 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
         }
     }
 
+    @Override
     public void resetBatching(String clientId, String subscriptionName) {
         MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {
             sub.resetBatching();
         }
     }
+
+    /**
+     * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions
+     * when computing the message store statistics.
+     *
+     */
+    private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> {
+        private static final long serialVersionUID = -342098639681884413L;
+        private MessageStoreStatistics messageStoreStatistics;
+
+        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
+                float loadFactor, boolean accessOrder) {
+            super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
+        }
+
+        public void setMessageStoreStatistics(
+                MessageStoreStatistics messageStoreStatistics) {
+            this.messageStoreStatistics = messageStoreStatistics;
+        }
+
+        @Override
+        protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
+            if (messageStoreStatistics != null) {
+                messageStoreStatistics.getMessageCount().decrement();
+                messageStoreStatistics.getMessageSize().addSize(-eldest.getValue().getSize());
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
index 1b927b4..944d183 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -35,6 +35,7 @@ import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
@@ -47,6 +48,8 @@ import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,6 +70,7 @@ public abstract class AbstractMessageStoreSizeStatTest {
     protected BrokerService broker;
     protected URI brokerConnectURI;
     protected String defaultQueueName = "test.queue";
+    protected String defaultTopicName = "test.topic";
     protected static int messageSize = 1000;
 
     @Before
@@ -100,34 +104,67 @@ public abstract class AbstractMessageStoreSizeStatTest {
 
     @Test
     public void testMessageSize() throws Exception {
-        Destination dest = publishTestMessages(200);
+        Destination dest = publishTestQueueMessages(200);
         verifyStats(dest, 200, 200 * messageSize);
     }
 
     @Test
     public void testMessageSizeAfterConsumption() throws Exception {
 
-        Destination dest = publishTestMessages(200);
+        Destination dest = publishTestQueueMessages(200);
         verifyStats(dest, 200, 200 * messageSize);
 
-        consumeTestMessages();
-        Thread.sleep(3000);
+        consumeTestQueueMessages();
+
         verifyStats(dest, 0, 0);
     }
 
     @Test
-    public void testMessageSizeDurable() throws Exception {
+    public void testMessageSizeOneDurable() throws Exception {
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
 
-        Destination dest = publishTestMessagesDurable();
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200);
 
         //verify the count and size
         verifyStats(dest, 200, 200 * messageSize);
 
+        //consume all messages
+        consumeDurableTestMessages(connection, "sub1", 200);
+
+        //All messages should now be gone
+        verifyStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=10000)
+    public void testMessageSizeTwoDurables() throws Exception {
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200);
+
+        //verify the count and size
+        verifyStats(dest, 200, 200 * messageSize);
+
+        //consume messages just for sub1
+        consumeDurableTestMessages(connection, "sub1", 200);
+
+        //There is still a durable that hasn't consumed so the messages should exist
+        verifyStats(dest, 200, 200 * messageSize);
+
+        connection.stop();
+
     }
 
     @Test
     public void testMessageSizeAfterDestinationDeletion() throws Exception {
-        Destination dest = publishTestMessages(200);
+        Destination dest = publishTestQueueMessages(200);
         verifyStats(dest, 200, 200 * messageSize);
 
         //check that the size is 0 after deletion
@@ -135,18 +172,34 @@ public abstract class AbstractMessageStoreSizeStatTest {
         verifyStats(dest, 0, 0);
     }
 
-    protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception {
-        MessageStore messageStore = dest.getMessageStore();
-        MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
-        assertEquals(messageStore.getMessageCount(), count);
-        assertEquals(messageStore.getMessageCount(),
-                storeStats.getMessageCount().getCount());
-        assertEquals(messageStore.getMessageSize(),
+    protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception {
+        final MessageStore messageStore = dest.getMessageStore();
+        final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
+
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() ==
+                        storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() ==
                 messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
+            }
+        });
+
         if (count > 0) {
             assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
+            Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return storeStats.getMessageSize().getTotalSize() > minimumSize;
+                }
+            });
         } else {
-            assertEquals(storeStats.getMessageSize().getTotalSize(), 0);
+            Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return storeStats.getMessageSize().getTotalSize() == 0;
+                }
+            });
         }
     }
 
@@ -166,11 +219,11 @@ public abstract class AbstractMessageStoreSizeStatTest {
     }
 
 
-    protected Destination publishTestMessages(int count) throws Exception {
-        return publishTestMessages(count, defaultQueueName);
+    protected Destination publishTestQueueMessages(int count) throws Exception {
+        return publishTestQueueMessages(count, defaultQueueName);
     }
 
-    protected Destination publishTestMessages(int count, String queueName) throws Exception {
+    protected Destination publishTestQueueMessages(int count, String queueName) throws Exception {
         // create a new queue
         final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
                 queueName);
@@ -196,17 +249,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
             }
 
         } finally {
-            connection.stop();
+            connection.close();
         }
 
         return dest;
     }
 
-    protected Destination consumeTestMessages() throws Exception {
-        return consumeTestMessages(defaultQueueName);
+    protected Destination consumeTestQueueMessages() throws Exception {
+        return consumeTestQueueMessages(defaultQueueName);
+    }
+
+    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception {
+        return consumeDurableTestMessages(connection, sub, size, defaultTopicName);
     }
 
-    protected Destination consumeTestMessages(String queueName) throws Exception {
+    protected Destination consumeTestQueueMessages(String queueName) throws Exception {
         // create a new queue
         final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
                 queueName);
@@ -235,22 +292,45 @@ public abstract class AbstractMessageStoreSizeStatTest {
         return dest;
     }
 
-    protected Destination publishTestMessagesDurable() throws Exception {
+    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception {
         // create a new queue
         final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
-                "test.topic");
+                topicName);
+
+        Destination dest = broker.getDestination(activeMqTopic);
+
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(topicName);
+
+        try {
+            TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
+            for (int i = 0; i < size; i++) {
+                consumer.receive();
+            }
+
+        } finally {
+            session.close();
+        }
+
+        return dest;
+    }
+
+    protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                defaultTopicName);
 
         Destination dest = broker.getDestination(activeMqTopic);
 
         // Start the connection
-        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
-        .createConnection();
-        connection.setClientID("clientId");
-        connection.start();
+
         Session session = connection.createSession(false,
                 TopicSession.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic("test.topic");
-        session.createDurableSubscriber(topic, "sub1");
+        Topic topic = session.createTopic(defaultTopicName);
+        for (String subName : subNames) {
+            session.createDurableSubscriber(topic, subName);
+        }
 
         // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
         //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
@@ -263,21 +343,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
             // store
             MessageProducer prod = session.createProducer(topic);
             prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-            for (int i = 0; i < 200; i++) {
+            for (int i = 0; i < publishSize; i++) {
                 prod.send(createMessage(session));
             }
 
-            //verify the view has 200 messages
-            assertEquals(1, subs.length);
+            //verify the view has expected messages
+            assertEquals(subNames.length, subs.length);
             ObjectName subName = subs[0];
             DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
                     broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
             CompositeData[] data  = sub.browse();
             assertNotNull(data);
-            assertEquals(200, data.length);
+            assertEquals(expectedSize, data.length);
 
         } finally {
-            connection.stop();
+            session.close();
         }
 
         return dest;

http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
index bb46f20..28884e6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
@@ -64,7 +64,7 @@ public class KahaDBMessageStoreSizeStatTest extends
     @Test
     public void testMessageSizeAfterRestartAndPublish() throws Exception {
 
-        Destination dest = publishTestMessages(200);
+        Destination dest = publishTestQueueMessages(200);
 
         // verify the count and size
         verifyStats(dest, 200, 200 * messageSize);
@@ -72,7 +72,7 @@ public class KahaDBMessageStoreSizeStatTest extends
         // stop, restart broker and publish more messages
         stopBroker();
         this.setUpBroker(false);
-        dest = publishTestMessages(200);
+        dest = publishTestQueueMessages(200);
 
         // verify the count and size
         verifyStats(dest, 400, 400 * messageSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
index 4342e1d..849a91b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
@@ -84,7 +84,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
     @Test
     public void testMessageSizeAfterRestartAndPublish() throws Exception {
 
-        Destination dest = publishTestMessages(200);
+        Destination dest = publishTestQueueMessages(200);
 
         // verify the count and size
         verifyStats(dest, 200, 200 * messageSize);
@@ -92,7 +92,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
         // stop, restart broker and publish more messages
         stopBroker();
         this.setUpBroker(false);
-        dest = publishTestMessages(200);
+        dest = publishTestQueueMessages(200);
 
         // verify the count and size
         verifyStats(dest, 400, 400 * messageSize);
@@ -102,13 +102,13 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
     @Test
     public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
 
-        Destination dest = publishTestMessages(200);
+        Destination dest = publishTestQueueMessages(200);
 
         // verify the count and size
         verifyStats(dest, 200, 200 * messageSize);
         assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
 
-        Destination dest2 = publishTestMessages(200, "test.queue2");
+        Destination dest2 = publishTestQueueMessages(200, "test.queue2");
 
         // verify the count and size
         verifyStats(dest2, 200, 200 * messageSize);
@@ -117,8 +117,8 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
         // stop, restart broker and publish more messages
         stopBroker();
         this.setUpBroker(false);
-        dest = publishTestMessages(200);
-        dest2 = publishTestMessages(200, "test.queue2");
+        dest = publishTestQueueMessages(200);
+        dest2 = publishTestQueueMessages(200, "test.queue2");
 
         // verify the count and size after publishing messages
         verifyStats(dest, 400, 400 * messageSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
index 755936c..dc6ff8b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
@@ -18,16 +18,21 @@ package org.apache.activemq.store.memory;
 
 import java.io.IOException;
 
+import javax.jms.Connection;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * This test checks that KahaDB properly sets the new storeMessageSize statistic.
- * 
+ *
  * AMQ-5748
- * 
+ *
  */
 public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest {
     protected static final Logger LOG = LoggerFactory
@@ -39,7 +44,53 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
         broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
     }
 
+    @Override
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurable() throws Exception {
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100);
+
+        //verify the count and size, should be 100 because of the LRUCache
+        verifyStats(dest, 100, 100 * messageSize);
+
+        consumeDurableTestMessages(connection, "sub1", 100);
+
+        //Since an LRU cache is used and messages are kept in memory, this should be 100 still
+        verifyStats(dest, 100, 100 * messageSize);
+
+        connection.stop();
+
+    }
+
+    @Override
+    @Test(timeout=10000)
+    public void testMessageSizeTwoDurables() throws Exception {
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100, so only 100 messages are kept
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100);
+
+        //verify the count and size
+        verifyStats(dest, 100, 100 * messageSize);
+
+        //consume for sub1
+        consumeDurableTestMessages(connection, "sub1", 100);
+
+        //Should be 100 messages still
+        verifyStats(dest, 100, 100 * messageSize);
+
+        connection.stop();
+
+    }
+
 
- 
 
 }