You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/08/08 20:00:08 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748
Repository: activemq
Updated Branches:
refs/heads/master a3c8bee1f -> a49d46e3c
https://issues.apache.org/jira/browse/AMQ-5748
Updating MemoryTopicMessageStore to decrement store statistics on cache
eviction. Updating KahaDBMessageStoreSizeStatTest to account for the
fact that an LRU cache is used so the last 100 messages are kept in
memory.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a49d46e3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a49d46e3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a49d46e3
Branch: refs/heads/master
Commit: a49d46e3ca689af6f2cb721c457be97d654b2492
Parents: a3c8bee
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Sat Aug 8 17:55:41 2015 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Sat Aug 8 17:59:19 2015 +0000
----------------------------------------------------------------------
.../store/memory/MemoryMessageStore.java | 16 +-
.../store/memory/MemoryTopicMessageStore.java | 56 ++++++-
.../store/AbstractMessageStoreSizeStatTest.java | 150 ++++++++++++++-----
.../kahadb/KahaDBMessageStoreSizeStatTest.java | 4 +-
.../MultiKahaDBMessageStoreSizeStatTest.java | 12 +-
.../memory/MemoryMessageStoreSizeStatTest.java | 57 ++++++-
6 files changed, 242 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 51006c2..3989646 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -57,8 +57,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
synchronized (messageTable) {
messageTable.put(message.getMessageId(), message);
- getMessageStoreStatistics().getMessageCount().increment();
- getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
+ incMessageStoreStatistics(message);
}
message.incrementReferenceCount();
message.getMessageId().setFutureOrSequenceLong(sequenceId++);
@@ -94,8 +93,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
Message removed = messageTable.remove(msgId);
if( removed !=null ) {
removed.decrementReferenceCount();
- getMessageStoreStatistics().getMessageCount().decrement();
- getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
+ decMessageStoreStatistics(removed);
}
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
lastBatchId = null;
@@ -200,4 +198,14 @@ public class MemoryMessageStore extends AbstractMessageStore {
}
}
+ protected final void incMessageStoreStatistics(Message message) {
+ getMessageStoreStatistics().getMessageCount().increment();
+ getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
+ }
+
+ protected final void decMessageStoreStatistics(Message message) {
+ getMessageStoreStatistics().getMessageCount().decrement();
+ getMessageStoreStatistics().getMessageSize().addSize(-message.getSize());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 0debfe6..142547f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -29,36 +30,47 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;
/**
- *
+ *
*/
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
+ private final Map<MessageId, Message> originalMessageTable;
public MemoryTopicMessageStore(ActiveMQDestination destination) {
- this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
+ this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
+
+ //Set the messageStoreStatistics after the super class is initialized so that the stats can be
+ //properly updated on cache eviction
+ MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
+ cache.setMessageStoreStatistics(messageStoreStatistics);
}
public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
super(destination, messageTable);
this.subscriberDatabase = subscriberDatabase;
this.topicSubMap = makeSubMap();
+ //this is only necessary so that messageStoreStatistics can be set if necessary
+ //We need the original reference since messageTable is wrapped in a synchronized map in the parent class
+ this.originalMessageTable = messageTable;
}
protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
}
-
+
protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
}
+ @Override
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
@@ -67,6 +79,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
+ @Override
public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
@@ -76,10 +89,12 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
+ @Override
public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
}
+ @Override
public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
SubscriptionKey key = new SubscriptionKey(info);
MemoryTopicSub sub = new MemoryTopicSub();
@@ -93,12 +108,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
subscriberDatabase.put(key, info);
}
+ @Override
public synchronized void deleteSubscription(String clientId, String subscriptionName) {
org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
subscriberDatabase.remove(key);
topicSubMap.remove(key);
}
+ @Override
public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
if (sub != null) {
@@ -106,16 +123,19 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
+ @Override
public synchronized void delete() {
super.delete();
subscriberDatabase.clear();
topicSubMap.clear();
}
+ @Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
+ @Override
public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
int result = 0;
MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
@@ -125,6 +145,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
return result;
}
+ @Override
public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
if (sub != null) {
@@ -132,10 +153,39 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
+ @Override
public void resetBatching(String clientId, String subscriptionName) {
MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
if (sub != null) {
sub.resetBatching();
}
}
+
+ /**
+ * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions
+ * when computing the message store statistics.
+ *
+ */
+ private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> {
+ private static final long serialVersionUID = -342098639681884413L;
+ private MessageStoreStatistics messageStoreStatistics;
+
+ public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
+ float loadFactor, boolean accessOrder) {
+ super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
+ }
+
+ public void setMessageStoreStatistics(
+ MessageStoreStatistics messageStoreStatistics) {
+ this.messageStoreStatistics = messageStoreStatistics;
+ }
+
+ @Override
+ protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
+ if (messageStoreStatistics != null) {
+ messageStoreStatistics.getMessageCount().decrement();
+ messageStoreStatistics.getMessageSize().addSize(-eldest.getValue().getSize());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
index 1b927b4..944d183 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -35,6 +35,7 @@ import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
@@ -47,6 +48,8 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -67,6 +70,7 @@ public abstract class AbstractMessageStoreSizeStatTest {
protected BrokerService broker;
protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue";
+ protected String defaultTopicName = "test.topic";
protected static int messageSize = 1000;
@Before
@@ -100,34 +104,67 @@ public abstract class AbstractMessageStoreSizeStatTest {
@Test
public void testMessageSize() throws Exception {
- Destination dest = publishTestMessages(200);
+ Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
}
@Test
public void testMessageSizeAfterConsumption() throws Exception {
- Destination dest = publishTestMessages(200);
+ Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
- consumeTestMessages();
- Thread.sleep(3000);
+ consumeTestQueueMessages();
+
verifyStats(dest, 0, 0);
}
@Test
- public void testMessageSizeDurable() throws Exception {
+ public void testMessageSizeOneDurable() throws Exception {
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
- Destination dest = publishTestMessagesDurable();
+ Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200);
//verify the count and size
verifyStats(dest, 200, 200 * messageSize);
+ //consume all messages
+ consumeDurableTestMessages(connection, "sub1", 200);
+
+ //All messages should now be gone
+ verifyStats(dest, 0, 0);
+
+ connection.close();
+ }
+
+ @Test(timeout=10000)
+ public void testMessageSizeTwoDurables() throws Exception {
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200);
+
+ //verify the count and size
+ verifyStats(dest, 200, 200 * messageSize);
+
+ //consume messages just for sub1
+ consumeDurableTestMessages(connection, "sub1", 200);
+
+ //There is still a durable that hasn't consumed so the messages should exist
+ verifyStats(dest, 200, 200 * messageSize);
+
+ connection.stop();
+
}
@Test
public void testMessageSizeAfterDestinationDeletion() throws Exception {
- Destination dest = publishTestMessages(200);
+ Destination dest = publishTestQueueMessages(200);
verifyStats(dest, 200, 200 * messageSize);
//check that the size is 0 after deletion
@@ -135,18 +172,34 @@ public abstract class AbstractMessageStoreSizeStatTest {
verifyStats(dest, 0, 0);
}
- protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception {
- MessageStore messageStore = dest.getMessageStore();
- MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
- assertEquals(messageStore.getMessageCount(), count);
- assertEquals(messageStore.getMessageCount(),
- storeStats.getMessageCount().getCount());
- assertEquals(messageStore.getMessageSize(),
+ protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception {
+ final MessageStore messageStore = dest.getMessageStore();
+ final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
+
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() ==
+ storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() ==
messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
+ }
+ });
+
if (count > 0) {
assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return storeStats.getMessageSize().getTotalSize() > minimumSize;
+ }
+ });
} else {
- assertEquals(storeStats.getMessageSize().getTotalSize(), 0);
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return storeStats.getMessageSize().getTotalSize() == 0;
+ }
+ });
}
}
@@ -166,11 +219,11 @@ public abstract class AbstractMessageStoreSizeStatTest {
}
- protected Destination publishTestMessages(int count) throws Exception {
- return publishTestMessages(count, defaultQueueName);
+ protected Destination publishTestQueueMessages(int count) throws Exception {
+ return publishTestQueueMessages(count, defaultQueueName);
}
- protected Destination publishTestMessages(int count, String queueName) throws Exception {
+ protected Destination publishTestQueueMessages(int count, String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
@@ -196,17 +249,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
}
} finally {
- connection.stop();
+ connection.close();
}
return dest;
}
- protected Destination consumeTestMessages() throws Exception {
- return consumeTestMessages(defaultQueueName);
+ protected Destination consumeTestQueueMessages() throws Exception {
+ return consumeTestQueueMessages(defaultQueueName);
+ }
+
+ protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception {
+ return consumeDurableTestMessages(connection, sub, size, defaultTopicName);
}
- protected Destination consumeTestMessages(String queueName) throws Exception {
+ protected Destination consumeTestQueueMessages(String queueName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
queueName);
@@ -235,22 +292,45 @@ public abstract class AbstractMessageStoreSizeStatTest {
return dest;
}
- protected Destination publishTestMessagesDurable() throws Exception {
+ protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
- "test.topic");
+ topicName);
+
+ Destination dest = broker.getDestination(activeMqTopic);
+
+ Session session = connection.createSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(topicName);
+
+ try {
+ TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
+ for (int i = 0; i < size; i++) {
+ consumer.receive();
+ }
+
+ } finally {
+ session.close();
+ }
+
+ return dest;
+ }
+
+ protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+ defaultTopicName);
Destination dest = broker.getDestination(activeMqTopic);
// Start the connection
- Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
- .createConnection();
- connection.setClientID("clientId");
- connection.start();
+
Session session = connection.createSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic("test.topic");
- session.createDurableSubscriber(topic, "sub1");
+ Topic topic = session.createTopic(defaultTopicName);
+ for (String subName : subNames) {
+ session.createDurableSubscriber(topic, subName);
+ }
// browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
//in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
@@ -263,21 +343,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
// store
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < 200; i++) {
+ for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(session));
}
- //verify the view has 200 messages
- assertEquals(1, subs.length);
+ //verify the view has expected messages
+ assertEquals(subNames.length, subs.length);
ObjectName subName = subs[0];
DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
CompositeData[] data = sub.browse();
assertNotNull(data);
- assertEquals(200, data.length);
+ assertEquals(expectedSize, data.length);
} finally {
- connection.stop();
+ session.close();
}
return dest;
http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
index bb46f20..28884e6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
@@ -64,7 +64,7 @@ public class KahaDBMessageStoreSizeStatTest extends
@Test
public void testMessageSizeAfterRestartAndPublish() throws Exception {
- Destination dest = publishTestMessages(200);
+ Destination dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
@@ -72,7 +72,7 @@ public class KahaDBMessageStoreSizeStatTest extends
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
- dest = publishTestMessages(200);
+ dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 400, 400 * messageSize);
http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
index 4342e1d..849a91b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
@@ -84,7 +84,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
@Test
public void testMessageSizeAfterRestartAndPublish() throws Exception {
- Destination dest = publishTestMessages(200);
+ Destination dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
@@ -92,7 +92,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
- dest = publishTestMessages(200);
+ dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 400, 400 * messageSize);
@@ -102,13 +102,13 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
@Test
public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
- Destination dest = publishTestMessages(200);
+ Destination dest = publishTestQueueMessages(200);
// verify the count and size
verifyStats(dest, 200, 200 * messageSize);
assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
- Destination dest2 = publishTestMessages(200, "test.queue2");
+ Destination dest2 = publishTestQueueMessages(200, "test.queue2");
// verify the count and size
verifyStats(dest2, 200, 200 * messageSize);
@@ -117,8 +117,8 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
// stop, restart broker and publish more messages
stopBroker();
this.setUpBroker(false);
- dest = publishTestMessages(200);
- dest2 = publishTestMessages(200, "test.queue2");
+ dest = publishTestQueueMessages(200);
+ dest2 = publishTestQueueMessages(200, "test.queue2");
// verify the count and size after publishing messages
verifyStats(dest, 400, 400 * messageSize);
http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
index 755936c..dc6ff8b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
@@ -18,16 +18,21 @@ package org.apache.activemq.store.memory;
import java.io.IOException;
+import javax.jms.Connection;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test checks that KahaDB properly sets the new storeMessageSize statistic.
- *
+ *
* AMQ-5748
- *
+ *
*/
public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory
@@ -39,7 +44,53 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
}
+ @Override
+ @Test(timeout=10000)
+ public void testMessageSizeOneDurable() throws Exception {
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ //The expected value is only 100 because for durables a LRUCache is being used
+ //with a max size of 100
+ Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100);
+
+ //verify the count and size, should be 100 because of the LRUCache
+ verifyStats(dest, 100, 100 * messageSize);
+
+ consumeDurableTestMessages(connection, "sub1", 100);
+
+ //Since an LRU cache is used and messages are kept in memory, this should be 100 still
+ verifyStats(dest, 100, 100 * messageSize);
+
+ connection.stop();
+
+ }
+
+ @Override
+ @Test(timeout=10000)
+ public void testMessageSizeTwoDurables() throws Exception {
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ //The expected value is only 100 because for durables a LRUCache is being used
+ //with a max size of 100, so only 100 messages are kept
+ Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100);
+
+ //verify the count and size
+ verifyStats(dest, 100, 100 * messageSize);
+
+ //consume for sub1
+ consumeDurableTestMessages(connection, "sub1", 100);
+
+ //Should be 100 messages still
+ verifyStats(dest, 100, 100 * messageSize);
+
+ connection.stop();
+
+ }
+
-
}