You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/09/09 20:12:48 UTC

[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5923

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
index 6911e4f..8fed042 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
@@ -66,14 +66,17 @@ public final class ListNode<Key, Value> {
             this.value = value;
         }
 
+        @Override
         public Key getKey() {
             return key;
         }
 
+        @Override
         public Value getValue() {
             return value;
         }
 
+        @Override
         public Value setValue(Value value) {
             Value oldValue = this.value;
             this.value = value;
@@ -98,10 +101,12 @@ public final class ListNode<Key, Value> {
             index = current.getContainingList();
         }
 
+        @Override
         public boolean hasNext() {
             return nextEntry != null;
         }
 
+        @Override
         public ListNode<Key, Value> next() {
             ListNode<Key, Value> current = nextEntry;
             if (current != null) {
@@ -121,6 +126,7 @@ public final class ListNode<Key, Value> {
             return current;
         }
 
+        @Override
         public void remove() {
             throw new UnsupportedOperationException();
         }
@@ -171,6 +177,7 @@ public final class ListNode<Key, Value> {
             return result;
         }
 
+        @Override
         public boolean hasNext() {
             if (nextEntry == null) {
                 nextEntry = getFromNextNode();
@@ -178,6 +185,7 @@ public final class ListNode<Key, Value> {
             return nextEntry != null;
         }
 
+        @Override
         public Entry<Key, Value> next() {
             if (nextEntry != null) {
                 entryToRemove = nextEntry;
@@ -188,6 +196,7 @@ public final class ListNode<Key, Value> {
             }
         }
 
+        @Override
         public void remove() {
             if (entryToRemove == null) {
                 throw new IllegalStateException("can only remove once, call hasNext();next() again");
@@ -228,7 +237,7 @@ public final class ListNode<Key, Value> {
                         currentNode = previousNode;
                     }
                 }
-                targetList.onRemove();
+                targetList.onRemove(entryToRemove);
 
                 if (toRemoveNode != null) {
                     tx.free(toRemoveNode.getPage());
@@ -262,6 +271,7 @@ public final class ListNode<Key, Value> {
             this.valueMarshaller = valueMarshaller;
         }
 
+        @Override
         public void writePayload(ListNode<Key, Value> node, DataOutput os) throws IOException {
             os.writeLong(node.next);
             short count = (short) node.entries.size(); // cast may truncate
@@ -279,6 +289,7 @@ public final class ListNode<Key, Value> {
             }
         }
 
+        @Override
         @SuppressWarnings({ "unchecked", "rawtypes" })
         public ListNode<Key, Value> readPayload(DataInput is) throws IOException {
             ListNode<Key, Value> node = new ListNode<Key, Value>();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
index eafb2ac..b45692a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
@@ -21,22 +21,22 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.command.Message;
+import org.apache.activemq.management.SizeStatisticImpl;
 import org.apache.activemq.store.PList;
 import org.apache.activemq.store.PListEntry;
 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
+import org.apache.activemq.store.kahadb.disk.index.ListNode;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.util.ByteSequence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +45,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
     final PListStoreImpl store;
     private String name;
     Object indexLock;
+    private final SizeStatisticImpl messageSize;
 
     PListImpl(PListStoreImpl store) {
         this.store = store;
@@ -52,6 +53,9 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         setPageFile(store.getPageFile());
         setKeyMarshaller(StringMarshaller.INSTANCE);
         setValueMarshaller(LocationMarshaller.INSTANCE);
+
+        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
+        messageSize.setEnabled(true);
     }
 
     public void setName(String name) {
@@ -75,6 +79,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
     public synchronized void destroy() throws IOException {
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     clear(tx);
                     unload(tx);
@@ -100,6 +105,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     add(tx, id, location);
                 }
@@ -113,6 +119,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     addFirst(tx, id, location);
                 }
@@ -133,6 +140,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     result.set(remove(tx, id) != null);
                 }
@@ -145,6 +153,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
                     if (iterator.hasNext()) {
@@ -165,6 +174,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
                     ref.set(iterator.next());
@@ -183,6 +193,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     ref.set(getFirst(tx));
                 }
@@ -200,6 +211,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     ref.set(getLast(tx));
                 }
@@ -270,6 +282,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
             }
         }
 
+        @Override
         public void release() {
             try {
                 tx.rollback();
@@ -285,6 +298,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
         synchronized (indexLock) {
             if (loaded.get()) {
                 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
                     public void execute(Transaction tx) throws IOException {
                         Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
                         while (iterator.hasNext()) {
@@ -298,6 +312,51 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
     }
 
     @Override
+    public long messageSize() {
+        return messageSize.getTotalSize();
+    }
+
+    @Override
+    public synchronized Location add(Transaction tx, String key, Location value)
+            throws IOException {
+        Location location = super.add(tx, key, value);
+        messageSize.addSize(value.getSize());
+        return location;
+    }
+
+    @Override
+    public synchronized Location addFirst(Transaction tx, String key,
+            Location value) throws IOException {
+        Location location = super.addFirst(tx, key, value);
+        messageSize.addSize(value.getSize());
+        return location;
+    }
+
+    @Override
+    public synchronized void clear(Transaction tx) throws IOException {
+        messageSize.reset();
+        super.clear(tx);
+    }
+
+    @Override
+    protected synchronized void onLoad(ListNode<String, Location> node, Transaction tx) {
+        try {
+            Iterator<Entry<String, Location>> i = node.iterator(tx);
+            while (i.hasNext()) {
+                messageSize.addSize(i.next().getValue().getSize());
+            }
+        } catch (IOException e) {
+            LOG.warn("could not increment message size", e);
+        }
+    }
+
+    @Override
+    public void onRemove(Entry<String, Location> removed) {
+        super.onRemove(removed);
+        messageSize.addSize(-removed.getValue().getSize());
+    }
+
+    @Override
     public String toString() {
         return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 7c2d327..a4cdcac 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -1008,6 +1008,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
         case None => 0
       }
     }
+    
+    def getMessageSize(clientId: String, subscriptionName: String): Long = {
+      check_running
+      return 0
+    }
 
   }
   class LevelDBPList(val name: String, val key: Long) extends PList {
@@ -1066,6 +1071,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
 
     def isEmpty = size()==0
     def size(): Long = listSize.get()
+    def messageSize(): Long = 0
 
     def iterator() = new PListIterator() {
       check_running

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 99382d0..6cef709 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -228,6 +228,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
             }
 
             @Override
+            public long getPendingMessageSize() {
+                return 0;
+            }
+
+            @Override
             public int getPrefetchSize() {
                 return 0;
             }
@@ -354,10 +359,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
                 return 0;
             }
 
+            @Override
             public void incrementConsumedCount(){
 
             }
 
+            @Override
             public void resetConsumedCount(){
 
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
index 2541a64..207ecda 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -102,6 +102,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
     public void testNoDispatchToRemovedConsumers() throws Exception {
         final AtomicInteger producerId = new AtomicInteger();
         Runnable sender = new Runnable() {
+            @Override
             public void run() {
                 AtomicInteger id = new AtomicInteger();
                 int producerIdAndIncrement = producerId.getAndIncrement();
@@ -120,6 +121,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         };
 
         Runnable subRemover = new Runnable() {
+            @Override
             public void run() {
                 for (Subscription sub : subs) {
                     try {
@@ -177,10 +179,12 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         List<MessageReference> dispatched =
                 Collections.synchronizedList(new ArrayList<MessageReference>());
 
+        @Override
         public void acknowledge(ConnectionContext context, MessageAck ack)
                 throws Exception {
         }
 
+        @Override
         public void add(MessageReference node) throws Exception {
             // immediate dispatch
             QueueMessageReference  qmr = (QueueMessageReference)node;
@@ -188,6 +192,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             dispatched.add(qmr);
         }
 
+        @Override
         public ConnectionContext getContext() {
             return null;
         }
@@ -228,76 +233,100 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         public void resetConsumedCount() {
         }
 
+        @Override
         public void add(ConnectionContext context, Destination destination)
                 throws Exception {
         }
 
+        @Override
         public void destroy() {
         }
 
+        @Override
         public void gc() {
         }
 
+        @Override
         public ConsumerInfo getConsumerInfo() {
             return info;
         }
 
+        @Override
         public long getDequeueCounter() {
             return 0;
         }
 
+        @Override
         public long getDispatchedCounter() {
             return 0;
         }
 
+        @Override
         public int getDispatchedQueueSize() {
             return 0;
         }
 
+        @Override
         public long getEnqueueCounter() {
             return 0;
         }
 
+        @Override
         public int getInFlightSize() {
             return 0;
         }
 
+        @Override
         public int getInFlightUsage() {
             return 0;
         }
 
+        @Override
         public ObjectName getObjectName() {
             return null;
         }
 
+        @Override
         public int getPendingQueueSize() {
             return 0;
         }
 
+        @Override
+        public long getPendingMessageSize() {
+            return 0;
+        }
+
+        @Override
         public int getPrefetchSize() {
             return 0;
         }
 
+        @Override
         public String getSelector() {
             return null;
         }
 
+        @Override
         public boolean isBrowser() {
             return false;
         }
 
+        @Override
         public boolean isFull() {
             return false;
         }
 
+        @Override
         public boolean isHighWaterMark() {
             return false;
         }
 
+        @Override
         public boolean isLowWaterMark() {
             return false;
         }
 
+        @Override
         public boolean isRecoveryRequired() {
             return false;
         }
@@ -306,19 +335,23 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             return false;
         }
 
+        @Override
         public boolean matches(MessageReference node,
                 MessageEvaluationContext context) throws IOException {
             return true;
         }
 
+        @Override
         public boolean matches(ActiveMQDestination destination) {
             return false;
         }
 
+        @Override
         public void processMessageDispatchNotification(
                 MessageDispatchNotification mdn) throws Exception {
         }
 
+        @Override
         public Response pullMessage(ConnectionContext context, MessagePull pull)
                 throws Exception {
             return null;
@@ -329,34 +362,42 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
             return false;
         }
 
+        @Override
         public List<MessageReference> remove(ConnectionContext context,
                 Destination destination) throws Exception {
             return new ArrayList<MessageReference>(dispatched);
         }
 
+        @Override
         public void setObjectName(ObjectName objectName) {
         }
 
+        @Override
         public void setSelector(String selector)
                 throws InvalidSelectorException, UnsupportedOperationException {
         }
 
+        @Override
         public void updateConsumerPrefetch(int newPrefetch) {
         }
 
+        @Override
         public boolean addRecoveredMessage(ConnectionContext context,
                 MessageReference message) throws Exception {
             return false;
         }
 
+        @Override
         public ActiveMQDestination getActiveMQDestination() {
             return null;
         }
 
+        @Override
         public int getLockPriority() {
             return 0;
         }
 
+        @Override
         public boolean isLockExclusive() {
             return false;
         }
@@ -367,6 +408,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
         public void removeDestination(Destination destination) {
         }
 
+        @Override
         public int countBeforeFull() {
             return 10;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
new file mode 100644
index 0000000..5d0a82c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.AbstractStoreStatTestSupport;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that KahaDB properly sets the new storeMessageSize statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStatTestSupport {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(AbstractPendingMessageCursorTest.class);
+
+
+    protected BrokerService broker;
+    protected URI brokerConnectURI;
+    protected String defaultQueueName = "test.queue";
+    protected String defaultTopicName = "test.topic";
+    protected static int maxMessageSize = 1000;
+
+    @Before
+    public void startBroker() throws Exception {
+        setUpBroker(true);
+    }
+
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+
+        broker = new BrokerService();
+        this.initPersistence(broker);
+        //set up a transport
+        TransportConnector connector = broker
+                .addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTopicPrefetch(100);
+        policy.setDurableTopicPrefetch(100);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+        broker.start();
+        broker.waitUntilStarted();
+        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Override
+    protected BrokerService getBroker() {
+        return this.broker;
+    }
+
+    @Override
+    protected URI getBrokerConnectURI() {
+        return this.brokerConnectURI;
+    }
+
+    protected abstract void initPersistence(BrokerService brokerService) throws IOException;
+
+    @Test
+    public void testQueueMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+    }
+
+    @Test
+    public void testQueueBrowserMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+        browseTestQueueMessages(dest.getName());
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+    }
+
+    @Test
+    public void testQueueMessageSizeNonPersistent() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200,
+                DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+    }
+
+    @Test
+    public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
+        AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(100,
+                DeliveryMode.PERSISTENT, publishedMessageSize);
+        dest = publishTestQueueMessages(100,
+                DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+    }
+
+    @Test
+    public void testQueueMessageSizeAfterConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+
+        consumeTestQueueMessages();
+
+        verifyPendingStats(dest, 0, 0);
+        verifyStoreStats(dest, 0, 0);
+    }
+
+    @Test
+    public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+        verifyPendingStats(dest, 200, publishedMessageSize.get());
+
+        consumeTestQueueMessages();
+
+        verifyPendingStats(dest, 0, 0);
+        verifyStoreStats(dest, 0, 0);
+    }
+
+    @Test(timeout=100000)
+    public void testTopicMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+        org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200, publishedMessageSize);
+
+        //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+        //are dispatched because we have an active consumer online
+        //verify that the size is greater than 100 messages times the minimum size of 100
+        verifyPendingStats(dest, 100, 100 * 100);
+
+        //consume all messages
+        consumeTestMessages(consumer, 200);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=100000)
+    public void testTopicNonPersistentMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+        org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200,
+                DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+
+        //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+        //are dispatched because we have an active consumer online
+        //verify the size is at least as big as 100 messages times the minimum of 100 size
+        verifyPendingStats(dest, 100, 100 * 100);
+
+        //consume all messages
+        consumeTestMessages(consumer, 200);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=100000)
+    public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+        org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(100,
+                DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+
+        dest = publishTestTopicMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
+
+        //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+        //are dispatched because we have an active consumer online
+      //verify the size is at least as big as 100 messages times the minimum of 100 size
+        verifyPendingStats(dest, 100, 100 * 100);
+
+        //consume all messages
+        consumeTestMessages(consumer, 200);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurable() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection,
+                new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+        //consume all messages
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, subKey, 0, 0);
+        verifyStoreStats(dest, 0, 0);
+
+        connection.close();
+    }
+
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurablePartialConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(
+                connection, new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+        //consume all messages
+        consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+        //150 should be left
+        verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
+        verifyStoreStats(dest, 150, publishedMessageSize.get());
+
+        connection.close();
+    }
+
+    @Test(timeout=10000)
+    public void testMessageSizeTwoDurables() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        org.apache.activemq.broker.region.Topic dest =
+                publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200,
+                        publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+        //consume messages just for sub1
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+        //There is still a durable that hasn't consumed so the messages should exist
+        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
+        verifyPendingStats(dest, subKey, 0, 0);
+        verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+        connection.stop();
+    }
+
+
+    protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue,
+            final int count, final long minimumSize) throws Exception {
+        this.verifyPendingStats(queue, count, minimumSize, count, minimumSize);
+    }
+
+    protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue,
+            final int count, final long minimumSize, final int storeCount, final long minimumStoreSize) throws Exception {
+
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queue.getPendingMessageCount() == count;
+            }
+        });
+
+        verifySize(count, new MessageSizeCalculator() {
+            @Override
+            public long getMessageSize() throws Exception {
+                return queue.getPendingMessageSize();
+            }
+        }, minimumSize);
+    }
+
+    //For a non-durable there won't necessarily be a message store
+    protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic,
+            final int count, final long minimumSize) throws Exception {
+
+        final TopicSubscription sub = (TopicSubscription) topic.getConsumers().get(0);
+
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return sub.getPendingQueueSize() == count;
+            }
+        });
+
+        verifySize(count, new MessageSizeCalculator() {
+            @Override
+            public long getMessageSize() throws Exception {
+                return sub.getPendingMessageSize();
+            }
+        }, minimumSize);
+    }
+
+    protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic, SubscriptionKey subKey,
+            final int count, final long minimumSize) throws Exception {
+
+        final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey);
+
+        //verify message count
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return sub.getPendingQueueSize() == count;
+            }
+        });
+
+        //verify message size
+        verifySize(count, new MessageSizeCalculator() {
+            @Override
+            public long getMessageSize() throws Exception {
+                return sub.getPendingMessageSize();
+            }
+        }, minimumSize);
+    }
+
+    protected void verifyStoreStats(org.apache.activemq.broker.region.Destination dest,
+            final int storeCount, final long minimumStoreSize) throws Exception {
+        final MessageStore messageStore = dest.getMessageStore();
+
+        Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return messageStore.getMessageCount() == storeCount;
+            }
+        });
+        verifySize(storeCount, new MessageSizeCalculator() {
+            @Override
+            public long getMessageSize() throws Exception {
+                return messageStore.getMessageSize();
+            }
+        }, minimumStoreSize);
+
+    }
+
+
+    protected void verifySize(final int count, final MessageSizeCalculator messageSizeCalculator,
+            final long minimumSize) throws Exception {
+        if (count > 0) {
+            Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return messageSizeCalculator.getMessageSize() > minimumSize ;
+                }
+            });
+        } else {
+            Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return messageSizeCalculator.getMessageSize() == 0;
+                }
+            });
+        }
+    }
+
+    protected static interface MessageSizeCalculator {
+        long getMessageSize() throws Exception;
+    }
+
+
+    protected Destination consumeTestMessages(MessageConsumer consumer, int size) throws Exception {
+        return consumeTestMessages(consumer, size, defaultTopicName);
+    }
+
+
+    protected Destination consumeTestMessages(MessageConsumer consumer, int size, String topicName) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                topicName);
+
+        Destination dest = broker.getDestination(activeMqTopic);
+
+        //Topic topic = session.createTopic(topicName);
+
+        try {
+            for (int i = 0; i < size; i++) {
+                consumer.receive();
+            }
+
+        } finally {
+            //session.close();
+        }
+
+        return dest;
+    }
+
+    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, AtomicLong publishedMessageSize) throws Exception {
+        return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection,
+            String[] subNames, int publishSize, AtomicLong publishedMessageSize, int deliveryMode) throws Exception {
+
+        return publishTestMessagesDurable(connection, subNames, defaultTopicName,
+                publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize,
+                publishedMessageSize, false, deliveryMode);
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,
+            AtomicLong publishedMessageSize) throws Exception {
+        return publishTestTopicMessages(publishSize, DeliveryMode.PERSISTENT, publishedMessageSize);
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,
+            int deliveryMode, AtomicLong publishedMessageSize) throws Exception {
+        // create a new queue
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId2");
+        connection.start();
+
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                defaultTopicName);
+
+        org.apache.activemq.broker.region.Topic dest =
+                (org.apache.activemq.broker.region.Topic) broker.getDestination(activeMqTopic);
+
+        // Start the connection
+        Session session = connection.createSession(false,
+                TopicSession.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(defaultTopicName);
+
+        try {
+            // publish a bunch of non-persistent messages to fill up the temp
+            // store
+            MessageProducer prod = session.createProducer(topic);
+            prod.setDeliveryMode(deliveryMode);
+            for (int i = 0; i < publishSize; i++) {
+                prod.send(createMessage(session, AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize));
+            }
+
+        } finally {
+            connection.close();
+        }
+
+        return dest;
+    }
+
+    protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count,
+            AtomicLong publishedMessageSize) throws Exception {
+        return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+                AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize);
+    }
+
+    protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, int deliveryMode,
+            AtomicLong publishedMessageSize) throws Exception {
+        return publishTestQueueMessages(count, defaultQueueName, deliveryMode,
+                AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize);
+    }
+
+    protected Destination consumeTestQueueMessages() throws Exception {
+        return consumeTestQueueMessages(defaultQueueName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
new file mode 100644
index 0000000..557c70e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that pending message metrics work properly with KahaDB
+ *
+ * AMQ-5923
+ *
+ */
+public class KahaDBPendingMessageCursorTest extends
+        AbstractPendingMessageCursorTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(KahaDBPendingMessageCursorTest.class);
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    @Override
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+        if (clearDataDir && dataFileDir.getRoot().exists())
+            FileUtils.cleanDirectory(dataFileDir.getRoot());
+        super.setUpBroker(clearDataDir);
+    }
+
+    @Override
+    protected void initPersistence(BrokerService brokerService)
+            throws IOException {
+        broker.setPersistent(true);
+        broker.setDataDirectoryFile(dataFileDir.getRoot());
+    }
+
+    /**
+     * Test that the the counter restores size and works after restart and more
+     * messages are published
+     *
+     * @throws Exception
+     */
+    @Test(timeout=10000)
+    public void testDurableMessageSizeAfterRestartAndPublish() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Topic topic =  publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+                publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+
+        // verify the count and size
+        verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(topic, 200, publishedMessageSize.get());
+
+        // stop, restart broker and publish more messages
+        stopBroker();
+        this.setUpBroker(false);
+
+        connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+                publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        // verify the count and size
+        verifyPendingStats(topic, subKey, 400, publishedMessageSize.get());
+        verifyStoreStats(topic, 400, publishedMessageSize.get());
+
+    }
+
+    /**
+     * Test that the the counter restores size and works after restart and more
+     * messages are published
+     *
+     * @throws Exception
+     */
+    @Test(timeout=10000)
+    public void testNonPersistentDurableMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+        Topic topic =  publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+                publishedMessageSize, DeliveryMode.NON_PERSISTENT);
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+
+        // verify the count and size
+        verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(topic, 0, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
new file mode 100644
index 0000000..948193d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.util.SubscriptionKey;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that PendingMessageCursor size statistics work with the MemoryPersistentAdapter
+ *
+ * AMQ-5748
+ *
+ */
+public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursorTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(MemoryPendingMessageCursorTest.class);
+
+    @Override
+    protected void initPersistence(BrokerService brokerService) throws IOException {
+        broker.setPersistent(false);
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+    }
+
+
+    @Override
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurable() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        org.apache.activemq.broker.region.Topic dest =
+                publishTestMessagesDurable(connection, new String[] {"sub1"},
+                        200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+        //consume 100 messages
+        consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize);
+
+        //100 should be left
+        verifyPendingStats(dest, subKey, 100, publishedMessageSize.get());
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+        connection.close();
+    }
+
+    @Override
+    @Test(timeout=10000)
+    public void testMessageSizeTwoDurables() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        org.apache.activemq.broker.region.Topic dest =
+                publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"},
+                        200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+        //consume messages just for sub1
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+        //There is still a durable that hasn't consumed so the messages should exist
+        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
+        verifyPendingStats(dest, subKey, 0, 0);
+        verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+        connection.stop();
+    }
+
+    @Override
+    @Test(timeout=10000)
+    public void testMessageSizeOneDurablePartialConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection,
+                new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+        verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+        //consume all messages
+        consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+        //All messages should now be gone
+        verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
+
+        //The expected value is only 100 because for durables a LRUCache is being used
+        //with a max size of 100
+       //verify the size is at least as big as 100 messages times the minimum of 100 size
+        verifyStoreStats(dest, 100, 100 * 100);
+
+        connection.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
new file mode 100644
index 0000000..9768980
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+/**
+ * This test checks that pending message metrics work properly with MultiKahaDB
+ *
+ * AMQ-5923
+ *
+ */
+public class MultiKahaDBPendingMessageCursorTest extends
+    KahaDBPendingMessageCursorTest {
+
+    @Override
+    protected void initPersistence(BrokerService brokerService)
+            throws IOException {
+        broker.setPersistent(true);
+
+        //setup multi-kaha adapter
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(dataFileDir.getRoot());
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024 * 512);
+
+        //set up a store per destination
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
index 79d7e6c..6a9dd6b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
@@ -309,6 +309,16 @@ public class OrderPendingListTest {
         }
 
         @Override
+        public long messageSize() {
+            long size = 0;
+            Iterator<MessageReference> i = theList.iterator();
+            while (i.hasNext()) {
+                size += i.next().getMessage().getSize();
+            }
+            return size;
+        }
+
+        @Override
         public Iterator<MessageReference> iterator() {
             return theList.iterator();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
index 944d183..116500e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -16,38 +16,19 @@
  */
 package org.apache.activemq.store;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
 import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.util.Wait.Condition;
 import org.junit.After;
@@ -62,7 +43,7 @@ import org.slf4j.LoggerFactory;
  * AMQ-5748
  *
  */
-public abstract class AbstractMessageStoreSizeStatTest {
+public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStatTestSupport {
     protected static final Logger LOG = LoggerFactory
             .getLogger(AbstractMessageStoreSizeStatTest.class);
 
@@ -71,7 +52,6 @@ public abstract class AbstractMessageStoreSizeStatTest {
     protected URI brokerConnectURI;
     protected String defaultQueueName = "test.queue";
     protected String defaultTopicName = "test.topic";
-    protected static int messageSize = 1000;
 
     @Before
     public void startBroker() throws Exception {
@@ -100,39 +80,52 @@ public abstract class AbstractMessageStoreSizeStatTest {
         broker.waitUntilStopped();
     }
 
+    @Override
+    protected BrokerService getBroker() {
+        return this.broker;
+    }
+
+    @Override
+    protected URI getBrokerConnectURI() {
+        return this.brokerConnectURI;
+    }
+
     protected abstract void initPersistence(BrokerService brokerService) throws IOException;
 
-    @Test
+    @Test(timeout=10000)
     public void testMessageSize() throws Exception {
-        Destination dest = publishTestQueueMessages(200);
-        verifyStats(dest, 200, 200 * messageSize);
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
     }
 
-    @Test
+    @Test(timeout=10000)
     public void testMessageSizeAfterConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
 
-        Destination dest = publishTestQueueMessages(200);
-        verifyStats(dest, 200, 200 * messageSize);
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         consumeTestQueueMessages();
 
         verifyStats(dest, 0, 0);
     }
 
-    @Test
+    @Test(timeout=10000)
     public void testMessageSizeOneDurable() throws Exception {
-
+        AtomicLong publishedMessageSize = new AtomicLong();
         Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
         connection.setClientID("clientId");
         connection.start();
 
-        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200);
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, publishedMessageSize);
 
         //verify the count and size
-        verifyStats(dest, 200, 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         //consume all messages
-        consumeDurableTestMessages(connection, "sub1", 200);
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
 
         //All messages should now be gone
         verifyStats(dest, 0, 0);
@@ -142,21 +135,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
 
     @Test(timeout=10000)
     public void testMessageSizeTwoDurables() throws Exception {
-
+        AtomicLong publishedMessageSize = new AtomicLong();
         Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
         connection.setClientID("clientId");
         connection.start();
 
-        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200);
+        Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, publishedMessageSize);
 
         //verify the count and size
-        verifyStats(dest, 200, 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         //consume messages just for sub1
-        consumeDurableTestMessages(connection, "sub1", 200);
+        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
 
         //There is still a durable that hasn't consumed so the messages should exist
-        verifyStats(dest, 200, 200 * messageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         connection.stop();
 
@@ -164,14 +157,24 @@ public abstract class AbstractMessageStoreSizeStatTest {
 
     @Test
     public void testMessageSizeAfterDestinationDeletion() throws Exception {
-        Destination dest = publishTestQueueMessages(200);
-        verifyStats(dest, 200, 200 * messageSize);
+        AtomicLong publishedMessageSize = new AtomicLong();
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+        verifyStats(dest, 200, publishedMessageSize.get());
 
         //check that the size is 0 after deletion
         broker.removeDestination(dest.getActiveMQDestination());
         verifyStats(dest, 0, 0);
     }
 
+    @Test
+    public void testQueueBrowserMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+        browseTestQueueMessages(dest.getName());
+        verifyStats(dest, 200, publishedMessageSize.get());
+    }
+
     protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception {
         final MessageStore messageStore = dest.getMessageStore();
         final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
@@ -203,164 +206,31 @@ public abstract class AbstractMessageStoreSizeStatTest {
         }
     }
 
-    /**
-     * Generate random 1 megabyte messages
-     * @param session
-     * @return
-     * @throws JMSException
-     */
-    protected BytesMessage createMessage(Session session) throws JMSException {
-        final BytesMessage message = session.createBytesMessage();
-        final byte[] data = new byte[messageSize];
-        final Random rng = new Random();
-        rng.nextBytes(data);
-        message.writeBytes(data);
-        return message;
-    }
 
-
-    protected Destination publishTestQueueMessages(int count) throws Exception {
-        return publishTestQueueMessages(count, defaultQueueName);
+    protected Destination publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception {
+        return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+                AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
     }
 
-    protected Destination publishTestQueueMessages(int count, String queueName) throws Exception {
-        // create a new queue
-        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
-                queueName);
-
-        Destination dest = broker.getDestination(activeMqQueue);
-
-        // Start the connection
-        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
-        .createConnection();
-        connection.setClientID("clientId" + queueName);
-        connection.start();
-        Session session = connection.createSession(false,
-                QueueSession.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(queueName);
-
-        try {
-            // publish a bunch of non-persistent messages to fill up the temp
-            // store
-            MessageProducer prod = session.createProducer(queue);
-            prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-            for (int i = 0; i < count; i++) {
-                prod.send(createMessage(session));
-            }
-
-        } finally {
-            connection.close();
-        }
-
-        return dest;
+    protected Destination publishTestQueueMessages(int count, String queueName, AtomicLong publishedMessageSize) throws Exception {
+        return publishTestQueueMessages(count, queueName, DeliveryMode.PERSISTENT,
+                AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
     }
 
     protected Destination consumeTestQueueMessages() throws Exception {
         return consumeTestQueueMessages(defaultQueueName);
     }
 
-    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception {
-        return consumeDurableTestMessages(connection, sub, size, defaultTopicName);
+    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size,
+            AtomicLong publishedMessageSize) throws Exception {
+        return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
     }
 
-    protected Destination consumeTestQueueMessages(String queueName) throws Exception {
-        // create a new queue
-        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
-                queueName);
-
-        Destination dest = broker.getDestination(activeMqQueue);
-
-        // Start the connection
-        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
-        .createConnection();
-        connection.setClientID("clientId2" + queueName);
-        connection.start();
-        Session session = connection.createSession(false,
-                QueueSession.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(queueName);
-
-        try {
-            MessageConsumer consumer = session.createConsumer(queue);
-            for (int i = 0; i < 200; i++) {
-                consumer.receive();
-            }
-
-        } finally {
-            connection.stop();
-        }
-
-        return dest;
-    }
-
-    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception {
-        // create a new queue
-        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
-                topicName);
-
-        Destination dest = broker.getDestination(activeMqTopic);
-
-        Session session = connection.createSession(false,
-                QueueSession.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic(topicName);
-
-        try {
-            TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
-            for (int i = 0; i < size; i++) {
-                consumer.receive();
-            }
-
-        } finally {
-            session.close();
-        }
-
-        return dest;
-    }
-
-    protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception {
-        // create a new queue
-        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
-                defaultTopicName);
-
-        Destination dest = broker.getDestination(activeMqTopic);
-
-        // Start the connection
-
-        Session session = connection.createSession(false,
-                TopicSession.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic(defaultTopicName);
-        for (String subName : subNames) {
-            session.createDurableSubscriber(topic, subName);
-        }
-
-        // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
-        //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
-        //then the statistics won't be updated properly because a new store would overwrite the old store
-        //which is still in use
-        ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
-
-        try {
-            // publish a bunch of non-persistent messages to fill up the temp
-            // store
-            MessageProducer prod = session.createProducer(topic);
-            prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-            for (int i = 0; i < publishSize; i++) {
-                prod.send(createMessage(session));
-            }
-
-            //verify the view has expected messages
-            assertEquals(subNames.length, subs.length);
-            ObjectName subName = subs[0];
-            DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
-                    broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
-            CompositeData[] data  = sub.browse();
-            assertNotNull(data);
-            assertEquals(expectedSize, data.length);
-
-        } finally {
-            session.close();
-        }
-
-        return dest;
+    protected Destination publishTestMessagesDurable(Connection connection, String[] subNames,
+            int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception {
+       return publishTestMessagesDurable(connection, subNames, defaultTopicName,
+                publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize,
+                publishedMessageSize, true);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
new file mode 100644
index 0000000..3f0e7c1
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.net.URI;
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ *
+ *
+ */
+public abstract class AbstractStoreStatTestSupport {
+
+    protected static int defaultMessageSize = 1000;
+
+    protected abstract BrokerService getBroker();
+
+    protected abstract URI getBrokerConnectURI();
+
+    protected Destination consumeTestQueueMessages(String queueName) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+                queueName);
+
+        Destination dest = getBroker().getDestination(activeMqQueue);
+
+        // Start the connection
+        Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
+        .createConnection();
+        connection.setClientID("clientId2" + queueName);
+        connection.start();
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(queueName);
+
+        try {
+            MessageConsumer consumer = session.createConsumer(queue);
+            for (int i = 0; i < 200; i++) {
+                consumer.receive();
+            }
+
+        } finally {
+            connection.stop();
+        }
+
+        return dest;
+    }
+
+    protected Destination browseTestQueueMessages(String queueName) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+                queueName);
+
+        Destination dest = getBroker().getDestination(activeMqQueue);
+
+        // Start the connection
+        Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
+        .createConnection();
+        connection.setClientID("clientId2" + queueName);
+        connection.start();
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(queueName);
+
+        try {
+            QueueBrowser queueBrowser = session.createBrowser(queue);
+            @SuppressWarnings("unchecked")
+            Enumeration<Message> messages = queueBrowser.getEnumeration();
+            while (messages.hasMoreElements()) {
+                messages.nextElement();
+            }
+
+        } finally {
+            connection.stop();
+        }
+
+        return dest;
+    }
+
+    protected Destination consumeDurableTestMessages(Connection connection, String sub,
+            int size, String topicName, AtomicLong publishedMessageSize) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                topicName);
+
+        Destination dest = getBroker().getDestination(activeMqTopic);
+
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(topicName);
+
+        try {
+            TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
+            for (int i = 0; i < size; i++) {
+                ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
+                if (publishedMessageSize != null) {
+                    publishedMessageSize.addAndGet(-message.getSize());
+                }
+            }
+
+        } finally {
+            session.close();
+        }
+
+        return dest;
+    }
+
+    protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, String queueName,
+            int deliveryMode, int messageSize, AtomicLong publishedMessageSize) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+                queueName);
+
+        Destination dest = getBroker().getDestination(activeMqQueue);
+
+        // Start the connection
+        Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
+        .createConnection();
+        connection.setClientID("clientId" + queueName);
+        connection.start();
+        Session session = connection.createSession(false,
+                QueueSession.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(queueName);
+
+        try {
+            // publish a bunch of non-persistent messages to fill up the temp
+            // store
+            MessageProducer prod = session.createProducer(queue);
+            prod.setDeliveryMode(deliveryMode);
+            for (int i = 0; i < count; i++) {
+                prod.send(createMessage(session, messageSize, publishedMessageSize));
+            }
+
+        } finally {
+            connection.close();
+        }
+
+        return (org.apache.activemq.broker.region.Queue) dest;
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
+            int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
+            boolean verifyBrowsing) throws Exception {
+        return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize,
+                publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT);
+    }
+
+    protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
+            int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
+            boolean verifyBrowsing, int deliveryMode) throws Exception {
+        // create a new queue
+        final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+                topicName);
+
+        Destination dest = getBroker().getDestination(activeMqTopic);
+
+        // Start the connection
+
+        Session session = connection.createSession(false,
+                TopicSession.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(topicName);
+        for (String subName : subNames) {
+            session.createDurableSubscriber(topic, subName);
+        }
+
+        ObjectName[] subs = null;
+        if (verifyBrowsing) {
+            // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
+            //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
+            //then the statistics won't be updated properly because a new store would overwrite the old store
+            //which is still in use
+            subs = getBroker().getAdminView().getDurableTopicSubscribers();
+        }
+
+        try {
+            // publish a bunch of non-persistent messages to fill up the temp
+            // store
+            MessageProducer prod = session.createProducer(topic);
+            prod.setDeliveryMode(deliveryMode);
+            for (int i = 0; i < publishSize; i++) {
+                prod.send(createMessage(session, messageSize, publishedMessageSize));
+            }
+
+            //verify the view has expected messages
+            if (verifyBrowsing) {
+                assertNotNull(subs);
+                assertEquals(subNames.length, subs.length);
+                ObjectName subName = subs[0];
+                DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+                        getBroker().getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+                CompositeData[] data  = sub.browse();
+                assertNotNull(data);
+                assertEquals(expectedSize, data.length);
+            }
+
+        } finally {
+            session.close();
+        }
+
+        return (org.apache.activemq.broker.region.Topic) dest;
+    }
+
+    /**
+     * Generate random messages between 100 bytes and messageSize
+     * @param session
+     * @return
+     * @throws JMSException
+     */
+    protected BytesMessage createMessage(Session session, int messageSize, AtomicLong publishedMessageSize) throws JMSException {
+        final BytesMessage message = session.createBytesMessage();
+        final Random rn = new Random();
+        int size = rn.nextInt(messageSize - 100);
+        if (publishedMessageSize != null) {
+            publishedMessageSize.addAndGet(size);
+        }
+
+        final byte[] data = new byte[size];
+        final Random rng = new Random();
+        rng.nextBytes(data);
+        message.writeBytes(data);
+        return message;
+    }
+}