You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/09/09 20:12:48 UTC
[2/3] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5923
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
index 6911e4f..8fed042 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java
@@ -66,14 +66,17 @@ public final class ListNode<Key, Value> {
this.value = value;
}
+ @Override
public Key getKey() {
return key;
}
+ @Override
public Value getValue() {
return value;
}
+ @Override
public Value setValue(Value value) {
Value oldValue = this.value;
this.value = value;
@@ -98,10 +101,12 @@ public final class ListNode<Key, Value> {
index = current.getContainingList();
}
+ @Override
public boolean hasNext() {
return nextEntry != null;
}
+ @Override
public ListNode<Key, Value> next() {
ListNode<Key, Value> current = nextEntry;
if (current != null) {
@@ -121,6 +126,7 @@ public final class ListNode<Key, Value> {
return current;
}
+ @Override
public void remove() {
throw new UnsupportedOperationException();
}
@@ -171,6 +177,7 @@ public final class ListNode<Key, Value> {
return result;
}
+ @Override
public boolean hasNext() {
if (nextEntry == null) {
nextEntry = getFromNextNode();
@@ -178,6 +185,7 @@ public final class ListNode<Key, Value> {
return nextEntry != null;
}
+ @Override
public Entry<Key, Value> next() {
if (nextEntry != null) {
entryToRemove = nextEntry;
@@ -188,6 +196,7 @@ public final class ListNode<Key, Value> {
}
}
+ @Override
public void remove() {
if (entryToRemove == null) {
throw new IllegalStateException("can only remove once, call hasNext();next() again");
@@ -228,7 +237,7 @@ public final class ListNode<Key, Value> {
currentNode = previousNode;
}
}
- targetList.onRemove();
+ targetList.onRemove(entryToRemove);
if (toRemoveNode != null) {
tx.free(toRemoveNode.getPage());
@@ -262,6 +271,7 @@ public final class ListNode<Key, Value> {
this.valueMarshaller = valueMarshaller;
}
+ @Override
public void writePayload(ListNode<Key, Value> node, DataOutput os) throws IOException {
os.writeLong(node.next);
short count = (short) node.entries.size(); // cast may truncate
@@ -279,6 +289,7 @@ public final class ListNode<Key, Value> {
}
}
+ @Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public ListNode<Key, Value> readPayload(DataInput is) throws IOException {
ListNode<Key, Value> node = new ListNode<Key, Value>();
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
index eafb2ac..b45692a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
@@ -21,22 +21,22 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.command.Message;
+import org.apache.activemq.management.SizeStatisticImpl;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListEntry;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
+import org.apache.activemq.store.kahadb.disk.index.ListNode;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +45,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final PListStoreImpl store;
private String name;
Object indexLock;
+ private final SizeStatisticImpl messageSize;
PListImpl(PListStoreImpl store) {
this.store = store;
@@ -52,6 +53,9 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
setPageFile(store.getPageFile());
setKeyMarshaller(StringMarshaller.INSTANCE);
setValueMarshaller(LocationMarshaller.INSTANCE);
+
+ messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
+ messageSize.setEnabled(true);
}
public void setName(String name) {
@@ -75,6 +79,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
public synchronized void destroy() throws IOException {
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
clear(tx);
unload(tx);
@@ -100,6 +105,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
add(tx, id, location);
}
@@ -113,6 +119,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
addFirst(tx, id, location);
}
@@ -133,6 +140,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, id) != null);
}
@@ -145,6 +153,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
if (iterator.hasNext()) {
@@ -165,6 +174,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
ref.set(iterator.next());
@@ -183,6 +193,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
ref.set(getFirst(tx));
}
@@ -200,6 +211,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
ref.set(getLast(tx));
}
@@ -270,6 +282,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
}
+ @Override
public void release() {
try {
tx.rollback();
@@ -285,6 +298,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
synchronized (indexLock) {
if (loaded.get()) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
while (iterator.hasNext()) {
@@ -298,6 +312,51 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
@Override
+ public long messageSize() {
+ return messageSize.getTotalSize();
+ }
+
+ @Override
+ public synchronized Location add(Transaction tx, String key, Location value)
+ throws IOException {
+ Location location = super.add(tx, key, value);
+ messageSize.addSize(value.getSize());
+ return location;
+ }
+
+ @Override
+ public synchronized Location addFirst(Transaction tx, String key,
+ Location value) throws IOException {
+ Location location = super.addFirst(tx, key, value);
+ messageSize.addSize(value.getSize());
+ return location;
+ }
+
+ @Override
+ public synchronized void clear(Transaction tx) throws IOException {
+ messageSize.reset();
+ super.clear(tx);
+ }
+
+ @Override
+ protected synchronized void onLoad(ListNode<String, Location> node, Transaction tx) {
+ try {
+ Iterator<Entry<String, Location>> i = node.iterator(tx);
+ while (i.hasNext()) {
+ messageSize.addSize(i.next().getValue().getSize());
+ }
+ } catch (IOException e) {
+ LOG.warn("could not increment message size", e);
+ }
+ }
+
+ @Override
+ public void onRemove(Entry<String, Location> removed) {
+ super.onRemove(removed);
+ messageSize.addSize(-removed.getValue().getSize());
+ }
+
+ @Override
public String toString() {
return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 7c2d327..a4cdcac 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -1008,6 +1008,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
case None => 0
}
}
+
+ def getMessageSize(clientId: String, subscriptionName: String): Long = {
+ check_running
+ return 0
+ }
}
class LevelDBPList(val name: String, val key: Long) extends PList {
@@ -1066,6 +1071,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def isEmpty = size()==0
def size(): Long = listSize.get()
+ def messageSize(): Long = 0
def iterator() = new PListIterator() {
check_running
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 99382d0..6cef709 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -228,6 +228,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
+ public long getPendingMessageSize() {
+ return 0;
+ }
+
+ @Override
public int getPrefetchSize() {
return 0;
}
@@ -354,10 +359,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
return 0;
}
+ @Override
public void incrementConsumedCount(){
}
+ @Override
public void resetConsumedCount(){
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
index 2541a64..207ecda 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -102,6 +102,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public void testNoDispatchToRemovedConsumers() throws Exception {
final AtomicInteger producerId = new AtomicInteger();
Runnable sender = new Runnable() {
+ @Override
public void run() {
AtomicInteger id = new AtomicInteger();
int producerIdAndIncrement = producerId.getAndIncrement();
@@ -120,6 +121,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
};
Runnable subRemover = new Runnable() {
+ @Override
public void run() {
for (Subscription sub : subs) {
try {
@@ -177,10 +179,12 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
List<MessageReference> dispatched =
Collections.synchronizedList(new ArrayList<MessageReference>());
+ @Override
public void acknowledge(ConnectionContext context, MessageAck ack)
throws Exception {
}
+ @Override
public void add(MessageReference node) throws Exception {
// immediate dispatch
QueueMessageReference qmr = (QueueMessageReference)node;
@@ -188,6 +192,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
dispatched.add(qmr);
}
+ @Override
public ConnectionContext getContext() {
return null;
}
@@ -228,76 +233,100 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public void resetConsumedCount() {
}
+ @Override
public void add(ConnectionContext context, Destination destination)
throws Exception {
}
+ @Override
public void destroy() {
}
+ @Override
public void gc() {
}
+ @Override
public ConsumerInfo getConsumerInfo() {
return info;
}
+ @Override
public long getDequeueCounter() {
return 0;
}
+ @Override
public long getDispatchedCounter() {
return 0;
}
+ @Override
public int getDispatchedQueueSize() {
return 0;
}
+ @Override
public long getEnqueueCounter() {
return 0;
}
+ @Override
public int getInFlightSize() {
return 0;
}
+ @Override
public int getInFlightUsage() {
return 0;
}
+ @Override
public ObjectName getObjectName() {
return null;
}
+ @Override
public int getPendingQueueSize() {
return 0;
}
+ @Override
+ public long getPendingMessageSize() {
+ return 0;
+ }
+
+ @Override
public int getPrefetchSize() {
return 0;
}
+ @Override
public String getSelector() {
return null;
}
+ @Override
public boolean isBrowser() {
return false;
}
+ @Override
public boolean isFull() {
return false;
}
+ @Override
public boolean isHighWaterMark() {
return false;
}
+ @Override
public boolean isLowWaterMark() {
return false;
}
+ @Override
public boolean isRecoveryRequired() {
return false;
}
@@ -306,19 +335,23 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return false;
}
+ @Override
public boolean matches(MessageReference node,
MessageEvaluationContext context) throws IOException {
return true;
}
+ @Override
public boolean matches(ActiveMQDestination destination) {
return false;
}
+ @Override
public void processMessageDispatchNotification(
MessageDispatchNotification mdn) throws Exception {
}
+ @Override
public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
return null;
@@ -329,34 +362,42 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return false;
}
+ @Override
public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
return new ArrayList<MessageReference>(dispatched);
}
+ @Override
public void setObjectName(ObjectName objectName) {
}
+ @Override
public void setSelector(String selector)
throws InvalidSelectorException, UnsupportedOperationException {
}
+ @Override
public void updateConsumerPrefetch(int newPrefetch) {
}
+ @Override
public boolean addRecoveredMessage(ConnectionContext context,
MessageReference message) throws Exception {
return false;
}
+ @Override
public ActiveMQDestination getActiveMQDestination() {
return null;
}
+ @Override
public int getLockPriority() {
return 0;
}
+ @Override
public boolean isLockExclusive() {
return false;
}
@@ -367,6 +408,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public void removeDestination(Destination destination) {
}
+ @Override
public int countBeforeFull() {
return 10;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
new file mode 100644
index 0000000..5d0a82c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.AbstractStoreStatTestSupport;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that KahaDB properly sets the new storeMessageSize statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStatTestSupport {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(AbstractPendingMessageCursorTest.class);
+
+
+ protected BrokerService broker;
+ protected URI brokerConnectURI;
+ protected String defaultQueueName = "test.queue";
+ protected String defaultTopicName = "test.topic";
+ protected static int maxMessageSize = 1000;
+
+ @Before
+ public void startBroker() throws Exception {
+ setUpBroker(true);
+ }
+
+ protected void setUpBroker(boolean clearDataDir) throws Exception {
+
+ broker = new BrokerService();
+ this.initPersistence(broker);
+ //set up a transport
+ TransportConnector connector = broker
+ .addConnector(new TransportConnector());
+ connector.setUri(new URI("tcp://0.0.0.0:0"));
+ connector.setName("tcp");
+
+ PolicyEntry policy = new PolicyEntry();
+ policy.setTopicPrefetch(100);
+ policy.setDurableTopicPrefetch(100);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(pMap);
+
+ broker.start();
+ broker.waitUntilStarted();
+ brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ @Override
+ protected BrokerService getBroker() {
+ return this.broker;
+ }
+
+ @Override
+ protected URI getBrokerConnectURI() {
+ return this.brokerConnectURI;
+ }
+
+ protected abstract void initPersistence(BrokerService brokerService) throws IOException;
+
+ @Test
+ public void testQueueMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+ verifyPendingStats(dest, 200, publishedMessageSize.get());
+ verifyStoreStats(dest, 200, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testQueueBrowserMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+ browseTestQueueMessages(dest.getName());
+ verifyPendingStats(dest, 200, publishedMessageSize.get());
+ verifyStoreStats(dest, 200, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testQueueMessageSizeNonPersistent() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200,
+ DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+ verifyPendingStats(dest, 200, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
+ AtomicLong publishedNonPersistentMessageSize = new AtomicLong();
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(100,
+ DeliveryMode.PERSISTENT, publishedMessageSize);
+ dest = publishTestQueueMessages(100,
+ DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize);
+ verifyPendingStats(dest, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get());
+ verifyStoreStats(dest, 100, publishedMessageSize.get());
+ }
+
+ @Test
+ public void testQueueMessageSizeAfterConsumption() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize);
+ verifyPendingStats(dest, 200, publishedMessageSize.get());
+
+ consumeTestQueueMessages();
+
+ verifyPendingStats(dest, 0, 0);
+ verifyStoreStats(dest, 0, 0);
+ }
+
+ @Test
+ public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+ verifyPendingStats(dest, 200, publishedMessageSize.get());
+
+ consumeTestQueueMessages();
+
+ verifyPendingStats(dest, 0, 0);
+ verifyStoreStats(dest, 0, 0);
+ }
+
+ @Test(timeout=100000)
+ public void testTopicMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+ org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200, publishedMessageSize);
+
+ //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+ //are dispatched because we have an active consumer online
+ //verify that the size is greater than 100 messages times the minimum size of 100
+ verifyPendingStats(dest, 100, 100 * 100);
+
+ //consume all messages
+ consumeTestMessages(consumer, 200);
+
+ //All messages should now be gone
+ verifyPendingStats(dest, 0, 0);
+
+ connection.close();
+ }
+
+ @Test(timeout=100000)
+ public void testTopicNonPersistentMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+ org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200,
+ DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+
+ //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+ //are dispatched because we have an active consumer online
+ //verify the size is at least as big as 100 messages times the minimum of 100 size
+ verifyPendingStats(dest, 100, 100 * 100);
+
+ //consume all messages
+ consumeTestMessages(consumer, 200);
+
+ //All messages should now be gone
+ verifyPendingStats(dest, 0, 0);
+
+ connection.close();
+ }
+
+ @Test(timeout=100000)
+ public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName));
+
+ org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(100,
+ DeliveryMode.NON_PERSISTENT, publishedMessageSize);
+
+ dest = publishTestTopicMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize);
+
+ //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100
+ //are dispatched because we have an active consumer online
+ //verify the size is at least as big as 100 messages times the minimum of 100 size
+ verifyPendingStats(dest, 100, 100 * 100);
+
+ //consume all messages
+ consumeTestMessages(consumer, 200);
+
+ //All messages should now be gone
+ verifyPendingStats(dest, 0, 0);
+
+ connection.close();
+ }
+
+ @Test(timeout=10000)
+ public void testMessageSizeOneDurable() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+ org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection,
+ new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+ verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+ verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+ //consume all messages
+ consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+ //All messages should now be gone
+ verifyPendingStats(dest, subKey, 0, 0);
+ verifyStoreStats(dest, 0, 0);
+
+ connection.close();
+ }
+
+ @Test(timeout=10000)
+ public void testMessageSizeOneDurablePartialConsumption() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+ org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(
+ connection, new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+ verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+ verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+ //consume all messages
+ consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+ //150 should be left
+ verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
+ verifyStoreStats(dest, 150, publishedMessageSize.get());
+
+ connection.close();
+ }
+
+ @Test(timeout=10000)
+ public void testMessageSizeTwoDurables() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ org.apache.activemq.broker.region.Topic dest =
+ publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200,
+ publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ //verify the count and size
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+ verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+ //consume messages just for sub1
+ consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+ //There is still a durable that hasn't consumed so the messages should exist
+ SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
+ verifyPendingStats(dest, subKey, 0, 0);
+ verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
+ verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+ connection.stop();
+ }
+
+
+ protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue,
+ final int count, final long minimumSize) throws Exception {
+ this.verifyPendingStats(queue, count, minimumSize, count, minimumSize);
+ }
+
+ protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue,
+ final int count, final long minimumSize, final int storeCount, final long minimumStoreSize) throws Exception {
+
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queue.getPendingMessageCount() == count;
+ }
+ });
+
+ verifySize(count, new MessageSizeCalculator() {
+ @Override
+ public long getMessageSize() throws Exception {
+ return queue.getPendingMessageSize();
+ }
+ }, minimumSize);
+ }
+
+ //For a non-durable there won't necessarily be a message store
+ protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic,
+ final int count, final long minimumSize) throws Exception {
+
+ final TopicSubscription sub = (TopicSubscription) topic.getConsumers().get(0);
+
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return sub.getPendingQueueSize() == count;
+ }
+ });
+
+ verifySize(count, new MessageSizeCalculator() {
+ @Override
+ public long getMessageSize() throws Exception {
+ return sub.getPendingMessageSize();
+ }
+ }, minimumSize);
+ }
+
+ protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic, SubscriptionKey subKey,
+ final int count, final long minimumSize) throws Exception {
+
+ final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey);
+
+ //verify message count
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return sub.getPendingQueueSize() == count;
+ }
+ });
+
+ //verify message size
+ verifySize(count, new MessageSizeCalculator() {
+ @Override
+ public long getMessageSize() throws Exception {
+ return sub.getPendingMessageSize();
+ }
+ }, minimumSize);
+ }
+
+ protected void verifyStoreStats(org.apache.activemq.broker.region.Destination dest,
+ final int storeCount, final long minimumStoreSize) throws Exception {
+ final MessageStore messageStore = dest.getMessageStore();
+
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return messageStore.getMessageCount() == storeCount;
+ }
+ });
+ verifySize(storeCount, new MessageSizeCalculator() {
+ @Override
+ public long getMessageSize() throws Exception {
+ return messageStore.getMessageSize();
+ }
+ }, minimumStoreSize);
+
+ }
+
+
+ protected void verifySize(final int count, final MessageSizeCalculator messageSizeCalculator,
+ final long minimumSize) throws Exception {
+ if (count > 0) {
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return messageSizeCalculator.getMessageSize() > minimumSize ;
+ }
+ });
+ } else {
+ Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return messageSizeCalculator.getMessageSize() == 0;
+ }
+ });
+ }
+ }
+
+ protected static interface MessageSizeCalculator {
+ long getMessageSize() throws Exception;
+ }
+
+
+ protected Destination consumeTestMessages(MessageConsumer consumer, int size) throws Exception {
+ return consumeTestMessages(consumer, size, defaultTopicName);
+ }
+
+
+ protected Destination consumeTestMessages(MessageConsumer consumer, int size, String topicName) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+ topicName);
+
+ Destination dest = broker.getDestination(activeMqTopic);
+
+ //Topic topic = session.createTopic(topicName);
+
+ try {
+ for (int i = 0; i < size; i++) {
+ consumer.receive();
+ }
+
+ } finally {
+ //session.close();
+ }
+
+ return dest;
+ }
+
+ protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, AtomicLong publishedMessageSize) throws Exception {
+ return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
+ }
+
+ protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection,
+ String[] subNames, int publishSize, AtomicLong publishedMessageSize, int deliveryMode) throws Exception {
+
+ return publishTestMessagesDurable(connection, subNames, defaultTopicName,
+ publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize,
+ publishedMessageSize, false, deliveryMode);
+ }
+
+ protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,
+ AtomicLong publishedMessageSize) throws Exception {
+ return publishTestTopicMessages(publishSize, DeliveryMode.PERSISTENT, publishedMessageSize);
+ }
+
+ protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,
+ int deliveryMode, AtomicLong publishedMessageSize) throws Exception {
+ // create a new queue
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId2");
+ connection.start();
+
+ final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+ defaultTopicName);
+
+ org.apache.activemq.broker.region.Topic dest =
+ (org.apache.activemq.broker.region.Topic) broker.getDestination(activeMqTopic);
+
+ // Start the connection
+ Session session = connection.createSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(defaultTopicName);
+
+ try {
+ // publish a bunch of non-persistent messages to fill up the temp
+ // store
+ MessageProducer prod = session.createProducer(topic);
+ prod.setDeliveryMode(deliveryMode);
+ for (int i = 0; i < publishSize; i++) {
+ prod.send(createMessage(session, AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize));
+ }
+
+ } finally {
+ connection.close();
+ }
+
+ return dest;
+ }
+
+ protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count,
+ AtomicLong publishedMessageSize) throws Exception {
+ return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+ AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize);
+ }
+
+ protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, int deliveryMode,
+ AtomicLong publishedMessageSize) throws Exception {
+ return publishTestQueueMessages(count, defaultQueueName, deliveryMode,
+ AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize);
+ }
+
+ protected Destination consumeTestQueueMessages() throws Exception {
+ return consumeTestQueueMessages(defaultQueueName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
new file mode 100644
index 0000000..557c70e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that pending message metrics work properly with KahaDB
+ *
+ * AMQ-5923
+ *
+ */
+public class KahaDBPendingMessageCursorTest extends
+ AbstractPendingMessageCursorTest {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(KahaDBPendingMessageCursorTest.class);
+
+ @Rule
+ public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+ @Override
+ protected void setUpBroker(boolean clearDataDir) throws Exception {
+ if (clearDataDir && dataFileDir.getRoot().exists())
+ FileUtils.cleanDirectory(dataFileDir.getRoot());
+ super.setUpBroker(clearDataDir);
+ }
+
+ @Override
+ protected void initPersistence(BrokerService brokerService)
+ throws IOException {
+ broker.setPersistent(true);
+ broker.setDataDirectoryFile(dataFileDir.getRoot());
+ }
+
+ /**
+ * Test that the the counter restores size and works after restart and more
+ * messages are published
+ *
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testDurableMessageSizeAfterRestartAndPublish() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Topic topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+ publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+
+ // verify the count and size
+ verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
+ verifyStoreStats(topic, 200, publishedMessageSize.get());
+
+ // stop, restart broker and publish more messages
+ stopBroker();
+ this.setUpBroker(false);
+
+ connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+ publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ // verify the count and size
+ verifyPendingStats(topic, subKey, 400, publishedMessageSize.get());
+ verifyStoreStats(topic, 400, publishedMessageSize.get());
+
+ }
+
+ /**
+ * Test that the the counter restores size and works after restart and more
+ * messages are published
+ *
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testNonPersistentDurableMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Topic topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200,
+ publishedMessageSize, DeliveryMode.NON_PERSISTENT);
+
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+
+ // verify the count and size
+ verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
+ verifyStoreStats(topic, 0, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
new file mode 100644
index 0000000..948193d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.util.SubscriptionKey;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that PendingMessageCursor size statistics work with the MemoryPersistentAdapter
+ *
+ * AMQ-5748
+ *
+ */
+public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursorTest {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(MemoryPendingMessageCursorTest.class);
+
+ @Override
+ protected void initPersistence(BrokerService brokerService) throws IOException {
+ broker.setPersistent(false);
+ broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ }
+
+
+ @Override
+ @Test(timeout=10000)
+ public void testMessageSizeOneDurable() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+ org.apache.activemq.broker.region.Topic dest =
+ publishTestMessagesDurable(connection, new String[] {"sub1"},
+ 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+ //The expected value is only 100 because for durables a LRUCache is being used
+ //with a max size of 100
+ verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+ //consume 100 messages
+ consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize);
+
+ //100 should be left
+ verifyPendingStats(dest, subKey, 100, publishedMessageSize.get());
+ verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+ connection.close();
+ }
+
+ @Override
+ @Test(timeout=10000)
+ public void testMessageSizeTwoDurables() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ org.apache.activemq.broker.region.Topic dest =
+ publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"},
+ 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ //verify the count and size
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+ verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+ //consume messages just for sub1
+ consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
+
+ //There is still a durable that hasn't consumed so the messages should exist
+ SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
+ verifyPendingStats(dest, subKey, 0, 0);
+ verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
+
+ //The expected value is only 100 because for durables a LRUCache is being used
+ //with a max size of 100
+ verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+ connection.stop();
+ }
+
+ @Override
+ @Test(timeout=10000)
+ public void testMessageSizeOneDurablePartialConsumption() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+ org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection,
+ new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+ verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+
+ //The expected value is only 100 because for durables a LRUCache is being used
+ //with a max size of 100
+ verifyStoreStats(dest, 100, publishedMessageSize.get());
+
+ //consume all messages
+ consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+ //All messages should now be gone
+ verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
+
+ //The expected value is only 100 because for durables a LRUCache is being used
+ //with a max size of 100
+ //verify the size is at least as big as 100 messages times the minimum of 100 size
+ verifyStoreStats(dest, 100, 100 * 100);
+
+ connection.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
new file mode 100644
index 0000000..9768980
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+/**
+ * This test checks that pending message metrics work properly with MultiKahaDB
+ *
+ * AMQ-5923
+ *
+ */
+public class MultiKahaDBPendingMessageCursorTest extends
+ KahaDBPendingMessageCursorTest {
+
+ @Override
+ protected void initPersistence(BrokerService brokerService)
+ throws IOException {
+ broker.setPersistent(true);
+
+ //setup multi-kaha adapter
+ MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+ persistenceAdapter.setDirectory(dataFileDir.getRoot());
+
+ KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+ kahaStore.setJournalMaxFileLength(1024 * 512);
+
+ //set up a store per destination
+ FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+ filtered.setPersistenceAdapter(kahaStore);
+ filtered.setPerDestination(true);
+ List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+ stores.add(filtered);
+
+ persistenceAdapter.setFilteredPersistenceAdapters(stores);
+ broker.setPersistenceAdapter(persistenceAdapter);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
index 79d7e6c..6a9dd6b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
@@ -309,6 +309,16 @@ public class OrderPendingListTest {
}
@Override
+ public long messageSize() {
+ long size = 0;
+ Iterator<MessageReference> i = theList.iterator();
+ while (i.hasNext()) {
+ size += i.next().getMessage().getSize();
+ }
+ return size;
+ }
+
+ @Override
public Iterator<MessageReference> iterator() {
return theList.iterator();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
index 944d183..116500e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -16,38 +16,19 @@
*/
package org.apache.activemq.store;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
-import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
@@ -62,7 +43,7 @@ import org.slf4j.LoggerFactory;
* AMQ-5748
*
*/
-public abstract class AbstractMessageStoreSizeStatTest {
+public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStatTestSupport {
protected static final Logger LOG = LoggerFactory
.getLogger(AbstractMessageStoreSizeStatTest.class);
@@ -71,7 +52,6 @@ public abstract class AbstractMessageStoreSizeStatTest {
protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue";
protected String defaultTopicName = "test.topic";
- protected static int messageSize = 1000;
@Before
public void startBroker() throws Exception {
@@ -100,39 +80,52 @@ public abstract class AbstractMessageStoreSizeStatTest {
broker.waitUntilStopped();
}
+ @Override
+ protected BrokerService getBroker() {
+ return this.broker;
+ }
+
+ @Override
+ protected URI getBrokerConnectURI() {
+ return this.brokerConnectURI;
+ }
+
protected abstract void initPersistence(BrokerService brokerService) throws IOException;
- @Test
+ @Test(timeout=10000)
public void testMessageSize() throws Exception {
- Destination dest = publishTestQueueMessages(200);
- verifyStats(dest, 200, 200 * messageSize);
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+ verifyStats(dest, 200, publishedMessageSize.get());
}
- @Test
+ @Test(timeout=10000)
public void testMessageSizeAfterConsumption() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
- Destination dest = publishTestQueueMessages(200);
- verifyStats(dest, 200, 200 * messageSize);
+ Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+ verifyStats(dest, 200, publishedMessageSize.get());
consumeTestQueueMessages();
verifyStats(dest, 0, 0);
}
- @Test
+ @Test(timeout=10000)
public void testMessageSizeOneDurable() throws Exception {
-
+ AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
- Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200);
+ Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, publishedMessageSize);
//verify the count and size
- verifyStats(dest, 200, 200 * messageSize);
+ verifyStats(dest, 200, publishedMessageSize.get());
//consume all messages
- consumeDurableTestMessages(connection, "sub1", 200);
+ consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//All messages should now be gone
verifyStats(dest, 0, 0);
@@ -142,21 +135,21 @@ public abstract class AbstractMessageStoreSizeStatTest {
@Test(timeout=10000)
public void testMessageSizeTwoDurables() throws Exception {
-
+ AtomicLong publishedMessageSize = new AtomicLong();
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId");
connection.start();
- Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200);
+ Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, publishedMessageSize);
//verify the count and size
- verifyStats(dest, 200, 200 * messageSize);
+ verifyStats(dest, 200, publishedMessageSize.get());
//consume messages just for sub1
- consumeDurableTestMessages(connection, "sub1", 200);
+ consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//There is still a durable that hasn't consumed so the messages should exist
- verifyStats(dest, 200, 200 * messageSize);
+ verifyStats(dest, 200, publishedMessageSize.get());
connection.stop();
@@ -164,14 +157,24 @@ public abstract class AbstractMessageStoreSizeStatTest {
@Test
public void testMessageSizeAfterDestinationDeletion() throws Exception {
- Destination dest = publishTestQueueMessages(200);
- verifyStats(dest, 200, 200 * messageSize);
+ AtomicLong publishedMessageSize = new AtomicLong();
+ Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+ verifyStats(dest, 200, publishedMessageSize.get());
//check that the size is 0 after deletion
broker.removeDestination(dest.getActiveMQDestination());
verifyStats(dest, 0, 0);
}
+ @Test
+ public void testQueueBrowserMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Destination dest = publishTestQueueMessages(200, publishedMessageSize);
+ browseTestQueueMessages(dest.getName());
+ verifyStats(dest, 200, publishedMessageSize.get());
+ }
+
protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception {
final MessageStore messageStore = dest.getMessageStore();
final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
@@ -203,164 +206,31 @@ public abstract class AbstractMessageStoreSizeStatTest {
}
}
- /**
- * Generate random 1 megabyte messages
- * @param session
- * @return
- * @throws JMSException
- */
- protected BytesMessage createMessage(Session session) throws JMSException {
- final BytesMessage message = session.createBytesMessage();
- final byte[] data = new byte[messageSize];
- final Random rng = new Random();
- rng.nextBytes(data);
- message.writeBytes(data);
- return message;
- }
-
- protected Destination publishTestQueueMessages(int count) throws Exception {
- return publishTestQueueMessages(count, defaultQueueName);
+ protected Destination publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception {
+ return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT,
+ AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
}
- protected Destination publishTestQueueMessages(int count, String queueName) throws Exception {
- // create a new queue
- final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
- queueName);
-
- Destination dest = broker.getDestination(activeMqQueue);
-
- // Start the connection
- Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
- .createConnection();
- connection.setClientID("clientId" + queueName);
- connection.start();
- Session session = connection.createSession(false,
- QueueSession.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(queueName);
-
- try {
- // publish a bunch of non-persistent messages to fill up the temp
- // store
- MessageProducer prod = session.createProducer(queue);
- prod.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < count; i++) {
- prod.send(createMessage(session));
- }
-
- } finally {
- connection.close();
- }
-
- return dest;
+ protected Destination publishTestQueueMessages(int count, String queueName, AtomicLong publishedMessageSize) throws Exception {
+ return publishTestQueueMessages(count, queueName, DeliveryMode.PERSISTENT,
+ AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
}
protected Destination consumeTestQueueMessages() throws Exception {
return consumeTestQueueMessages(defaultQueueName);
}
- protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception {
- return consumeDurableTestMessages(connection, sub, size, defaultTopicName);
+ protected Destination consumeDurableTestMessages(Connection connection, String sub, int size,
+ AtomicLong publishedMessageSize) throws Exception {
+ return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
}
- protected Destination consumeTestQueueMessages(String queueName) throws Exception {
- // create a new queue
- final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
- queueName);
-
- Destination dest = broker.getDestination(activeMqQueue);
-
- // Start the connection
- Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
- .createConnection();
- connection.setClientID("clientId2" + queueName);
- connection.start();
- Session session = connection.createSession(false,
- QueueSession.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(queueName);
-
- try {
- MessageConsumer consumer = session.createConsumer(queue);
- for (int i = 0; i < 200; i++) {
- consumer.receive();
- }
-
- } finally {
- connection.stop();
- }
-
- return dest;
- }
-
- protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception {
- // create a new queue
- final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
- topicName);
-
- Destination dest = broker.getDestination(activeMqTopic);
-
- Session session = connection.createSession(false,
- QueueSession.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(topicName);
-
- try {
- TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
- for (int i = 0; i < size; i++) {
- consumer.receive();
- }
-
- } finally {
- session.close();
- }
-
- return dest;
- }
-
- protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception {
- // create a new queue
- final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
- defaultTopicName);
-
- Destination dest = broker.getDestination(activeMqTopic);
-
- // Start the connection
-
- Session session = connection.createSession(false,
- TopicSession.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(defaultTopicName);
- for (String subName : subNames) {
- session.createDurableSubscriber(topic, subName);
- }
-
- // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
- //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
- //then the statistics won't be updated properly because a new store would overwrite the old store
- //which is still in use
- ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
-
- try {
- // publish a bunch of non-persistent messages to fill up the temp
- // store
- MessageProducer prod = session.createProducer(topic);
- prod.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < publishSize; i++) {
- prod.send(createMessage(session));
- }
-
- //verify the view has expected messages
- assertEquals(subNames.length, subs.length);
- ObjectName subName = subs[0];
- DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
- broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
- CompositeData[] data = sub.browse();
- assertNotNull(data);
- assertEquals(expectedSize, data.length);
-
- } finally {
- session.close();
- }
-
- return dest;
+ protected Destination publishTestMessagesDurable(Connection connection, String[] subNames,
+ int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception {
+ return publishTestMessagesDurable(connection, subNames, defaultTopicName,
+ publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize,
+ publishedMessageSize, true);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
new file mode 100644
index 0000000..3f0e7c1
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.net.URI;
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ *
+ *
+ */
+public abstract class AbstractStoreStatTestSupport {
+
+ protected static int defaultMessageSize = 1000;
+
+ protected abstract BrokerService getBroker();
+
+ protected abstract URI getBrokerConnectURI();
+
+ protected Destination consumeTestQueueMessages(String queueName) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+ queueName);
+
+ Destination dest = getBroker().getDestination(activeMqQueue);
+
+ // Start the connection
+ Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
+ .createConnection();
+ connection.setClientID("clientId2" + queueName);
+ connection.start();
+ Session session = connection.createSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+
+ try {
+ MessageConsumer consumer = session.createConsumer(queue);
+ for (int i = 0; i < 200; i++) {
+ consumer.receive();
+ }
+
+ } finally {
+ connection.stop();
+ }
+
+ return dest;
+ }
+
+ protected Destination browseTestQueueMessages(String queueName) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+ queueName);
+
+ Destination dest = getBroker().getDestination(activeMqQueue);
+
+ // Start the connection
+ Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
+ .createConnection();
+ connection.setClientID("clientId2" + queueName);
+ connection.start();
+ Session session = connection.createSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+
+ try {
+ QueueBrowser queueBrowser = session.createBrowser(queue);
+ @SuppressWarnings("unchecked")
+ Enumeration<Message> messages = queueBrowser.getEnumeration();
+ while (messages.hasMoreElements()) {
+ messages.nextElement();
+ }
+
+ } finally {
+ connection.stop();
+ }
+
+ return dest;
+ }
+
+ protected Destination consumeDurableTestMessages(Connection connection, String sub,
+ int size, String topicName, AtomicLong publishedMessageSize) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+ topicName);
+
+ Destination dest = getBroker().getDestination(activeMqTopic);
+
+ Session session = connection.createSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(topicName);
+
+ try {
+ TopicSubscriber consumer = session.createDurableSubscriber(topic, sub);
+ for (int i = 0; i < size; i++) {
+ ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
+ if (publishedMessageSize != null) {
+ publishedMessageSize.addAndGet(-message.getSize());
+ }
+ }
+
+ } finally {
+ session.close();
+ }
+
+ return dest;
+ }
+
+ protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, String queueName,
+ int deliveryMode, int messageSize, AtomicLong publishedMessageSize) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+ queueName);
+
+ Destination dest = getBroker().getDestination(activeMqQueue);
+
+ // Start the connection
+ Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI())
+ .createConnection();
+ connection.setClientID("clientId" + queueName);
+ connection.start();
+ Session session = connection.createSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+
+ try {
+ // publish a bunch of non-persistent messages to fill up the temp
+ // store
+ MessageProducer prod = session.createProducer(queue);
+ prod.setDeliveryMode(deliveryMode);
+ for (int i = 0; i < count; i++) {
+ prod.send(createMessage(session, messageSize, publishedMessageSize));
+ }
+
+ } finally {
+ connection.close();
+ }
+
+ return (org.apache.activemq.broker.region.Queue) dest;
+ }
+
+ protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
+ int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
+ boolean verifyBrowsing) throws Exception {
+ return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize,
+ publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT);
+ }
+
+ protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
+ int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
+ boolean verifyBrowsing, int deliveryMode) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+ topicName);
+
+ Destination dest = getBroker().getDestination(activeMqTopic);
+
+ // Start the connection
+
+ Session session = connection.createSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(topicName);
+ for (String subName : subNames) {
+ session.createDurableSubscriber(topic, subName);
+ }
+
+ ObjectName[] subs = null;
+ if (verifyBrowsing) {
+ // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
+ //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
+ //then the statistics won't be updated properly because a new store would overwrite the old store
+ //which is still in use
+ subs = getBroker().getAdminView().getDurableTopicSubscribers();
+ }
+
+ try {
+ // publish a bunch of non-persistent messages to fill up the temp
+ // store
+ MessageProducer prod = session.createProducer(topic);
+ prod.setDeliveryMode(deliveryMode);
+ for (int i = 0; i < publishSize; i++) {
+ prod.send(createMessage(session, messageSize, publishedMessageSize));
+ }
+
+ //verify the view has expected messages
+ if (verifyBrowsing) {
+ assertNotNull(subs);
+ assertEquals(subNames.length, subs.length);
+ ObjectName subName = subs[0];
+ DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+ getBroker().getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+ CompositeData[] data = sub.browse();
+ assertNotNull(data);
+ assertEquals(expectedSize, data.length);
+ }
+
+ } finally {
+ session.close();
+ }
+
+ return (org.apache.activemq.broker.region.Topic) dest;
+ }
+
+ /**
+ * Generate random messages between 100 bytes and messageSize
+ * @param session
+ * @return
+ * @throws JMSException
+ */
+ protected BytesMessage createMessage(Session session, int messageSize, AtomicLong publishedMessageSize) throws JMSException {
+ final BytesMessage message = session.createBytesMessage();
+ final Random rn = new Random();
+ int size = rn.nextInt(messageSize - 100);
+ if (publishedMessageSize != null) {
+ publishedMessageSize.addAndGet(size);
+ }
+
+ final byte[] data = new byte[size];
+ final Random rng = new Random();
+ rng.nextBytes(data);
+ message.writeBytes(data);
+ return message;
+ }
+}