You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/09/09 20:12:47 UTC

[1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5923

Repository: activemq
Updated Branches:
  refs/heads/master b17cc37ef -> 734fb7dda


http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
index 28884e6..276a310 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
@@ -18,12 +18,15 @@ package org.apache.activemq.store.kahadb;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
 import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,12 +42,13 @@ public class KahaDBMessageStoreSizeStatTest extends
     protected static final Logger LOG = LoggerFactory
             .getLogger(KahaDBMessageStoreSizeStatTest.class);
 
-    File dataFileDir = new File("target/test-amq-5748/stat-datadb");
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
 
     @Override
     protected void setUpBroker(boolean clearDataDir) throws Exception {
-        if (clearDataDir && dataFileDir.exists())
-            FileUtils.cleanDirectory(dataFileDir);
+        if (clearDataDir && dataFileDir.getRoot().exists())
+            FileUtils.cleanDirectory(dataFileDir.getRoot());
         super.setUpBroker(clearDataDir);
     }
 
@@ -52,7 +56,7 @@ public class KahaDBMessageStoreSizeStatTest extends
     protected void initPersistence(BrokerService brokerService)
             throws IOException {
         broker.setPersistent(true);
-        broker.setDataDirectoryFile(dataFileDir);
+        broker.setDataDirectoryFile(dataFileDir.getRoot());
     }
 
     /**
@@ -63,19 +67,19 @@ public class KahaDBMessageStoreSizeStatTest extends
      */
     @Test
     public void testMessageSizeAfterRestartAndPublish() throws Exception {
-
-        Destination dest = publishTestQueueMessages(200);
+        AtomicLong publishedMessageSize = new AtomicLong();
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
 
         // verify the count and size
-        verifyStats(dest, 200, 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         // stop, restart broker and publish more messages
         stopBroker();
         this.setUpBroker(false);
-        dest = publishTestQueueMessages(200);
+        dest = publishTestQueueMessages(200, publishedMessageSize);
 
         // verify the count and size
-        verifyStats(dest, 400, 400 * messageSize);
+        verifyStats(dest, 400, publishedMessageSize.get());
 
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
index 849a91b..3572acc 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
@@ -22,12 +22,15 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
 import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,12 +46,13 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
     protected static final Logger LOG = LoggerFactory
             .getLogger(MultiKahaDBMessageStoreSizeStatTest.class);
 
-    File dataFileDir = new File("target/test-amq-5748/stat-datadb");
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
 
     @Override
     protected void setUpBroker(boolean clearDataDir) throws Exception {
-        if (clearDataDir && dataFileDir.exists())
-            FileUtils.cleanDirectory(dataFileDir);
+        if (clearDataDir && dataFileDir.getRoot().exists())
+            FileUtils.cleanDirectory(dataFileDir.getRoot());
         super.setUpBroker(clearDataDir);
     }
 
@@ -59,7 +63,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
 
         //setup multi-kaha adapter
         MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
-        persistenceAdapter.setDirectory(dataFileDir);
+        persistenceAdapter.setDirectory(dataFileDir.getRoot());
 
         KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
         kahaStore.setJournalMaxFileLength(1024 * 512);
@@ -81,51 +85,53 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
      *
      * @throws Exception
      */
-    @Test
+    @Test(timeout=10000)
     public void testMessageSizeAfterRestartAndPublish() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
 
-        Destination dest = publishTestQueueMessages(200);
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
 
         // verify the count and size
-        verifyStats(dest, 200, 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         // stop, restart broker and publish more messages
         stopBroker();
         this.setUpBroker(false);
-        dest = publishTestQueueMessages(200);
+        dest = publishTestQueueMessages(200, publishedMessageSize);
 
         // verify the count and size
-        verifyStats(dest, 400, 400 * messageSize);
+        verifyStats(dest, 400, publishedMessageSize.get());
 
     }
 
-    @Test
+    @Test(timeout=10000)
     public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+        AtomicLong publishedMessageSize2 = new AtomicLong();
 
-        Destination dest = publishTestQueueMessages(200);
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
 
         // verify the count and size
-        verifyStats(dest, 200, 200 * messageSize);
-        assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
+        assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get());
 
-        Destination dest2 = publishTestQueueMessages(200, "test.queue2");
+        Destination dest2 = publishTestQueueMessages(200, "test.queue2", publishedMessageSize2);
 
         // verify the count and size
-        verifyStats(dest2, 200, 200 * messageSize);
-        assertTrue(broker.getPersistenceAdapter().size() > 400 * messageSize);
+        verifyStats(dest2, 200, publishedMessageSize2.get());
+        assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get() + publishedMessageSize2.get());
 
         // stop, restart broker and publish more messages
         stopBroker();
         this.setUpBroker(false);
-        dest = publishTestQueueMessages(200);
-        dest2 = publishTestQueueMessages(200, "test.queue2");
+        dest = publishTestQueueMessages(200, publishedMessageSize);
+        dest2 = publishTestQueueMessages(200, "test.queue2", publishedMessageSize2);
 
         // verify the count and size after publishing messages
-        verifyStats(dest, 400, 400 * messageSize);
-        verifyStats(dest2, 400, 400 * messageSize);
+        verifyStats(dest, 400, publishedMessageSize.get());
+        verifyStats(dest2, 400, publishedMessageSize2.get());
 
-        System.out.println(broker.getPersistenceAdapter().size());
-        assertTrue(broker.getPersistenceAdapter().size() > 800 * messageSize);
+        assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get() + publishedMessageSize2.get());
         assertTrue(broker.getPersistenceAdapter().size() >=
                 (dest.getMessageStore().getMessageSize() + dest2.getMessageStore().getMessageSize()));
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
index dc6ff8b..ba2ae33 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.store.memory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
 
@@ -24,6 +25,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.apache.activemq.store.AbstractStoreStatTestSupport;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,21 +49,23 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
     @Override
     @Test(timeout=10000)
     public void testMessageSizeOneDurable() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
         Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
         connection.setClientID("clientId");
         connection.start();
 
         //The expected value is only 100 because for durables a LRUCache is being used
         //with a max size of 100
-        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100);
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100, publishedMessageSize);
 
         //verify the count and size, should be 100 because of the LRUCache
-        verifyStats(dest, 100, 100 * messageSize);
+        //verify size is at least the minimum of 100 messages times 100 bytes
+        verifyStats(dest, 100, 100 * 100);
 
-        consumeDurableTestMessages(connection, "sub1", 100);
+        consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize);
 
         //Since an LRU cache is used and messages are kept in memory, this should be 100 still
-        verifyStats(dest, 100, 100 * messageSize);
+        verifyStats(dest, 100, publishedMessageSize.get());
 
         connection.stop();
 
@@ -70,22 +74,24 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
     @Override
     @Test(timeout=10000)
     public void testMessageSizeTwoDurables() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
         Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
         connection.setClientID("clientId");
         connection.start();
 
         //The expected value is only 100 because for durables a LRUCache is being used
         //with a max size of 100, so only 100 messages are kept
-        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100);
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100, publishedMessageSize);
 
         //verify the count and size
-        verifyStats(dest, 100, 100 * messageSize);
+        //verify size is at least the minimum of 100 messages times 100 bytes
+        verifyStats(dest, 100, 100 * 100);
 
         //consume for sub1
-        consumeDurableTestMessages(connection, "sub1", 100);
+        consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize);
 
         //Should be 100 messages still
-        verifyStats(dest, 100, 100 * messageSize);
+        verifyStats(dest, 100, publishedMessageSize.get());
 
         connection.stop();
 


[3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5923

Posted by cs...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5923

Adding metrics to track the pending message size for a queue and for
subscribers.  This is useful because it exposes not only the pending
count but also the total size of the messages left to consume. Also
improving the message size store tests.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/734fb7dd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/734fb7dd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/734fb7dd

Branch: refs/heads/master
Commit: 734fb7dda35285ada7bc57642215077e08c88e80
Parents: b17cc37
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Fri Aug 21 18:58:08 2015 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Sep 9 18:12:15 2015 +0000

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |   7 +
 .../apache/activemq/broker/region/Queue.java    |  13 +
 .../activemq/broker/region/Subscription.java    |   5 +
 .../broker/region/TopicSubscription.java        |   7 +
 .../region/cursors/AbstractStoreCursor.java     |  27 +
 .../cursors/FilePendingMessageCursor.java       |  21 +-
 .../region/cursors/OrderedPendingList.java      |  35 +-
 .../broker/region/cursors/PendingList.java      |   3 +
 .../region/cursors/PendingMessageCursor.java    |  66 +--
 .../region/cursors/PendingMessageHelper.java    |  68 +++
 .../region/cursors/PrioritizedPendingList.java  |  31 +-
 .../cursors/QueueDispatchPendingList.java       |   5 +
 .../region/cursors/QueueStorePrefetch.java      |  33 +-
 .../cursors/StoreDurableSubscriberCursor.java   |   9 +
 .../broker/region/cursors/StoreQueueCursor.java |  25 +
 .../region/cursors/TopicStorePrefetch.java      |  24 +-
 .../region/cursors/VMPendingMessageCursor.java  |  60 +-
 .../java/org/apache/activemq/store/PList.java   |   2 +
 .../activemq/store/ProxyTopicMessageStore.java  |  10 +
 .../activemq/store/TopicMessageStore.java       |   2 +
 .../store/memory/MemoryTopicMessageStore.java   |  10 +
 .../activemq/store/memory/MemoryTopicSub.java   |  23 +-
 .../store/jdbc/JDBCTopicMessageStore.java       |  31 +-
 .../store/journal/JournalTopicMessageStore.java |  24 +-
 .../activemq/store/kahadb/KahaDBStore.java      |  24 +
 .../activemq/store/kahadb/MessageDatabase.java  |  26 +
 .../activemq/store/kahadb/TempKahaDBStore.java  |   5 +
 .../store/kahadb/disk/index/ListIndex.java      |  20 +-
 .../store/kahadb/disk/index/ListNode.java       |  13 +-
 .../activemq/store/kahadb/plist/PListImpl.java  |  67 ++-
 .../apache/activemq/leveldb/LevelDBStore.scala  |   6 +
 .../region/QueueDuplicatesFromStoreTest.java    |   7 +
 .../region/SubscriptionAddRemoveQueueTest.java  |  42 ++
 .../AbstractPendingMessageCursorTest.java       | 547 +++++++++++++++++++
 .../cursors/KahaDBPendingMessageCursorTest.java | 126 +++++
 .../cursors/MemoryPendingMessageCursorTest.java | 145 +++++
 .../MultiKahaDBPendingMessageCursorTest.java    |  60 ++
 .../region/cursors/OrderPendingListTest.java    |  10 +
 .../store/AbstractMessageStoreSizeStatTest.java | 244 ++-------
 .../store/AbstractStoreStatTestSupport.java     | 268 +++++++++
 .../kahadb/KahaDBMessageStoreSizeStatTest.java  |  22 +-
 .../MultiKahaDBMessageStoreSizeStatTest.java    |  50 +-
 .../memory/MemoryMessageStoreSizeStatTest.java  |  22 +-
 43 files changed, 1919 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index ef1b372..4e688cc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -582,6 +582,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
     }
 
     @Override
+    public long getPendingMessageSize() {
+        synchronized (pendingLock) {
+            return pending.messageSize();
+        }
+    }
+
+    @Override
     public int getDispatchedQueueSize() {
         return dispatched.size();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c9823e1..b0b609b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -927,6 +927,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         return msg;
     }
 
+    public long getPendingMessageSize() {
+        messagesLock.readLock().lock();
+        try{
+            return messages.messageSize();
+        } finally {
+            messagesLock.readLock().unlock();
+        }
+    }
+
+    public long getPendingMessageCount() {
+         return this.destinationStatistics.getMessages().getCount();
+    }
+
     @Override
     public String toString() {
         return destination.getQualifiedName() + ", subscriptions=" + consumers.size()

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index 9452b99..4a8b341 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -119,6 +119,11 @@ public interface Subscription extends SubscriptionRecovery {
     int getPendingQueueSize();
 
     /**
+     * @return size of the messages pending delivery
+     */
+    long getPendingMessageSize();
+
+    /**
      * @return number of messages dispatched to the client
      */
     int getDispatchedQueueSize();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index d3e683d..e1c8a95 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -419,6 +419,13 @@ public class TopicSubscription extends AbstractSubscription {
     }
 
     @Override
+    public long getPendingMessageSize() {
+        synchronized (matchedListMutex) {
+            return matched.messageSize();
+        }
+    }
+
+    @Override
     public int getDispatchedQueueSize() {
         return (int)(getSubscriptionStatistics().getDispatched().getCount() -
                 prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 4bdd7f6..05e4b1f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -49,6 +50,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     final MessageId[] lastCachedIds = new MessageId[2];
     protected boolean hadSpace = false;
 
+
+
     protected AbstractStoreCursor(Destination destination) {
         super((destination != null ? destination.isPrioritizedMessages():false));
         this.regionDestination=destination;
@@ -60,6 +63,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
             super.start();
@@ -78,6 +82,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         resetSize();
     }
 
+    @Override
     public final synchronized void stop() throws Exception {
         resetBatch();
         super.stop();
@@ -85,6 +90,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final boolean recoverMessage(Message message) throws Exception {
         return recoverMessage(message,false);
     }
@@ -136,6 +142,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         duplicatesFromStore.clear();
     }
 
+    @Override
     public final synchronized void reset() {
         if (batchList.isEmpty()) {
             try {
@@ -150,6 +157,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public synchronized void release() {
         clearIterator(false);
     }
@@ -173,6 +181,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized boolean hasNext() {
         if (batchList.isEmpty()) {
             try {
@@ -187,6 +196,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized MessageReference next() {
         MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@@ -199,6 +209,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         return result;
     }
 
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         boolean disableCache = false;
         if (hasSpace()) {
@@ -333,12 +344,14 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         setCacheEnabled(false);
         size++;
     }
 
 
+    @Override
     public final synchronized void remove() {
         size--;
         if (iterator!=null) {
@@ -350,6 +363,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void remove(MessageReference node) {
         if (batchList.remove(node) != null) {
             size--;
@@ -358,11 +372,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void clear() {
         gc();
     }
 
 
+    @Override
     public synchronized void gc() {
         for (MessageReference msg : batchList) {
             rollback(msg.getMessageId());
@@ -374,6 +390,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         setCacheEnabled(false);
     }
 
+    @Override
     protected final synchronized void fillBatch() {
         if (LOG.isTraceEnabled()) {
             LOG.trace("{} fillBatch", this);
@@ -395,17 +412,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized boolean isEmpty() {
         // negative means more messages added to store through queue.send since last reset
         return size == 0;
     }
 
 
+    @Override
     public final synchronized boolean hasMessagesBufferedToDeliver() {
         return !batchList.isEmpty();
     }
 
 
+    @Override
     public final synchronized int size() {
         if (size < 0) {
             this.size = getStoreSize();
@@ -414,6 +434,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
     @Override
+    public final synchronized long messageSize() {
+        return getStoreMessageSize();
+    }
+
+    @Override
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                     + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
@@ -428,6 +453,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
 
     protected abstract int getStoreSize();
 
+    protected abstract long getStoreMessageSize();
+
     protected abstract boolean isStoreEmpty();
 
     public Subscription getSubscription() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 7512e39..3f3f33b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -44,8 +44,8 @@ import org.apache.activemq.util.ByteSequence;
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
     static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
@@ -198,15 +198,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
-     * @throws Exception 
+     * @throws Exception
      */
     @Override
     public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         return tryAddMessageLast(node, 0);
     }
-    
+
     @Override
     public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
         if (!node.isExpired()) {
@@ -252,7 +252,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
     @Override
@@ -356,6 +356,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
         return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
     }
 
+    @Override
+    public synchronized long messageSize() { // combined size of the in-memory list and, when non-empty, the disk list
+        return memoryList.messageSize() + (isDiskListEmpty() ? 0 : getDiskList().messageSize()); // no (int) narrowing: the sum must stay a long
+    }
+
     /**
      * clear all pending messages
      */
@@ -389,6 +394,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
         super.setSystemUsage(usageManager);
     }
 
+    @Override
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
             synchronized (this) {
@@ -497,10 +503,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
             }
         }
 
+        @Override
         public boolean hasNext() {
             return iterator.hasNext();
         }
 
+        @Override
         public MessageReference next() {
             try {
                 PListEntry entry = iterator.next();
@@ -513,6 +521,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
             }
         }
 
+        @Override
         public void remove() {
             iterator.remove();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
index 9bf9588..31870b1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
@@ -25,13 +25,23 @@ import java.util.Map;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
 
 public class OrderedPendingList implements PendingList {
 
     private PendingNode root = null;
     private PendingNode tail = null;
     private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+    private final SizeStatisticImpl messageSize;
+    private final PendingMessageHelper pendingMessageHelper;
 
+    public OrderedPendingList() {
+        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
+        messageSize.setEnabled(true);
+        pendingMessageHelper = new PendingMessageHelper(map, messageSize);
+    }
+
+    @Override
     public PendingNode addMessageFirst(MessageReference message) {
         PendingNode node = new PendingNode(this, message);
         if (root == null) {
@@ -41,10 +51,11 @@ public class OrderedPendingList implements PendingList {
             root.linkBefore(node);
             root = node;
         }
-        this.map.put(message.getMessageId(), node);
+        pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public PendingNode addMessageLast(MessageReference message) {
         PendingNode node = new PendingNode(this, message);
         if (root == null) {
@@ -53,29 +64,35 @@ public class OrderedPendingList implements PendingList {
             tail.linkAfter(node);
         }
         tail = node;
-        this.map.put(message.getMessageId(), node);
+        pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public void clear() {
         this.root = null;
         this.tail = null;
         this.map.clear();
+        this.messageSize.reset();
     }
 
+    @Override
     public boolean isEmpty() {
         return this.map.isEmpty();
     }
 
+    @Override
     public Iterator<MessageReference> iterator() {
         return new Iterator<MessageReference>() {
             private PendingNode current = null;
             private PendingNode next = root;
 
+            @Override
             public boolean hasNext() {
                 return next != null;
             }
 
+            @Override
             public MessageReference next() {
                 MessageReference result = null;
                 this.current = this.next;
@@ -84,31 +101,39 @@ public class OrderedPendingList implements PendingList {
                 return result;
             }
 
+            @Override
             public void remove() {
                 if (this.current != null && this.current.getMessage() != null) {
-                    map.remove(this.current.getMessage().getMessageId());
+                    pendingMessageHelper.removeFromMap(this.current.getMessage());
                 }
                 removeNode(this.current);
             }
         };
     }
 
+    @Override
     public PendingNode remove(MessageReference message) {
         PendingNode node = null;
         if (message != null) {
-            node = this.map.remove(message.getMessageId());
+            node = pendingMessageHelper.removeFromMap(message);
             removeNode(node);
         }
         return node;
     }
 
+    @Override
     public int size() {
         return this.map.size();
     }
 
+    @Override
+    public long messageSize() {
+        return this.messageSize.getTotalSize(); // total maintained through pendingMessageHelper on add/remove, reset by clear()
+    }
+
     void removeNode(PendingNode node) {
         if (node != null) {
-            map.remove(node.getMessage().getMessageId());
+            pendingMessageHelper.removeFromMap(node.getMessage());
             if (root == node) {
                 root = (PendingNode) node.getNext();
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
index 153d8bd..adfa78e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
@@ -73,6 +73,8 @@ public interface PendingList extends Iterable<MessageReference> {
      */
     public int size();
 
+    /** @return the total size, in bytes, of the pending messages in this list */
+    public long messageSize();
     /**
      * Returns an iterator over the pending Messages.  The subclass controls how
      * the returned iterator actually traverses the list of pending messages allowing
@@ -81,6 +83,7 @@ public interface PendingList extends Iterable<MessageReference> {
      *
      * @return an Iterator that returns MessageReferences contained in this list.
      */
+    @Override
     public Iterator<MessageReference> iterator();
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
index 06d59f1..bf7fd7a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
@@ -30,14 +30,14 @@ import org.apache.activemq.usage.SystemUsage;
 /**
  * Interface to pending message (messages awaiting disptach to a consumer)
  * cursor
- * 
- * 
+ *
+ *
  */
 public interface PendingMessageCursor extends Service {
 
     /**
      * Add a destination
-     * 
+     *
      * @param context
      * @param destination
      * @throws Exception
@@ -46,7 +46,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * remove a destination
-     * 
+     *
      * @param context
      * @param destination
      * @throws Exception
@@ -60,7 +60,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * check if a Destination is Empty for this cursor
-     * 
+     *
      * @param destination
      * @return true id the Destination is empty
      */
@@ -79,7 +79,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      * @return boolean true if successful, false if cursor traps a duplicate
      * @throws IOException
@@ -89,9 +89,9 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch - if it can
-     * 
+     *
      * @param node
-     * @param maxWaitTime 
+     * @param maxWaitTime
      * @return true if successful
      * @throws IOException
      * @throws Exception
@@ -100,7 +100,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      * @throws Exception
      */
@@ -108,7 +108,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Add a message recovered from a retroactive policy
-     * 
+     *
      * @param node
      * @throws Exception
      */
@@ -134,6 +134,8 @@ public interface PendingMessageCursor extends Service {
      */
     int size();
 
+    /** @return the total size of the messages pending in this cursor */
+    long messageSize();
     /**
      * clear all pending messages
      */
@@ -142,7 +144,7 @@ public interface PendingMessageCursor extends Service {
     /**
      * Informs the Broker if the subscription needs to intervention to recover
      * it's state e.g. DurableTopicSubscriber may do
-     * 
+     *
      * @return true if recovery required
      */
     boolean isRecoveryRequired();
@@ -154,7 +156,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Set the max batch size
-     * 
+     *
      * @param maxBatchSize
      */
     void setMaxBatchSize(int maxBatchSize);
@@ -167,7 +169,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * remove a node
-     * 
+     *
      * @param node
      */
     void remove(MessageReference node);
@@ -179,7 +181,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Set the UsageManager
-     * 
+     *
      * @param systemUsage
      * @see org.apache.activemq.usage.SystemUsage
      */
@@ -204,7 +206,7 @@ public interface PendingMessageCursor extends Service {
      * @return true if the cursor is full
      */
     boolean isFull();
-    
+
     /**
      * @return true if the cursor has space to page messages into
      */
@@ -217,41 +219,41 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * destroy the cursor
-     * 
+     *
      * @throws Exception
      */
     void destroy() throws Exception;
 
     /**
      * Page in a restricted number of messages and increment the reference count
-     * 
+     *
      * @param maxItems
      * @return a list of paged in messages
      */
     LinkedList<MessageReference> pageInList(int maxItems);
-    
+
     /**
      * set the maximum number of producers to track at one time
      * @param value
      */
     void setMaxProducersToAudit(int value);
-    
+
     /**
      * @return the maximum number of producers to audit
      */
     int getMaxProducersToAudit();
-    
+
     /**
      * Set the maximum depth of message ids to track
-     * @param depth 
+     * @param depth
      */
     void setMaxAuditDepth(int depth);
-    
+
     /**
      * @return the audit depth
      */
     int getMaxAuditDepth();
-    
+
     /**
      * @return the enableAudit
      */
@@ -260,37 +262,37 @@ public interface PendingMessageCursor extends Service {
      * @param enableAudit the enableAudit to set
      */
     public void setEnableAudit(boolean enableAudit);
-    
+
     /**
-     * @return true if the underlying state of this cursor 
+     * @return true if the underlying state of this cursor
      * disappears when the broker shuts down
      */
     public boolean isTransient();
-    
-    
+
+
     /**
      * set the audit
      * @param audit
      */
     public void setMessageAudit(ActiveMQMessageAudit audit);
-    
-    
+
+
     /**
      * @return the audit - could be null
      */
     public ActiveMQMessageAudit getMessageAudit();
-    
+
     /**
      * use a cache to improve performance
      * @param useCache
      */
     public void setUseCache(boolean useCache);
-    
+
     /**
      * @return true if a cache may be used
      */
     public boolean isUseCache();
-    
+
     /**
      * remove from auditing the message id
      * @param id

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
new file mode 100644
index 0000000..f28d61b
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.Map;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
+
+/** Keeps a MessageId-to-PendingNode map and a SizeStatisticImpl in step as pending messages are added and removed. */
+public class PendingMessageHelper {
+
+    private final Map<MessageId, PendingNode> map;
+    private final SizeStatisticImpl messageSize;
+
+    /** Stores both arguments by reference; later calls update the caller's map and statistic in place. */
+    public PendingMessageHelper(Map<MessageId, PendingNode> map,
+            SizeStatisticImpl messageSize) {
+        super();
+        this.map = map;
+        this.messageSize = messageSize;
+    }
+
+    /** Maps the message's id to {@code node} and adds the message's size; a replaced entry's size is subtracted first. */
+    public void addToMap(MessageReference message, PendingNode node) {
+        PendingNode previous = this.map.put(message.getMessageId(), node);
+        if (previous != null) {
+            try {
+                messageSize.addSize(-previous.getMessage().getSize());
+            } catch (Exception e) {
+                // expected for NullMessageReference; the statistic is left untouched in that case
+            }
+        }
+        try {
+            messageSize.addSize(message.getSize());
+        } catch (Exception e) {
+            // expected for NullMessageReference; the statistic is left untouched in that case
+        }
+    }
+
+    /** Removes the entry for the message's id and subtracts the stored node's message size; returns that node, or null if none. */
+    public PendingNode removeFromMap(MessageReference message) {
+        PendingNode removed = this.map.remove(message.getMessageId());
+        if (removed != null) {
+            try {
+                messageSize.addSize(-removed.getMessage().getSize());
+            } catch (Exception e) {
+                // expected for NullMessageReference; the statistic is left untouched in that case
+            }
+        }
+        return removed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index 9235b2c..70eaa53 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -25,50 +25,64 @@ import java.util.Map;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
 
 public class PrioritizedPendingList implements PendingList {
 
     private static final Integer MAX_PRIORITY = 10;
     private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
     private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+    private final SizeStatisticImpl messageSize;
+    private final PendingMessageHelper pendingMessageHelper;
+
 
     public PrioritizedPendingList() {
         for (int i = 0; i < MAX_PRIORITY; i++) {
             this.lists[i] = new OrderedPendingList();
         }
+        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
+        messageSize.setEnabled(true);
+        pendingMessageHelper = new PendingMessageHelper(map, messageSize);
     }
 
+    @Override
     public PendingNode addMessageFirst(MessageReference message) {
         PendingNode node = getList(message).addMessageFirst(message);
-        this.map.put(message.getMessageId(), node);
+        this.pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public PendingNode addMessageLast(MessageReference message) {
         PendingNode node = getList(message).addMessageLast(message);
-        this.map.put(message.getMessageId(), node);
+        this.pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public void clear() {
         for (int i = 0; i < MAX_PRIORITY; i++) {
             this.lists[i].clear();
         }
         this.map.clear();
+        this.messageSize.reset();
     }
 
+    @Override
     public boolean isEmpty() {
         return this.map.isEmpty();
     }
 
+    @Override
     public Iterator<MessageReference> iterator() {
         return new PrioritizedPendingListIterator();
     }
 
+    @Override
     public PendingNode remove(MessageReference message) {
         PendingNode node = null;
         if (message != null) {
-            node = this.map.remove(message.getMessageId());
+            node = this.pendingMessageHelper.removeFromMap(message);
             if (node != null) {
                 node.getList().removeNode(node);
             }
@@ -76,11 +90,17 @@ public class PrioritizedPendingList implements PendingList {
         return node;
     }
 
+    @Override
     public int size() {
         return this.map.size();
     }
 
     @Override
+    public long messageSize() {
+        return this.messageSize.getTotalSize(); // this list's own statistic, not a sum over the per-priority lists
+    }
+
+    @Override
     public String toString() {
         return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
     }
@@ -111,10 +131,12 @@ public class PrioritizedPendingList implements PendingList {
                 }
             }
         }
+        @Override
         public boolean hasNext() {
             return list.size() > index;
         }
 
+        @Override
         public MessageReference next() {
             PendingNode node = list.get(this.index);
             this.currentIndex = this.index;
@@ -122,10 +144,11 @@ public class PrioritizedPendingList implements PendingList {
             return node.getMessage();
         }
 
+        @Override
         public void remove() {
             PendingNode node = list.get(this.currentIndex);
             if (node != null) {
-                map.remove(node.getMessage().getMessageId());
+                pendingMessageHelper.removeFromMap(node.getMessage());
                 node.getList().removeNode(node);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
index 380569e..cdddd4c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -97,6 +97,11 @@ public class QueueDispatchPendingList implements PendingList {
     }
 
     @Override
+    public long messageSize() {
+        return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize(); // sum of the sizes reported by the two delegates
+    }
+
+    @Override
     public Iterator<MessageReference> iterator() {
         return new Iterator<MessageReference>() {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 9fb73c5..b10b2e2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -32,14 +32,14 @@ import org.slf4j.LoggerFactory;
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
     private final MessageStore store;
     private final Broker broker;
-   
+
     /**
      * Construct it
      * @param queue
@@ -51,6 +51,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
 
     }
 
+    @Override
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
         Message msg = this.store.getMessage(messageReference);
         if (msg != null) {
@@ -62,36 +63,46 @@ class QueueStorePrefetch extends AbstractStoreCursor {
         }
     }
 
-   
-        
+
+
     @Override
     protected synchronized int getStoreSize() {
         try {
             int result = this.store.getMessageCount();
             return result;
-            
+
         } catch (IOException e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
     }
-    
+
+    @Override
+    protected synchronized long getStoreMessageSize() {
+        try {
+            return this.store.getMessageSize(); // size as reported by the backing MessageStore
+        } catch (IOException e) {
+            LOG.error("Failed to get message size", e);
+            throw new RuntimeException(e); // this method declares no checked exceptions, so wrap and rethrow
+        }
+    }
+
     @Override
     protected synchronized boolean isStoreEmpty() {
         try {
             return this.store.isEmpty();
-            
+
         } catch (Exception e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     protected void resetBatch() {
         this.store.resetBatching();
     }
-    
+
     @Override
     protected void setBatch(MessageId messageId) throws Exception {
         if (LOG.isTraceEnabled()) {
@@ -101,7 +112,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
         batchResetNeeded = false;
     }
 
-    
+
     @Override
     protected void doFillBatch() throws Exception {
         hadSpace = this.hasSpace();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 32000f5..9d723b8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -303,6 +303,15 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
     }
 
     @Override
+    public synchronized long messageSize() { // sum of messageSize() over every cursor in storePrefetches
+        long total = 0L;
+        for (PendingMessageCursor prefetch : storePrefetches) {
+            total = total + prefetch.messageSize();
+        }
+        return total;
+    }
+
+    @Override
     public void setMaxBatchSize(int newMaxBatchSize) {
         for (PendingMessageCursor storePrefetch : storePrefetches) {
             storePrefetch.setMaxBatchSize(newMaxBatchSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index 5b072a6..caa93b6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -51,6 +51,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         currentCursor = persistent;
     }
 
+    @Override
     public synchronized void start() throws Exception {
         started = true;
         super.start();
@@ -73,6 +74,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public synchronized void stop() throws Exception {
         started = false;
         if (nonPersistent != null) {
@@ -87,6 +89,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount = 0;
     }
 
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         boolean result = true;
         if (node != null) {
@@ -104,6 +107,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         return result;
     }
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
@@ -119,10 +123,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public synchronized void clear() {
         pendingCount = 0;
     }
 
+    @Override
     public synchronized boolean hasNext() {
         try {
             getNextCursor();
@@ -133,11 +139,13 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
        return currentCursor != null ? currentCursor.hasNext() : false;
     }
 
+    @Override
     public synchronized MessageReference next() {
         MessageReference result = currentCursor != null ? currentCursor.next() : null;
         return result;
     }
 
+    @Override
     public synchronized void remove() {
         if (currentCursor != null) {
             currentCursor.remove();
@@ -145,6 +153,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount--;
     }
 
+    @Override
     public synchronized void remove(MessageReference node) {
         if (!node.isPersistent()) {
             nonPersistent.remove(node);
@@ -154,18 +163,21 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount--;
     }
 
+    @Override
     public synchronized void reset() {
         nonPersistent.reset();
         persistent.reset();
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public void release() {
         nonPersistent.release();
         persistent.release();
     }
 
 
+    @Override
     public synchronized int size() {
         if (pendingCount < 0) {
             pendingCount = persistent.size() + nonPersistent.size();
@@ -173,6 +185,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         return pendingCount;
     }
 
+    @Override
+    public synchronized long messageSize() { // NOTE(review): nonPersistent is null-checked in stop() but not here — confirm it cannot be null
+        return persistent.messageSize() + nonPersistent.messageSize(); // recomputed on every call; unlike size(), nothing is cached
+    }
+
+    @Override
     public synchronized boolean isEmpty() {
         // if negative, more messages arrived in store since last reset so non empty
         return pendingCount == 0;
@@ -185,6 +203,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
      * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
+    @Override
     public boolean isRecoveryRequired() {
         return false;
     }
@@ -203,6 +222,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         this.nonPersistent = nonPersistent;
     }
 
+    @Override
     public void setMaxBatchSize(int maxBatchSize) {
         persistent.setMaxBatchSize(maxBatchSize);
         if (nonPersistent != null) {
@@ -212,6 +232,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
     }
 
 
+    @Override
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         super.setMaxProducersToAudit(maxProducersToAudit);
         if (persistent != null) {
@@ -222,6 +243,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public void setMaxAuditDepth(int maxAuditDepth) {
         super.setMaxAuditDepth(maxAuditDepth);
         if (persistent != null) {
@@ -232,6 +254,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public void setEnableAudit(boolean enableAudit) {
         super.setEnableAudit(enableAudit);
         if (persistent != null) {
@@ -266,6 +289,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
 
 
 
+    @Override
     public synchronized void gc() {
         if (persistent != null) {
             persistent.gc();
@@ -276,6 +300,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public void setSystemUsage(SystemUsage usageManager) {
         super.setSystemUsage(usageManager);
         if (persistent != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index 811531e..c3f788f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
 /**
  * persist pendingCount messages pendingCount message (messages awaiting disptach
  * to a consumer) cursor
- * 
- * 
+ *
+ *
  */
 class TopicStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = LoggerFactory.getLogger(TopicStorePrefetch.class);
@@ -59,14 +59,17 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         this.storeHasMessages=this.size > 0;
     }
 
+    @Override
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         batchList.addMessageFirst(node);
         size++;
+        //this.messageSize.addSize(node.getMessage().getSize());
     }
 
     @Override
@@ -88,7 +91,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             }
             storeHasMessages = true;
         }
-        return recovered;      
+        return recovered;
     }
 
     @Override
@@ -100,7 +103,18 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             throw new RuntimeException(e);
         }
     }
-    
+
+
+    @Override
+    protected synchronized long getStoreMessageSize() {
+        try {
+            return store.getMessageSize(clientId, subscriberName); // size the store reports for this clientId/subscriberName
+        } catch (Exception e) {
+            LOG.error("{} Failed to get the outstanding message size from the store", this, e); // was "count": copied from the count getter
+            throw new RuntimeException(e); // this method declares no checked exceptions, so wrap and rethrow
+        }
+    }
+
     @Override
     protected synchronized boolean isStoreEmpty() {
         try {
@@ -111,7 +125,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         }
     }
 
-            
+
     @Override
     protected void resetBatch() {
         this.store.resetBatching(clientId, subscriberName);

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
index 15c61df..75be766 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -28,13 +29,13 @@ import org.apache.activemq.broker.region.QueueMessageReference;
 /**
  * hold pending messages in a linked list (messages awaiting disptach to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     private final PendingList list;
     private Iterator<MessageReference> iter;
-    
+
     public VMPendingMessageCursor(boolean prioritizedMessages) {
         super(prioritizedMessages);
         if (this.prioritizedMessages) {
@@ -44,7 +45,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
         }
     }
 
-    
+
+    @Override
     public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
             throws Exception {
         List<MessageReference> rc = new ArrayList<MessageReference>();
@@ -62,7 +64,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * @return true if there are no pending messages
      */
-    
+
+    @Override
     public synchronized boolean isEmpty() {
         if (list.isEmpty()) {
             return true;
@@ -85,7 +88,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * reset the cursor
      */
-    
+
+    @Override
     public synchronized void reset() {
         iter = list.iterator();
         last = null;
@@ -93,10 +97,11 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
-    
+
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) {
         node.incrementReferenceCount();
         list.addMessageLast(node);
@@ -105,10 +110,11 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
-    
+
+    @Override
     public synchronized void addMessageFirst(MessageReference node) {
         node.incrementReferenceCount();
         list.addMessageFirst(node);
@@ -117,7 +123,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * @return true if there pending messages to dispatch
      */
-    
+
+    @Override
     public synchronized boolean hasNext() {
         return iter.hasNext();
     }
@@ -125,7 +132,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * @return the next pending message
      */
-    
+
+    @Override
     public synchronized MessageReference next() {
         last = iter.next();
         if (last != null) {
@@ -137,7 +145,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * remove the message at the cursor position
      */
-    
+
+    @Override
     public synchronized void remove() {
         if (last != null) {
             last.decrementReferenceCount();
@@ -148,15 +157,22 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     /**
      * @return the number of pending messages
      */
-    
+
+    @Override
     public synchronized int size() {
         return list.size();
     }
 
+    @Override
+    public synchronized long messageSize() {
+        return list.messageSize();
+    }
+
     /**
      * clear all pending messages
      */
-    
+
+    @Override
     public synchronized void clear() {
         for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
             MessageReference ref = i.next();
@@ -165,7 +181,8 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
         list.clear();
     }
 
-    
+
+    @Override
     public synchronized void remove(MessageReference node) {
         list.remove(node);
         node.decrementReferenceCount();
@@ -173,11 +190,12 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
 
     /**
      * Page in a restricted number of messages
-     * 
+     *
      * @param maxItems
      * @return a list of paged in messages
      */
-    
+
+    @Override
     public LinkedList<MessageReference> pageInList(int maxItems) {
         LinkedList<MessageReference> result = new LinkedList<MessageReference>();
         for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
@@ -191,12 +209,14 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
         return result;
     }
 
-    
+
+    @Override
     public boolean isTransient() {
         return true;
     }
 
-    
+
+    @Override
     public void destroy() throws Exception {
         super.destroy();
         clear();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PList.java b/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
index 7438963..efe29ac 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
@@ -41,6 +41,8 @@ public interface PList {
 
     long size();
 
+    long messageSize();
+
     public interface PListIterator extends Iterator<PListEntry> {
         void release();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 5c59158..6e79358 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -209,6 +209,7 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
         return delegate.isPrioritizedMessages();
     }
 
+    @Override
     public void updateMessage(Message message) throws IOException {
         delegate.updateMessage(message);
     }
@@ -223,4 +224,13 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
         return delegate.getMessageStoreStatistics();
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.store.TopicMessageStore#getMessageSize(java.lang.String, java.lang.String)
+     */
+    @Override
+    public long getMessageSize(String clientId, String subscriberName)
+            throws IOException {
+        return delegate.getMessageSize(clientId, subscriberName);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
index 163b184..a55118f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
@@ -102,6 +102,8 @@ public interface TopicMessageStore extends MessageStore {
      */
     int getMessageCount(String clientId, String subscriberName) throws IOException;
 
+    long getMessageSize(String clientId, String subscriberName) throws IOException;
+
     /**
      * Finds the subscriber entry for the given consumer info
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 76199d7..ae693f1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -146,6 +146,16 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
     }
 
     @Override
+    public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException {
+        long result = 0;
+        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
+        if (sub != null) {
+            result = sub.messageSize();
+        }
+        return result;
+    }
+
+    @Override
     public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
         MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
index ec3807e..fc986f2 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
@@ -26,8 +26,8 @@ import org.apache.activemq.store.MessageRecoveryListener;
 
 /**
  * A holder for a durable subscriber
- * 
- * 
+ *
+ *
  */
 class MemoryTopicSub {
 
@@ -58,9 +58,20 @@ class MemoryTopicSub {
         return map.size();
     }
 
+    synchronized long messageSize() {
+        long messageSize = 0;
+
+        for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext();) {
+            Entry<MessageId, Message> entry = iter.next();
+            messageSize += entry.getValue().getSize();
+        }
+
+        return messageSize;
+    }
+
     synchronized void recoverSubscription(MessageRecoveryListener listener) throws Exception {
-        for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Entry)iter.next();
+        for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext();) {
+            Entry<MessageId, Message> entry = iter.next();
             Object msg = entry.getValue();
             if (msg.getClass() == MessageId.class) {
                 listener.recoverMessageReference((MessageId)msg);
@@ -76,8 +87,8 @@ class MemoryTopicSub {
         // the message table is a synchronizedMap - so just have to synchronize
         // here
         int count = 0;
-        for (Iterator iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
-            Map.Entry entry = (Entry)iter.next();
+        for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
+            Entry<MessageId, Message> entry = iter.next();
             if (pastLackBatch) {
                 count++;
                 Object msg = entry.getValue();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index a0cb133..3bff9b2 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
 
@@ -57,7 +57,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
                PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
     private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
     private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
-         protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
+         @Override
+        protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
            return size() > SEQUENCE_ID_CACHE_SIZE;
         }
     };
@@ -67,6 +68,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
     }
 
+    @Override
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
         if (ack != null && ack.isUnmatchedAck()) {
             if (LOG.isTraceEnabled()) {
@@ -110,16 +112,19 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
     /**
      * @throws Exception
      */
+    @Override
     public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
+                @Override
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                     Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                     msg.getMessageId().setBrokerSequenceId(sequenceId);
                     return listener.recoverMessage(msg);
                 }
 
+                @Override
                 public boolean recoverMessageReference(String reference) throws Exception {
                     return listener.recoverMessageReference(new MessageId(reference));
                 }
@@ -149,16 +154,19 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
         }
 
+        @Override
         public String toString() {
             return Arrays.deepToString(perPriority);
         }
 
+        @Override
         public Iterator<LastRecoveredEntry> iterator() {
             return new PriorityIterator();
         }
 
         class PriorityIterator implements Iterator<LastRecoveredEntry> {
             int current = 9;
+            @Override
             public boolean hasNext() {
                 for (int i=current; i>=0; i--) {
                     if (perPriority[i].hasMessages()) {
@@ -169,10 +177,12 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
                 return false;
             }
 
+            @Override
             public LastRecoveredEntry next() {
                 return perPriority[current];
             }
 
+            @Override
             public void remove() {
                 throw new RuntimeException("not implemented");
             }
@@ -188,6 +198,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             this.priority = priority;
         }
 
+        @Override
         public String toString() {
             return priority + "-" + stored + ":" + recovered;
         }
@@ -213,6 +224,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             this.maxMessages = maxMessages;
         }
 
+        @Override
         public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
             if (delegate.hasSpace() && recoveredCount < maxMessages) {
                 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
@@ -226,6 +238,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
             return false;
         }
 
+        @Override
         public boolean recoverMessageReference(String reference) throws Exception {
             return delegate.recoverMessageReference(new MessageId(reference));
         }
@@ -244,6 +257,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
             throws Exception {
         //Duration duration = new Duration("recoverNextMessages");
@@ -253,7 +267,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         if (!subscriberLastRecoveredMap.containsKey(key)) {
            subscriberLastRecoveredMap.put(key, new LastRecovered());
         }
-        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
+        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
         LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
         try {
             if (LOG.isTraceEnabled()) {
@@ -293,6 +307,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public void resetBatching(String clientId, String subscriptionName) {
         String key = getSubscriptionKey(clientId, subscriptionName);
         if (!pendingCompletion.contains(key))  {
@@ -330,6 +345,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -347,6 +363,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
      * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
      *      String)
      */
+    @Override
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -359,6 +376,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -372,6 +390,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -384,6 +403,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
         //Duration duration = new Duration("getMessageCount");
         int result = 0;
@@ -403,6 +423,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         return result;
     }
 
+    @Override
+    public long getMessageSize(String clientId, String subscriberName) throws IOException {
+        return 0; // stub: ignores both arguments and always reports 0 for this store
+    }
+
     protected String getSubscriptionKey(String clientId, String subscriberName) {
         String result = clientId + ":";
         result += subscriberName != null ? subscriberName : "NOT_SET";

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
index 51d9693..aa0cb5d 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
@@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A MessageStore that uses a Journal to store it's messages.
- * 
- * 
+ *
+ *
  */
 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
 
@@ -54,12 +54,14 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         this.longTermStore = checkpointStore;
     }
 
+    @Override
     public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
         throws Exception {
         this.peristenceAdapter.checkpoint(true, true);
         longTermStore.recoverSubscription(clientId, subscriptionName, listener);
     }
 
+    @Override
     public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                     MessageRecoveryListener listener) throws Exception {
         this.peristenceAdapter.checkpoint(true, true);
@@ -67,21 +69,25 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
 
     }
 
+    @Override
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         return longTermStore.lookupSubscription(clientId, subscriptionName);
     }
 
+    @Override
     public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         this.peristenceAdapter.checkpoint(true, true);
         longTermStore.addSubscription(subscriptionInfo, retroactive);
     }
 
+    @Override
     public void addMessage(ConnectionContext context, Message message) throws IOException {
         super.addMessage(context, message);
     }
 
     /**
      */
+    @Override
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                             final MessageId messageId, MessageAck originalAck) throws IOException {
         final boolean debug = LOG.isDebugEnabled();
@@ -111,6 +117,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
             }
             transactionStore.acknowledge(this, ack, location);
             context.getTransaction().addSynchronization(new Synchronization() {
+                @Override
                 public void afterCommit() throws Exception {
                     if (debug) {
                         LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
@@ -121,6 +128,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
                     }
                 }
 
+                @Override
                 public void afterRollback() throws Exception {
                     if (debug) {
                         LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
@@ -159,6 +167,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         }
     }
 
+    @Override
     public RecordLocation checkpoint() throws IOException {
 
         final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
@@ -170,6 +179,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         }
 
         return super.checkpoint(new Callback() {
+            @Override
             public void execute() throws Exception {
 
                 // Checkpoint the acknowledged messages.
@@ -193,19 +203,29 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         return longTermStore;
     }
 
+    @Override
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
         longTermStore.deleteSubscription(clientId, subscriptionName);
     }
 
+    @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return longTermStore.getAllSubscriptions();
     }
 
+    @Override
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
         this.peristenceAdapter.checkpoint(true, true);
         return longTermStore.getMessageCount(clientId, subscriberName);
     }
 
+    @Override
+    public long getMessageSize(String clientId, String subscriberName) throws IOException {
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getMessageSize(clientId, subscriberName);
+    }
+
+    @Override
     public void resetBatching(String clientId, String subscriptionName) {
         longTermStore.resetBatching(clientId, subscriptionName);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 84aba07..bd45394 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -892,6 +892,30 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             }
         }
 
+
+        @Override
+        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            indexLock.writeLock().lock();
+            try {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
+                    @Override
+                    public Long execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
+                        if (cursorPos == null) {
+                            // The subscription might not exist.
+                            return 0L;
+                        }
+                        // Keep the full long: an int cast would truncate totals above Integer.MAX_VALUE.
+                        return getStoredMessageSize(tx, sd, subscriptionKey);
+                    }
+                });
+            } finally {
+                indexLock.writeLock().unlock();
+            }
+        }
+
         @Override
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
                 throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index ac767a7..b3fcfaa 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2536,6 +2536,32 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return 0;
     }
 
+    public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+        long locationSize = 0;
+        if (messageSequences != null) {
+            Iterator<Long> sequences = messageSequences.iterator();
+            // Add up Location sizes, one ack-position sequence at a time.
+            while (sequences.hasNext()) {
+                Long sequenceId = sequences.next();
+                //the last item is the next marker
+                if (!sequences.hasNext()) {
+                    break;
+                }
+                Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); // NOTE(review): a new iterator over locationIndex is created for every sequence
+                while (iterator.hasNext()) {
+                    Entry<Location, Long> entry = iterator.next();
+                    if (entry.getValue() == sequenceId - 1) {
+                        locationSize += entry.getKey().getSize();
+                        break;
+                    }
+                    // no match for this entry: keep scanning the location index
+                }
+            }
+        }
+
+        return locationSize;
+    }
     protected String key(KahaDestination destination) {
         return destination.getType().getNumber() + ":" + destination.getName();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 04d74b6..920fc53 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -409,6 +409,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
 
         @Override
+        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
+            return 0; // stub: ignores both arguments and always reports 0 for this store
+        }
+
+        @Override
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
index 82379ea..79ac7d1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.lang.ref.WeakReference;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -59,6 +60,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         this(pageFile, page.getPageId());
     }
 
+    @Override
     synchronized public void load(Transaction tx) throws IOException {
         if (loaded.compareAndSet(false, true)) {
             LOG.debug("loading");
@@ -81,15 +83,22 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
                 ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
                 setTailPageId(getHeadPageId());
                 size.addAndGet(node.size(tx));
+                onLoad(node, tx);
                 while (node.getNext() != NOT_SET ) {
                     node = loadNode(tx, node.getNext());
                     size.addAndGet(node.size(tx));
+                    onLoad(node, tx);
                     setTailPageId(node.getPageId());
                 }
             }
         }
     }
 
+    protected void onLoad(ListNode<Key, Value> node, Transaction tx) {
+        // Empty in this class: load(tx) calls this once for every node it loads, so an override can inspect each node.
+    }
+
+    @Override
     synchronized public void unload(Transaction tx) {
         if (loaded.compareAndSet(true, false)) {
         }
@@ -103,6 +112,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return loadNode(tx, getTailPageId());
     }
 
+    @Override
     synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
         assertLoaded();
 
@@ -123,6 +133,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
     private Map.Entry<Key, Value> lastGetEntryCache = null;
     private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
 
+    @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     synchronized public Value get(Transaction tx, Key key) throws IOException {
         assertLoaded();
@@ -144,6 +155,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
      *
      * @return the old value contained in the list if one exists or null.
      */
+    @Override
     @SuppressWarnings({ "rawtypes" })
     synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
 
@@ -211,6 +223,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return null;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     synchronized public Value remove(Transaction tx, Key key) throws IOException {
         assertLoaded();
@@ -252,15 +265,17 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return null;
     }
 
-    public void onRemove() {
+    public void onRemove(Entry<Key, Value> removed) { // `removed` is not read by this implementation
         size.decrementAndGet();
         flushCache();
     }
 
+    @Override
     public boolean isTransient() {
         return false;
     }
 
+    @Override
     synchronized public void clear(Transaction tx) throws IOException {
         for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
             ListNode<Key,Value>candidate = iterator.next();
@@ -280,6 +295,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return getHead(tx).isEmpty(tx);
     }
 
+    @Override
     synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
         return getHead(tx).iterator(tx);
     }
@@ -346,6 +362,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
     public Marshaller<Key> getKeyMarshaller() {
         return keyMarshaller;
     }
+    @Override
     public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
         this.keyMarshaller = keyMarshaller;
     }
@@ -353,6 +370,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
     public Marshaller<Value> getValueMarshaller() {
         return valueMarshaller;
     }
+    @Override
     public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
         this.valueMarshaller = valueMarshaller;
     }


[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5923

Posted by cs...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
index 6911e4f..8fed042 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
@@ -66,14 +66,17 @@ public final class ListNode<Key, Value> {
             this.value = value;
         }
 
+        @Override
         public Key getKey() {
             return key;
         }
 
+        @Override
         public Value getValue() {
             return value;
         }
 
+        @Override
         public Value setValue(Value value) {
             Value oldValue = this.value;
             this.value = value;
@@ -98,10 +101,12 @@ public final class ListNode<Key, Value> {
             index = current.getContainingList();
         }
 
+        @Override
         public boolean hasNext() {
             return nextEntry != null;
         }
 
+        @Override
         public ListNode<Key, Value> next() {
             ListNode<Key, Value> current = nextEntry;
             if (current != null) {
@@ -121,6 +126,7 @@ public final class ListNode<Key, Value> {
             return current;
         }
 
+        @Override
         public void remove() {
             throw new UnsupportedOperationException();
         }
@@ -171,6 +177,7 @@ public final class ListNode<Key, Value> {
             return result;
         }
 
+        @Override
         public boolean hasNext() {
             if (nextEntry == null) {
                 nextEntry = getFromNextNode();
@@ -178,6 +185,7 @@ public final class ListNode<Key, Value> {
             return nextEntry != null;
         }
 
+        @Override
         public Entry<Key, Value> next() {
             if (nextEntry != null) {
                 entryToRemove = nextEntry;
@@ -188,6 +196,7 @@ public final class ListNode<Key, Value> {
             }
         }
 
+        @Override
         public void remove() {
             if (entryToRemove == null) {
                 throw new IllegalStateException("can only remove once, call hasNext();next() again");
@@ -228,7 +237,7 @@ public final class ListNode<Key, Value> {
                         currentNode = previousNode;
                     }
                 }
-                targetList.onRemove();
+                targetList.onRemove(entryToRemove);
 
                 if (toRemoveNode != null) {
                     tx.free(toRemoveNode.getPage());
@@ -262,6 +271,7 @@ public final class ListNode<Key, Value> {
             this.valueMarshaller = valueMarshaller;
         }
 
+        @Override
         public void writePayload(ListNode<Key, Value> node, DataOutput os) throws IOException {
             os.writeLong(node.next);
             short count = (short) node.entries.size(); // cast may truncate
@@ -279,6 +289,7 @@ public final class ListNode<Key, Value> {
             }
         }
 
+        @Override
         @SuppressWarnings({ "unchecked", "rawtypes" })
         public ListNode<Key, Value> readPayload(DataInput is) throws IOException {
             ListNode<Key, Value> node = new ListNode<Key, Value>();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
index eafb2ac..b45692a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
@@ -21,22 +21,22 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.command.Message;
+import org.apache.activemq.management.SizeStatisticImpl;
 import org.apache.activemq.store.PList;
 import org.apache.activemq.store.PListEntry;
 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
+import org.apache.activemq.store.kahadb.disk.index.ListNode;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.util.ByteSequence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +45,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
     final PListStoreImpl store;
     private String name;
     Object indexLock;
+    private final SizeStatisticImpl messageSize;
 
     PListImpl(PListStoreImpl store) {
         this.store = store;
@@ -52,6 +53,9 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         setPageFile(store.getPageFile());
         setKeyMarshaller(StringMarshaller.INSTANCE);
         setValueMarshaller(LocationMarshaller.INSTANCE);
+
+        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
+        messageSize.setEnabled(true);
     }
 
     public void setName(String name) {
@@ -75,6 +79,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
     public synchronized void destroy() throws IOException {
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     clear(tx);
                     unload(tx);
@@ -100,6 +105,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     add(tx, id, location);
                 }
@@ -113,6 +119,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     addFirst(tx, id, location);
                 }
@@ -133,6 +140,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     result.set(remove(tx, id) != null);
                 }
@@ -145,6 +153,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
                     if (iterator.hasNext()) {
@@ -165,6 +174,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
                     ref.set(iterator.next());
@@ -183,6 +193,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     ref.set(getFirst(tx));
                 }
@@ -200,6 +211,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     ref.set(getLast(tx));
                 }
@@ -270,6 +282,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
             }
         }
 
+        @Override
         public void release() {
             try {
                 tx.rollback();
@@ -285,6 +298,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         synchronized (indexLock) {
             if (loaded.get()) {
                 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
                     public void execute(Transaction tx) throws IOException {
                         Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
                         while (iterator.hasNext()) {
@@ -298,6 +312,51 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
     }
 
     @Override
+    public long messageSize() {
+        return messageSize.getTotalSize(); // total kept by the messageSize statistic that add/addFirst/onLoad/onRemove/clear update
+    }
+
+    @Override
+    public synchronized Location add(Transaction tx, String key, Location value)
+            throws IOException {
+        Location location = super.add(tx, key, value);
+        messageSize.addSize(value.getSize()); // reached only when super.add did not throw; counts the added value, not the returned Location
+        return location;
+    }
+
+    @Override
+    public synchronized Location addFirst(Transaction tx, String key,
+            Location value) throws IOException {
+        Location location = super.addFirst(tx, key, value);
+        messageSize.addSize(value.getSize()); // reached only when super.addFirst did not throw; counts the added value, not the returned Location
+        return location;
+    }
+
+    @Override
+    public synchronized void clear(Transaction tx) throws IOException {
+        messageSize.reset(); // NOTE(review): reset happens before super.clear(tx), so the statistic stays reset even if that call throws
+        super.clear(tx);
+    }
+
+    @Override
+    protected synchronized void onLoad(ListNode<String, Location> node, Transaction tx) {
+        try {
+            Iterator<Entry<String, Location>> i = node.iterator(tx); // NOTE(review): if this iterator continues into following nodes, their entries would be counted again when onLoad runs for those nodes - confirm
+            while (i.hasNext()) {
+                messageSize.addSize(i.next().getValue().getSize()); // add each entry's Location size to the running total
+            }
+        } catch (IOException e) {
+            LOG.warn("could not increment message size", e); // logged and not rethrown, so the total may be incomplete after a failure
+        }
+    }
+
+    @Override
+    public void onRemove(Entry<String, Location> removed) { // NOTE(review): not declared synchronized, unlike add/addFirst/clear/onLoad in this class - confirm callers hold the lock
+        super.onRemove(removed);
+        messageSize.addSize(-removed.getValue().getSize()); // subtracts the removed entry's size by adding its negation
+    }
+
+    @Override
     public String toString() {
         return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 7c2d327..a4cdcac 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -1008,6 +1008,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
         case None => 0
       }
     }
+    
+    def getMessageSize(clientId: String, subscriptionName: String): Long = {
+      check_running
+      return 0 // constant result once check_running has passed; neither argument is read
+    }
 
   }
   class LevelDBPList(val name: String, val key: Long) extends PList {
@@ -1066,6 +1071,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
 
     def isEmpty = size()==0
     def size(): Long = listSize.get()
+    def messageSize(): Long = 0 // constant 0: unlike size(), this does not read listSize or any other state
 
     def iterator() = new PListIterator() {
       check_running

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 99382d0..6cef709 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -228,6 +228,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
             }
 
             @Override
+            public long getPendingMessageSize() {
+                return 0; // constant 0; no pending size is computed
+            }
+
+            @Override
             public int getPrefetchSize() {
                 return 0;
             }
@@ -354,10 +359,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
                 return 0;
             }
 
+            @Override
             public void incrementConsumedCount(){
 
             }
 
+            @Override
             public void resetConsumedCount(){
 
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
index 2541a64..207ecda 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -102,6 +102,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
     public void testNoDispatchToRemovedConsumers() throws Exception {
         final AtomicInteger producerId = new AtomicInteger();
         Runnable sender = new Runnable() {
+            @Override
             public void run() {
                 AtomicInteger id = new AtomicInteger();
                 int producerIdAndIncrement = producerId.getAndIncrement();
@@ -120,6 +121,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         };
 
         Runnable subRemover = new Runnable() {
+            @Override
             public void run() {
                 for (Subscription sub : subs) {
                     try {
@@ -177,10 +179,12 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         List<MessageReference> dispatched =
                 Collections.synchronizedList(new ArrayList<MessageReference>());
 
+        @Override
         public void acknowledge(ConnectionContext context, MessageAck ack)
                 throws Exception {
         }
 
+        @Override
         public void add(MessageReference node) throws Exception {
             // immediate dispatch
             QueueMessageReference  qmr = (QueueMessageReference)node;
@@ -188,6 +192,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             dispatched.add(qmr);
         }
 
+        @Override
         public ConnectionContext getContext() {
             return null;
         }
@@ -228,76 +233,100 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         public void resetConsumedCount() {
         }
 
+        @Override
         public void add(ConnectionContext context, Destination destination)
                 throws Exception {
         }
 
+        @Override
         public void destroy() {
         }
 
+        @Override
         public void gc() {
         }
 
+        @Override
         public ConsumerInfo getConsumerInfo() {
             return info;
         }
 
+        @Override
         public long getDequeueCounter() {
             return 0;
         }
 
+        @Override
         public long getDispatchedCounter() {
             return 0;
         }
 
+        @Override
         public int getDispatchedQueueSize() {
             return 0;
         }
 
+        @Override
         public long getEnqueueCounter() {
             return 0;
         }
 
+        @Override
         public int getInFlightSize() {
             return 0;
         }
 
+        @Override
         public int getInFlightUsage() {
             return 0;
         }
 
+        @Override
         public ObjectName getObjectName() {
             return null;
         }
 
+        @Override
         public int getPendingQueueSize() {
             return 0;
         }
 
+        @Override
+        public long getPendingMessageSize() {
+            return 0; // constant 0; no pending size is computed
+        }
+
+        @Override
         public int getPrefetchSize() {
             return 0;
         }
 
+        @Override
         public String getSelector() {
             return null;
         }
 
+        @Override
         public boolean isBrowser() {
             return false;
         }
 
+        @Override
         public boolean isFull() {
             return false;
         }
 
+        @Override
         public boolean isHighWaterMark() {
             return false;
         }
 
+        @Override
         public boolean isLowWaterMark() {
             return false;
         }
 
+        @Override
         public boolean isRecoveryRequired() {
             return false;
         }
@@ -306,19 +335,23 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             return false;
         }
 
+        @Override
         public boolean matches(MessageReference node,
                 MessageEvaluationContext context) throws IOException {
             return true;
         }
 
+        @Override
         public boolean matches(ActiveMQDestination destination) {
             return false;
         }
 
+        @Override
         public void processMessageDispatchNotification(
                 MessageDispatchNotification mdn) throws Exception {
         }
 
+        @Override
         public Response pullMessage(ConnectionContext context, MessagePull pull)
                 throws Exception {
             return null;
@@ -329,34 +362,42 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             return false;
         }
 
+        @Override
         public List<MessageReference> remove(ConnectionContext context,
                 Destination destination) throws Exception {
             return new ArrayList<MessageReference>(dispatched);
         }
 
+        @Override
         public void setObjectName(ObjectName objectName) {
         }
 
+        @Override
         public void setSelector(String selector)
                 throws InvalidSelectorException, UnsupportedOperationException {
         }
 
+        @Override
         public void updateConsumerPrefetch(int newPrefetch) {
         }
 
+        @Override
         public boolean addRecoveredMessage(ConnectionContext context,
                 MessageReference message) throws Exception {
             return false;
         }
 
+        @Override
         public ActiveMQDestination getActiveMQDestination() {
             return null;
         }
 
+        @Override
         public int getLockPriority() {
             return 0;
         }
 
+        @Override
         public boolean isLockExclusive() {
             return false;
         }
@@ -367,6 +408,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         public void removeDestination(Destination destination) {
         }
 
+        @Override
         public int countBeforeFull() {
             return 10;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
new file mode 100644
index 0000000..5d0a82c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.AbstractStoreStatTestSupport;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base test that checks pending message and message store count/size statistics.
+ *
+ * AMQ-5748
+ *
+ */
+public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStatTestSupport {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(AbstractPendingMessageCursorTest.class);
+
+
+    protected BrokerService broker;
+    protected URI brokerConnectURI;
+    protected String defaultQueueName = "test.queue";
+    protected String defaultTopicName = "test.topic";
+    protected static int maxMessageSize = 1000;
+
+    @Before
+    public void startBroker() throws Exception {
+        setUpBroker(true);
+    }
+
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+
+        broker = new BrokerService();
+        this.initPersistence(broker);
+        //set up a transport
+        TransportConnector connector = broker
+                .addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTopicPrefetch(100);
+        policy.setDurableTopicPrefetch(100);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+        broker.start();
+        broker.waitUntilStarted();
+        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Override
+    protected BrokerService getBroker() {
+        return this.broker;
+    }
+
+    @Override
+    protected URI getBrokerConnectURI() {
+        return this.brokerConnectURI;
+    }
+
+    protected abstract void initPersistence(BrokerService brokerService) throws IOException;
+
+    @Test
+    public void testQueueMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+    }
+
+    @Test
+    public void testQueueBrowserMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+        browseTestQueueMessages(dest.getName());
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+    }
+
+    @Test
+    public void testQueueMessageSizeNonPersistent() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200,
+                DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+    }
+
+    @Test
+    public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
+        AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(100,
+                DeliveryMode.PERSISTENT, publishedMessageSize);
+        dest = publishTestQueueMessages(100,
+                DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+    }
+
+    @Test
+    public void testQueueMessageSizeAfterConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+
+        consumeTestQueueMessages();
+
+        verifyPendingStats(dest, 0, 0);
+        verifyStoreStats(dest, 0, 0);
+    }
+
+    @Test
+    public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+
+        consumeTestQueueMessages();
+
+        verifyPendingStats(dest, 0, 0);
+        verifyStoreStats(dest, 0, 0);
+    }
+
+    @Test(timeout=100000)
+    public void testTopicMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+        org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200, publishedMessageSize);
+
+        //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+        //are dispatched because we have an active consumer online
+        //verify that the size is greater than 100 messages times the minimum size of 100
+        verifyPendingStats(dest, 100, 100 * 100);
+
+        //consume all messages
+        consumeTestMessages(consumer, 200);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=100000)
+    public void testTopicNonPersistentMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+        org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200,
+                DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+
+        //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+        //are dispatched because we have an active consumer online
+        //verify the size is at least as big as 100 messages times the minimum of 100 size
+        verifyPendingStats(dest, 100, 100 * 100);
+
+        //consume all messages
+        consumeTestMessages(consumer, 200);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=100000)
+    public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+        org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(100,
+                DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+
+        dest = publishTestTopicMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
+
+        //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+        //are dispatched because we have an active consumer online
+        //verify the size is at least as big as 100 messages times the minimum of 100 size
+        verifyPendingStats(dest, 100, 100 * 100);
+
+        //consume all messages
+        consumeTestMessages(consumer, 200);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurable() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection,
+                new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+        //consume all messages
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, subKey, 0, 0);
+        verifyStoreStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurablePartialConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(
+                connection, new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+        //consume 50 of the 200 messages
+        consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+        //150 should be left
+        verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
+        verifyStoreStats(dest, 150, publishedMessageSize.get());
+
+        connection.close();
+    }
+
+    @Test(timeout=10000)
+    public void testMessageSizeTwoDurables() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        org.apache.activemq.broker.region.Topic dest =
+                publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200,
+                        publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+        //consume messages just for sub1
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+        //There is still a durable that hasn't consumed so the messages should exist
+        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
+        verifyPendingStats(dest, subKey, 0, 0);
+        verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+        connection.stop();
+    }
+
+
+    protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue,
+            final int count, final long minimumSize) throws Exception {
+        this.verifyPendingStats(queue, count, minimumSize, count, minimumSize);
+    }
+
+    protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue,
+            final int count, final long minimumSize, final int storeCount, final long minimumStoreSize) throws Exception {
+
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queue.getPendingMessageCount() == count;
+            }
+        });
+
+        verifySize(count, new MessageSizeCalculator() {
+            @Override
+            public long getMessageSize() throws Exception {
+                return queue.getPendingMessageSize();
+            }
+        }, minimumSize);
+    }
+
+    //For a non-durable there won't necessarily be a message store
+    protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic,
+            final int count, final long minimumSize) throws Exception {
+
+        final TopicSubscription sub = (TopicSubscription) topic.getConsumers().get(0);
+
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return sub.getPendingQueueSize() == count;
+            }
+        });
+
+        verifySize(count, new MessageSizeCalculator() {
+            @Override
+            public long getMessageSize() throws Exception {
+                return sub.getPendingMessageSize();
+            }
+        }, minimumSize);
+    }
+
+    protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic, SubscriptionKey subKey,
+            final int count, final long minimumSize) throws Exception {
+
+        final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey);
+
+        //verify message count
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return sub.getPendingQueueSize() == count;
+            }
+        });
+
+        //verify message size
+        verifySize(count, new MessageSizeCalculator() {
+            @Override
+            public long getMessageSize() throws Exception {
+                return sub.getPendingMessageSize();
+            }
+        }, minimumSize);
+    }
+
+    protected void verifyStoreStats(org.apache.activemq.broker.region.Destination dest,
+            final int storeCount, final long minimumStoreSize) throws Exception {
+        final MessageStore messageStore = dest.getMessageStore();
+
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return messageStore.getMessageCount() == storeCount;
+            }
+        });
+        verifySize(storeCount, new MessageSizeCalculator() {
+            @Override
+            public long getMessageSize() throws Exception {
+                return messageStore.getMessageSize();
+            }
+        }, minimumStoreSize);
+
+    }
+
+
+    protected void verifySize(final int count, final MessageSizeCalculator messageSizeCalculator,
+            final long minimumSize) throws Exception {
+        if (count > 0) {
+            Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return messageSizeCalculator.getMessageSize() > minimumSize ;
+                }
+            });
+        } else {
+            Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return messageSizeCalculator.getMessageSize() == 0;
+                }
+            });
+        }
+    }
+
+    protected static interface MessageSizeCalculator {
+        long getMessageSize() throws Exception;
+    }
+
+
+    protected Destination consumeTestMessages(MessageConsumer consumer, int size) throws Exception {
+        return consumeTestMessages(consumer, size, defaultTopicName);
+    }
+
+
+    protected Destination consumeTestMessages(MessageConsumer consumer, int size, String topicName) throws Exception {
+        // resolve the topic destination from the broker
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                topicName);
+
+        Destination dest = broker.getDestination(activeMqTopic);
+
+        //Topic topic = session.createTopic(topicName);
+
+        try {
+            for (int i = 0; i < size; i++) {
+                consumer.receive();
+            }
+
+        } finally {
+            //session.close();
+        }
+
+        return dest;
+    }
+
+    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, AtomicLong publishedMessageSize) throws Exception {
+        return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection,
+            String[] subNames, int publishSize, AtomicLong publishedMessageSize, int deliveryMode) throws Exception {
+
+        return publishTestMessagesDurable(connection, subNames, defaultTopicName,
+                publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize,
+                publishedMessageSize, false, deliveryMode);
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,
+            AtomicLong publishedMessageSize) throws Exception {
+        return publishTestTopicMessages(publishSize, DeliveryMode.PERSISTENT, publishedMessageSize);
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,
+            int deliveryMode, AtomicLong publishedMessageSize) throws Exception {
+        // open a dedicated connection for publishing
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId2");
+        connection.start();
+
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                defaultTopicName);
+
+        org.apache.activemq.broker.region.Topic dest =
+                (org.apache.activemq.broker.region.Topic) broker.getDestination(activeMqTopic);
+
+        // Create the session and the topic to publish to
+        Session session = connection.createSession(false,
+                TopicSession.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(defaultTopicName);
+
+        try {
+            // publish the requested number of messages using the given
+            // delivery mode
+            MessageProducer prod = session.createProducer(topic);
+            prod.setDeliveryMode(deliveryMode);
+            for (int i = 0; i < publishSize; i++) {
+                prod.send(createMessage(session, AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize));
+            }
+
+        } finally {
+            connection.close();
+        }
+
+        return dest;
+    }
+
+    protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count,
+            AtomicLong publishedMessageSize) throws Exception {
+        return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+                AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize);
+    }
+
+    protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, int deliveryMode,
+            AtomicLong publishedMessageSize) throws Exception {
+        return publishTestQueueMessages(count, defaultQueueName, deliveryMode,
+                AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize);
+    }
+
+    protected Destination consumeTestQueueMessages() throws Exception {
+        return consumeTestQueueMessages(defaultQueueName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
new file mode 100644
index 0000000..557c70e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that pending message metrics work properly with KahaDB
+ *
+ * AMQ-5923
+ *
+ */
+public class KahaDBPendingMessageCursorTest extends
+        AbstractPendingMessageCursorTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(KahaDBPendingMessageCursorTest.class);
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    @Override
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+        if (clearDataDir && dataFileDir.getRoot().exists())
+            FileUtils.cleanDirectory(dataFileDir.getRoot());
+        super.setUpBroker(clearDataDir);
+    }
+
+    @Override
+    protected void initPersistence(BrokerService brokerService)
+            throws IOException {
+        broker.setPersistent(true);
+        broker.setDataDirectoryFile(dataFileDir.getRoot());
+    }
+
+    /**
+     * Test that the counter restores size and works after restart and more
+     * messages are published
+     *
+     * @throws Exception
+     */
+    @Test(timeout=10000)
+    public void testDurableMessageSizeAfterRestartAndPublish() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Topic topic =  publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+                publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+
+        // verify the count and size
+        verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(topic, 200, publishedMessageSize.get());
+
+        // stop, restart broker and publish more messages
+        stopBroker();
+        this.setUpBroker(false);
+
+        connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+                publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        // verify the count and size
+        verifyPendingStats(topic, subKey, 400, publishedMessageSize.get());
+        verifyStoreStats(topic, 400, publishedMessageSize.get());
+
+    }
+
+    /**
+     * Test that non-persistent messages for a durable subscription show up in
+     * the pending stats while the message store stays empty
+     *
+     * @throws Exception
+     */
+    @Test(timeout=10000)
+    public void testNonPersistentDurableMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Topic topic =  publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+                publishedMessageSize, DeliveryMode.NON_PERSISTENT);
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+
+        // verify the count and size
+        verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(topic, 0, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
new file mode 100644
index 0000000..948193d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.util.SubscriptionKey;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that PendingMessageCursor size statistics work with the MemoryPersistenceAdapter
+ *
+ * AMQ-5748
+ *
+ */
+public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursorTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(MemoryPendingMessageCursorTest.class);
+
+    @Override
+    protected void initPersistence(BrokerService brokerService) throws IOException {
+        broker.setPersistent(false);
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+    }
+
+
+    @Override
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurable() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        org.apache.activemq.broker.region.Topic dest =
+                publishTestMessagesDurable(connection, new String[] {"sub1"},
+                        200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+        //consume 100 messages
+        consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize);
+
+        //100 should be left
+        verifyPendingStats(dest, subKey, 100, publishedMessageSize.get());
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+        connection.close();
+    }
+
+    @Override
+    @Test(timeout=10000)
+    public void testMessageSizeTwoDurables() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        org.apache.activemq.broker.region.Topic dest =
+                publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"},
+                        200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+        //consume messages just for sub1
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+        //There is still a durable that hasn't consumed so the messages should exist
+        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
+        verifyPendingStats(dest, subKey, 0, 0);
+        verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+        connection.stop();
+    }
+
+    @Override
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurablePartialConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection,
+                new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+        //consume 50 of the 200 messages
+        consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+        //150 should be left
+        verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+        //verify the size is at least as big as 100 messages times the minimum of 100 size
+        verifyStoreStats(dest, 100, 100 * 100);
+
+        connection.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
new file mode 100644
index 0000000..9768980
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+/**
+ * This test checks that pending message metrics work properly with MultiKahaDB
+ *
+ * AMQ-5923
+ *
+ */
+public class MultiKahaDBPendingMessageCursorTest extends
+    KahaDBPendingMessageCursorTest {
+
+    @Override
+    protected void initPersistence(BrokerService brokerService)
+            throws IOException {
+        broker.setPersistent(true);
+
+        //setup multi-kaha adapter
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(dataFileDir.getRoot());
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024 * 512);
+
+        //set up a store per destination
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
index 79d7e6c..6a9dd6b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
@@ -309,6 +309,16 @@ public class OrderPendingListTest {
         }
 
         @Override
+        public long messageSize() {
+            long size = 0;
+            Iterator<MessageReference> i = theList.iterator();
+            while (i.hasNext()) {
+                size += i.next().getMessage().getSize();
+            }
+            return size;
+        }
+
+        @Override
         public Iterator<MessageReference> iterator() {
             return theList.iterator();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
index 944d183..116500e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -16,38 +16,19 @@
  */
 package org.apache.activemq.store;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
 import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.util.Wait.Condition;
 import org.junit.After;
@@ -62,7 +43,7 @@ import org.slf4j.LoggerFactory;
  * AMQ-5748
  *
  */
-public abstract class AbstractMessageStoreSizeStatTest {
+public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStatTestSupport {
     protected static final Logger LOG = LoggerFactory
             .getLogger(AbstractMessageStoreSizeStatTest.class);
 
@@ -71,7 +52,6 @@ public abstract class AbstractMessageStoreSizeStatTest {
     protected URI brokerConnectURI;
     protected String defaultQueueName = "test.queue";
     protected String defaultTopicName = "test.topic";
-    protected static int messageSize = 1000;
 
     @Before
     public void startBroker() throws Exception {
@@ -100,39 +80,52 @@ public abstract class AbstractMessageStoreSizeStatTest {
         broker.waitUntilStopped();
     }
 
+    @Override
+    protected BrokerService getBroker() {
+        return this.broker;
+    }
+
+    @Override
+    protected URI getBrokerConnectURI() {
+        return this.brokerConnectURI;
+    }
+
     protected abstract void initPersistence(BrokerService brokerService) throws IOException;
 
-    @Test
+    @Test(timeout=10000)
     public void testMessageSize() throws Exception {
-        Destination dest = publishTestQueueMessages(200);
-        verifyStats(dest, 200, 200 * messageSize);
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
     }
 
-    @Test
+    @Test(timeout=10000)
     public void testMessageSizeAfterConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
 
-        Destination dest = publishTestQueueMessages(200);
-        verifyStats(dest, 200, 200 * messageSize);
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         consumeTestQueueMessages();
 
         verifyStats(dest, 0, 0);
     }
 
-    @Test
+    @Test(timeout=10000)
     public void testMessageSizeOneDurable() throws Exception {
-
+        AtomicLong publishedMessageSize = new AtomicLong();
         Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
         connection.setClientID("clientId");
         connection.start();
 
-        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200);
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, publishedMessageSize);
 
         //verify the count and size
-        verifyStats(dest, 200, 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         //consume all messages
-        consumeDurableTestMessages(connection, "sub1", 200);
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
 
         //All messages should now be gone
         verifyStats(dest, 0, 0);
@@ -142,21 +135,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
 
     @Test(timeout=10000)
     public void testMessageSizeTwoDurables() throws Exception {
-
+        AtomicLong publishedMessageSize = new AtomicLong();
         Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
         connection.setClientID("clientId");
         connection.start();
 
-        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200);
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, publishedMessageSize);
 
         //verify the count and size
-        verifyStats(dest, 200, 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         //consume messages just for sub1
-        consumeDurableTestMessages(connection, "sub1", 200);
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
 
         //There is still a durable that hasn't consumed so the messages should exist
-        verifyStats(dest, 200, 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         connection.stop();
 
@@ -164,14 +157,24 @@ public abstract class AbstractMessageStoreSizeStatTest {
 
     @Test
     public void testMessageSizeAfterDestinationDeletion() throws Exception {
-        Destination dest = publishTestQueueMessages(200);
-        verifyStats(dest, 200, 200 * messageSize);
+        AtomicLong publishedMessageSize = new AtomicLong();
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         //check that the size is 0 after deletion
         broker.removeDestination(dest.getActiveMQDestination());
         verifyStats(dest, 0, 0);
     }
 
+    @Test
+    public void testQueueBrowserMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+        browseTestQueueMessages(dest.getName());
+        verifyStats(dest, 200, publishedMessageSize.get());
+    }
+
     protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception {
         final MessageStore messageStore = dest.getMessageStore();
         final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
@@ -203,164 +206,31 @@ public abstract class AbstractMessageStoreSizeStatTest {
         }
     }
 
-    /**
-     * Generate random 1 megabyte messages
-     * @param session
-     * @return
-     * @throws JMSException
-     */
-    protected BytesMessage createMessage(Session session) throws JMSException {
-        final BytesMessage message = session.createBytesMessage();
-        final byte[] data = new byte[messageSize];
-        final Random rng = new Random();
-        rng.nextBytes(data);
-        message.writeBytes(data);
-        return message;
-    }
 
-
-    protected Destination publishTestQueueMessages(int count) throws Exception {
-        return publishTestQueueMessages(count, defaultQueueName);
+    protected Destination publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception {
+        return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+                AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
     }
 
-    protected Destination publishTestQueueMessages(int count, String queueName) throws Exception {
-        // create a new queue
-        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
-                queueName);
-
-        Destination dest = broker.getDestination(activeMqQueue);
-
-        // Start the connection
-        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
-        .createConnection();
-        connection.setClientID("clientId" + queueName);
-        connection.start();
-        Session session = connection.createSession(false,
-                QueueSession.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(queueName);
-
-        try {
-            // publish a bunch of non-persistent messages to fill up the temp
-            // store
-            MessageProducer prod = session.createProducer(queue);
-            prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-            for (int i = 0; i < count; i++) {
-                prod.send(createMessage(session));
-            }
-
-        } finally {
-            connection.close();
-        }
-
-        return dest;
+    protected Destination publishTestQueueMessages(int count, String queueName, AtomicLong publishedMessageSize) throws Exception {
+        return publishTestQueueMessages(count, queueName, DeliveryMode.PERSISTENT,
+                AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
     }
 
     protected Destination consumeTestQueueMessages() throws Exception {
         return consumeTestQueueMessages(defaultQueueName);
     }
 
-    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception {
-        return consumeDurableTestMessages(connection, sub, size, defaultTopicName);
+    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size,
+            AtomicLong publishedMessageSize) throws Exception {
+        return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
     }
 
-    protected Destination consumeTestQueueMessages(String queueName) throws Exception {
-        // create a new queue
-        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
-                queueName);
-
-        Destination dest = broker.getDestination(activeMqQueue);
-
-        // Start the connection
-        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
-        .createConnection();
-        connection.setClientID("clientId2" + queueName);
-        connection.start();
-        Session session = connection.createSession(false,
-                QueueSession.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(queueName);
-
-        try {
-            MessageConsumer consumer = session.createConsumer(queue);
-            for (int i = 0; i < 200; i++) {
-                consumer.receive();
-            }
-
-        } finally {
-            connection.stop();
-        }
-
-        return dest;
-    }
-
-    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception {
-        // create a new queue
-        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
-                topicName);
-
-        Destination dest = broker.getDestination(activeMqTopic);
-
-        Session session = connection.createSession(false,
-                QueueSession.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic(topicName);
-
-        try {
-            TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
-            for (int i = 0; i < size; i++) {
-                consumer.receive();
-            }
-
-        } finally {
-            session.close();
-        }
-
-        return dest;
-    }
-
-    protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception {
-        // create a new queue
-        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
-                defaultTopicName);
-
-        Destination dest = broker.getDestination(activeMqTopic);
-
-        // Start the connection
-
-        Session session = connection.createSession(false,
-                TopicSession.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic(defaultTopicName);
-        for (String subName : subNames) {
-            session.createDurableSubscriber(topic, subName);
-        }
-
-        // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
-        //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
-        //then the statistics won't be updated properly because a new store would overwrite the old store
-        //which is still in use
-        ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
-
-        try {
-            // publish a bunch of non-persistent messages to fill up the temp
-            // store
-            MessageProducer prod = session.createProducer(topic);
-            prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-            for (int i = 0; i < publishSize; i++) {
-                prod.send(createMessage(session));
-            }
-
-            //verify the view has expected messages
-            assertEquals(subNames.length, subs.length);
-            ObjectName subName = subs[0];
-            DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
-                    broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
-            CompositeData[] data  = sub.browse();
-            assertNotNull(data);
-            assertEquals(expectedSize, data.length);
-
-        } finally {
-            session.close();
-        }
-
-        return dest;
+    protected Destination publishTestMessagesDurable(Connection connection, String[] subNames,
+            int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception {
+       return publishTestMessagesDurable(connection, subNames, defaultTopicName,
+                publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize,
+                publishedMessageSize, true);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
new file mode 100644
index 0000000..3f0e7c1
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.net.URI;
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
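+/**
+ * Base class with shared helpers for store statistics tests: publishing, consuming and
+ * browsing test messages against the broker returned by getBroker().
+ */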
+public abstract class AbstractStoreStatTestSupport {
+
+    protected static int defaultMessageSize = 1000;
+
+    protected abstract BrokerService getBroker();
+
+    protected abstract URI getBrokerConnectURI();
+
+    /**
+     * Receives 200 messages from the given queue on a dedicated connection.
+     *
+     * @param queueName name of the queue to consume from
+     * @return the broker destination for the queue
+     * @throws Exception if connecting or receiving fails
+     */
+    protected Destination consumeTestQueueMessages(String queueName) throws Exception {
+        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(queueName);
+        Destination dest = getBroker().getDestination(activeMqQueue);
+
+        Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI()).createConnection();
+        try {
+            connection.setClientID("clientId2" + queueName);
+            connection.start();
+            Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(queueName);
+            MessageConsumer consumer = session.createConsumer(queue);
+            // receive() blocks without a timeout, so the queue must hold at least 200 messages
+            for (int i = 0; i < 200; i++) {
+                consumer.receive();
+            }
+        } finally {
+            // stop() only pauses delivery; close() releases the connection like publishTestQueueMessages does
+            connection.close();
+        }
+        return dest;
+    }
+
+    /**
+     * Iterates a QueueBrowser over the given queue until its enumeration is exhausted.
+     *
+     * @param queueName name of the queue to browse
+     * @return the broker destination for the queue
+     * @throws Exception if connecting or browsing fails
+     */
+    protected Destination browseTestQueueMessages(String queueName) throws Exception {
+        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(queueName);
+        Destination dest = getBroker().getDestination(activeMqQueue);
+
+        Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI()).createConnection();
+        try {
+            connection.setClientID("clientId2" + queueName);
+            connection.start();
+            Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(queueName);
+            QueueBrowser queueBrowser = session.createBrowser(queue);
+            @SuppressWarnings("unchecked")
+            Enumeration<Message> messages = queueBrowser.getEnumeration();
+            // drain the enumeration; the messages themselves are not needed
+            while (messages.hasMoreElements()) {
+                messages.nextElement();
+            }
+        } finally {
+            // stop() only pauses delivery; close() releases the connection like publishTestQueueMessages does
+            connection.close();
+        }
+        return dest;
+    }
+
+    protected Destination consumeDurableTestMessages(Connection connection, String sub,
+            int size, String topicName, AtomicLong publishedMessageSize) throws Exception {
+        // resolve the broker-side destination for the topic
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                topicName);
+
+        Destination dest = getBroker().getDestination(activeMqTopic);
+
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(topicName);
+
+        try {
+            TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
+            for (int i = 0; i < size; i++) {
+                ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
+                if (publishedMessageSize != null) {
+                    publishedMessageSize.addAndGet(-message.getSize());
+                }
+            }
+
+        } finally {
+            session.close();
+        }
+
+        return dest;
+    }
+
+    protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, String queueName,
+            int deliveryMode, int messageSize, AtomicLong publishedMessageSize) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+                queueName);
+
+        Destination dest = getBroker().getDestination(activeMqQueue);
+
+        // Start the connection
+        Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
+        .createConnection();
+        connection.setClientID("clientId" + queueName);
+        connection.start();
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(queueName);
+
+        try {
+            // publish count messages using the requested delivery mode;
+            // createMessage adds each payload size to publishedMessageSize
+            MessageProducer prod = session.createProducer(queue);
+            prod.setDeliveryMode(deliveryMode);
+            for (int i = 0; i < count; i++) {
+                prod.send(createMessage(session, messageSize, publishedMessageSize));
+            }
+
+        } finally {
+            connection.close();
+        }
+
+        return (org.apache.activemq.broker.region.Queue) dest;
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
+            int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
+            boolean verifyBrowsing) throws Exception {
+        return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize,
+                publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT);
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
+            int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
+            boolean verifyBrowsing, int deliveryMode) throws Exception {
+        // resolve the broker-side destination for the topic
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                topicName);
+
+        Destination dest = getBroker().getDestination(activeMqTopic);
+
+        // Use the caller-supplied connection; it is not started here
+
+        Session session = connection.createSession(false,
+                TopicSession.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(topicName);
+        for (String subName : subNames) {
+            session.createDurableSubscriber(topic, subName);
+        }
+
+        ObjectName[] subs = null;
+        if (verifyBrowsing) {
+            // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
+            //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
+            //then the statistics won't be updated properly because a new store would overwrite the old store
+            //which is still in use
+            subs = getBroker().getAdminView().getDurableTopicSubscribers();
+        }
+
+        try {
+            // publish publishSize messages using the requested delivery mode;
+            // createMessage adds each payload size to publishedMessageSize
+            MessageProducer prod = session.createProducer(topic);
+            prod.setDeliveryMode(deliveryMode);
+            for (int i = 0; i < publishSize; i++) {
+                prod.send(createMessage(session, messageSize, publishedMessageSize));
+            }
+
+            //verify the view has expected messages
+            if (verifyBrowsing) {
+                assertNotNull(subs);
+                assertEquals(subNames.length, subs.length);
+                ObjectName subName = subs[0];
+                DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+                        getBroker().getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+                CompositeData[] data  = sub.browse();
+                assertNotNull(data);
+                assertEquals(expectedSize, data.length);
+            }
+
+        } finally {
+            session.close();
+        }
+
+        return (org.apache.activemq.broker.region.Topic) dest;
+    }
+
+    /**
+     * Creates a BytesMessage with a random payload of 100 to messageSize bytes (inclusive).
+     * @param session session used to create the message
+     * @param messageSize maximum payload size in bytes; must be at least 100
+     * @param publishedMessageSize running total the payload size is added to; may be null
+     * @return the populated message
+     * @throws JMSException if the message cannot be created or written
+     */
+    protected BytesMessage createMessage(Session session, int messageSize, AtomicLong publishedMessageSize) throws JMSException {
+        final BytesMessage message = session.createBytesMessage();
+        final Random rng = new Random();
+        final int size = 100 + rng.nextInt(messageSize - 100 + 1); // nextInt's bound is exclusive
+        if (publishedMessageSize != null) {
+            publishedMessageSize.addAndGet(size);
+        }
+        final byte[] data = new byte[size];
+        rng.nextBytes(data);
+        message.writeBytes(data);
+        return message;
+    }
+}