You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/17 00:54:37 UTC
[5/6] git commit: https://issues.apache.org/jira/browse/AMQ-5266
https://issues.apache.org/jira/browse/AMQ-4485 - store has messages must be
aware of pending also kahadb setBatch for async sends. additional tests and
tidy up of cusror sync with store to
https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - store has messages must be aware of pending also kahadb setBatch for async sends. additional tests and tidy up of cusror sync with store to reflect async/sync additions
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9c2b1d25
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9c2b1d25
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9c2b1d25
Branch: refs/heads/trunk
Commit: 9c2b1d257288fb85138a37e30e1216251ca13eaf
Parents: 243db1c
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 16 23:32:55 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Oct 16 23:35:18 2014 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 19 +-
.../region/cursors/AbstractStoreCursor.java | 95 +--
.../region/cursors/QueueStorePrefetch.java | 8 +-
.../activemq/store/ProxyMessageStore.java | 5 +
.../activemq/store/kahadb/KahaDBStore.java | 8 +
.../activemq/store/kahadb/MessageDatabase.java | 9 +-
activemq-unit-tests/pom.xml | 1 +
.../cursors/StoreQueueCursorOrderTest.java | 517 +++++++++++++++
.../bugs/AMQ5266StarvedConsumerTest.java | 641 +++++++++++++++++++
9 files changed, 1249 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6df48da..21d7522 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -771,19 +771,24 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
candidate = indexOrderedCursorUpdates.peek();
}
}
- for (MessageContext messageContext : orderedUpdates) {
- if (!cursorAdd(messageContext.message)) {
- // cursor suppressed a duplicate
- messageContext.duplicate = true;
+ messagesLock.writeLock().lock();
+ try {
+ for (MessageContext messageContext : orderedUpdates) {
+ if (!messages.addMessageLast(messageContext.message)) {
+ // cursor suppressed a duplicate
+ messageContext.duplicate = true;
+ }
+ if (messageContext.onCompletion != null) {
+ messageContext.onCompletion.run();
+ }
}
+ } finally {
+ messagesLock.writeLock().unlock();
}
} finally {
sendLock.unlock();
}
for (MessageContext messageContext : orderedUpdates) {
- if (messageContext.onCompletion != null) {
- messageContext.onCompletion.run();
- }
if (!messageContext.duplicate) {
messageSent(messageContext.context, messageContext.message);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 19864b7..c4bf985 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -20,6 +20,8 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@@ -90,6 +92,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
boolean recovered = false;
+ storeHasMessages = true;
if (recordUniqueId(message.getMessageId())) {
if (!cached) {
message.setRegionDestination(regionDestination);
@@ -101,12 +104,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
batchList.addMessageLast(message);
clearIterator(true);
recovered = true;
- storeHasMessages = true;
} else if (!cached) {
// a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart
if (message.isRecievedByDFBridge()) {
// expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true
- LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+ }
} else {
LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
duplicate(message);
@@ -201,7 +205,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
boolean disableCache = false;
if (hasSpace()) {
if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
- LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
+ }
setCacheEnabled(true);
}
if (isCacheEnabled()) {
@@ -217,64 +223,48 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
if (disableCache && isCacheEnabled()) {
- LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
+ }
+ syncWithStore(node.getMessage());
setCacheEnabled(false);
- syncWithStore();
}
this.storeHasMessages = true;
size++;
return true;
}
- private void syncWithStore() throws Exception {
+ private void syncWithStore(Message currentAdd) throws Exception {
+ pruneLastCached();
if (lastCachedIds[SYNC_ADD] == null) {
- // only async adds, lets wait on the potential last add and reset from there
+ // possibly only async adds, lets wait on the potential last add and reset from there
for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
- MessageId lastStored = it.previous();
- Object futureOrLong = lastStored.getFutureOrSequenceLong();
+ MessageId lastPending = it.previous();
+ Object futureOrLong = lastPending.getFutureOrSequenceLong();
if (futureOrLong instanceof Future) {
Future future = (Future) futureOrLong;
if (future.isCancelled()) {
continue;
- } else {
- try {
- future.get();
- setLastCachedId(ASYNC_ADD, lastStored);
- } catch (Exception ignored) {}
}
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ setLastCachedId(ASYNC_ADD, lastPending);
+ } catch (TimeoutException potentialDeadlock) {
+ LOG.warn("{} timed out waiting for async add", this, potentialDeadlock);
+ } catch (Exception cancelledOrTimeOutOrErrorWorstCaseWeReplay) {cancelledOrTimeOutOrErrorWorstCaseWeReplay.printStackTrace();}
+ } else {
+ setLastCachedId(ASYNC_ADD, lastPending);
}
+ break;
}
if (lastCachedIds[ASYNC_ADD] != null) {
- setBatch(lastCachedIds[ASYNC_ADD]);
- }
- } else {
- // mix of async and sync - async can exceed sync only if next in sequence
- for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
- MessageId candidate = it.next();
- final Object futureOrLong = candidate.getFutureOrSequenceLong();
- if (futureOrLong instanceof Future) {
- Future future = (Future) futureOrLong;
- if (future.isCancelled()) {
- it.remove();
- } else {
- try {
- future.get();
- long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
- if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), next) == 0) {
- setLastCachedId(SYNC_ADD, candidate);
- } else {
- // out of sequence, revert to sync state
- LOG.trace("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
- break;
- }
- } catch (Exception ignored) {}
- }
+ // ensure we don't skip current possibly sync add b/c we waited on the future
+ if (currentAdd.isRecievedByDFBridge() || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) {
+ setBatch(lastCachedIds[ASYNC_ADD]);
}
}
- if (lastCachedIds[SYNC_ADD] != null) {
- setBatch(lastCachedIds[SYNC_ADD]);
- }
-
+ } else {
+ setBatch(lastCachedIds[SYNC_ADD]);
}
// cleanup
lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
@@ -282,7 +272,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
private void trackLastCached(MessageReference node) {
- if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
+ if (node.getMessageId().getFutureOrSequenceLong() instanceof Future || node.getMessage().isRecievedByDFBridge()) {
pruneLastCached();
pendingCachedIds.add(node.getMessageId());
} else {
@@ -305,6 +295,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
} else {
// complete
setLastCachedId(ASYNC_ADD, candidate);
+
+ // keep lock step with sync adds while order is preserved
+ if (lastCachedIds[SYNC_ADD] != null) {
+ long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
+ if (Long.compare((Long)futureOrLong, next) == 0) {
+ setLastCachedId(SYNC_ADD, candidate);
+ } else {
+ // out of sequence, revert to sync state
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
+ }
+ }
+ }
it.remove();
}
}
@@ -374,13 +377,17 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
this.batchResetNeeded = false;
}
if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
+ // avoid repeated trips to the store if there is nothing of interest
+ this.storeHasMessages = false;
try {
doFillBatch();
} catch (Exception e) {
LOG.error("{} - Failed to fill batch", this, e);
throw new RuntimeException(e);
}
- this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
+ if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
+ this.storeHasMessages = true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 94dc817..9fb73c5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -94,7 +94,9 @@ class QueueStorePrefetch extends AbstractStoreCursor {
@Override
protected void setBatch(MessageId messageId) throws Exception {
- LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
+ }
store.setBatch(messageId);
batchResetNeeded = false;
}
@@ -109,4 +111,8 @@ class QueueStorePrefetch extends AbstractStoreCursor {
}
}
+ @Override
+ public String toString(){
+ return super.toString() + ",store=" + store;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index 8c747e8..901c769 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -165,4 +165,9 @@ public class ProxyMessageStore implements MessageStore {
public void registerIndexListener(IndexListener indexListener) {
delegate.registerIndexListener(indexListener);
}
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index eb5d1c4..a18071b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -665,6 +665,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
StoredDestination sd = getStoredDestination(dest, tx);
Long location = sd.messageIdIndex.get(tx, key);
if (location != null) {
+ Long pending = sd.orderIndex.minPendingAdd();
+ if (pending != null) {
+ location = Math.min(location, pending-1);
+ }
sd.orderIndex.setBatch(tx, location);
} else {
LOG.warn("{} {} setBatch failed, location for {} not found in messageId index for {}", this, dest.getName(), identity.getFutureOrSequenceLong(), identity);
@@ -714,6 +718,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
this.localDestinationSemaphore.release();
}
+ @Override
+ public String toString(){
+ return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
+ }
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 554f1d3..4de5f16 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1767,7 +1767,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// StoredDestination related implementation methods.
// /////////////////////////////////////////////////////////////////
- private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+ protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
static class MessageKeys {
final String messageId;
@@ -1886,6 +1886,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void trackPendingAddComplete(Long seq) {
orderIndex.trackPendingAddComplete(seq);
}
+
+ @Override
+ public String toString() {
+ return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
+ }
}
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
@@ -2337,7 +2342,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return 0;
}
- private String key(KahaDestination destination) {
+ protected String key(KahaDestination destination) {
return destination.getType().getNumber() + ":" + destination.getName();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 1333412..4735144 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -1033,6 +1033,7 @@
<exclude>org/apache/activemq/store/kahadb/disk/index/HashIndexTest.*</exclude>
<exclude>org/apache/activemq/store/kahadb/disk/index/ListIndexTest.*</exclude>
<exclude>org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStreamTest.*</exclude>
+ <exclude>org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.*</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
new file mode 100644
index 0000000..f8fab10
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -0,0 +1,517 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StoreQueueCursorOrderTest {
+ private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorOrderTest.class);
+
+ ActiveMQQueue destination = new ActiveMQQueue("queue-"
+ + StoreQueueCursorOrderTest.class.getSimpleName());
+ BrokerService brokerService;
+
+ final static String mesageIdRoot = "11111:22222:0:";
+ final int messageBytesSize = 1024;
+ final String text = new String(new byte[messageBytesSize]);
+
+ @Before
+ public void setUp() throws Exception {
+ brokerService = createBroker();
+ brokerService.setUseJmx(false);
+ brokerService.deleteAllMessages();
+ brokerService.start();
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return new BrokerService();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ brokerService.stop();
+ }
+
+ @Test
+ public void tesBlockedFuture() throws Exception {
+ final int count = 2;
+ final Message[] messages = new Message[count];
+ final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+ final ConsumerInfo consumerInfo = new ConsumerInfo();
+ final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ consumerInfo.setExclusive(true);
+
+ final Queue queue = new Queue(brokerService, destination,
+ queueMessageStore, destinationStatistics, null);
+
+ queueMessageStore.start();
+ queueMessageStore.registerIndexListener(null);
+
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ SystemUsage systemUsage = new SystemUsage();
+ // ensure memory limit is reached
+ systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
+ underTest.setSystemUsage(systemUsage);
+ underTest.setEnableAudit(false);
+ underTest.start();
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ ActiveMQTextMessage msg = getMessage(0);
+ messages[1] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.setRecievedByDFBridge(true);
+ FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
+ @Override
+ public void run() {
+ }
+ }, 2l) {};
+ msg.getMessageId().setFutureOrSequenceLong(future);
+ underTest.addMessageLast(msg);
+
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ // second message will flip the cache but will be stored before the future task
+ msg = getMessage(1);
+ messages[0] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.getMessageId().setFutureOrSequenceLong(1l);
+ underTest.addMessageLast(msg);
+
+
+ assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+ assertEquals("setBatch unset", 0l, queueMessageStore.batch.get());
+
+ int dequeueCount = 0;
+
+ underTest.setMaxBatchSize(2);
+ underTest.reset();
+ while (underTest.hasNext() && dequeueCount < count) {
+ MessageReference ref = underTest.next();
+ ref.decrementReferenceCount();
+ underTest.remove();
+ LOG.info("Received message: {} with body: {}",
+ ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+ assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+ }
+ underTest.release();
+ assertEquals(count, dequeueCount);
+ }
+
+ @Test
+ public void testNoSetBatchWithUnOrderedFutureCurrentSync() throws Exception {
+ final int count = 2;
+ final Message[] messages = new Message[count];
+ final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+ final ConsumerInfo consumerInfo = new ConsumerInfo();
+ final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ consumerInfo.setExclusive(true);
+
+ final Queue queue = new Queue(brokerService, destination,
+ queueMessageStore, destinationStatistics, null);
+
+ queueMessageStore.start();
+ queueMessageStore.registerIndexListener(null);
+
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ SystemUsage systemUsage = new SystemUsage();
+ // ensure memory limit is reached
+ systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
+ underTest.setSystemUsage(systemUsage);
+ underTest.setEnableAudit(false);
+ underTest.start();
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ ActiveMQTextMessage msg = getMessage(0);
+ messages[1] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.setRecievedByDFBridge(true);
+ final ActiveMQTextMessage msgRef = msg;
+ FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
+ @Override
+ public void run() {
+ msgRef.getMessageId().setFutureOrSequenceLong(1l);
+ }
+ }, 1l) {};
+ msg.getMessageId().setFutureOrSequenceLong(future);
+ Executors.newSingleThreadExecutor().submit(future);
+ underTest.addMessageLast(msg);
+
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ // second message will flip the cache but will be stored before the future task
+ msg = getMessage(1);
+ messages[0] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.getMessageId().setFutureOrSequenceLong(1l);
+ underTest.addMessageLast(msg);
+
+
+ assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+ assertEquals("setBatch unset", 0l, queueMessageStore.batch.get());
+
+ int dequeueCount = 0;
+
+ underTest.setMaxBatchSize(2);
+ underTest.reset();
+ while (underTest.hasNext() && dequeueCount < count) {
+ MessageReference ref = underTest.next();
+ ref.decrementReferenceCount();
+ underTest.remove();
+ LOG.info("Received message: {} with body: {}",
+ ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+ assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+ }
+ underTest.release();
+ assertEquals(count, dequeueCount);
+ }
+
+ @Test
+ public void testSetBatchWithOrderedFutureCurrentFuture() throws Exception {
+ final int count = 2;
+ final Message[] messages = new Message[count];
+ final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+ final ConsumerInfo consumerInfo = new ConsumerInfo();
+ final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ consumerInfo.setExclusive(true);
+
+ final Queue queue = new Queue(brokerService, destination,
+ queueMessageStore, destinationStatistics, null);
+
+ queueMessageStore.start();
+ queueMessageStore.registerIndexListener(null);
+
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ SystemUsage systemUsage = new SystemUsage();
+ // ensure memory limit is reached
+ systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
+ underTest.setSystemUsage(systemUsage);
+ underTest.setEnableAudit(false);
+ underTest.start();
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ ActiveMQTextMessage msg = getMessage(0);
+ messages[0] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.setRecievedByDFBridge(true);
+ final ActiveMQTextMessage msgRef = msg;
+ FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
+ @Override
+ public void run() {
+ msgRef.getMessageId().setFutureOrSequenceLong(0l);
+ }
+ }, 0l) {};
+ msg.getMessageId().setFutureOrSequenceLong(future);
+ Executors.newSingleThreadExecutor().submit(future);
+ underTest.addMessageLast(msg);
+
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ // second message will flip the cache but will be stored before the future task
+ msg = getMessage(1);
+ messages[1] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.setRecievedByDFBridge(true);
+ final ActiveMQTextMessage msgRe2f = msg;
+ FutureTask<Long> future2 = new FutureTask<Long>(new Runnable() {
+ @Override
+ public void run() {
+ msgRe2f.getMessageId().setFutureOrSequenceLong(1l);
+ }
+ }, 1l) {};
+ msg.getMessageId().setFutureOrSequenceLong(future2);
+ Executors.newSingleThreadExecutor().submit(future2);
+ underTest.addMessageLast(msg);
+
+
+ assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+ assertEquals("setBatch set", 1l, queueMessageStore.batch.get());
+
+ int dequeueCount = 0;
+
+ underTest.setMaxBatchSize(2);
+ underTest.reset();
+ while (underTest.hasNext() && dequeueCount < count) {
+ MessageReference ref = underTest.next();
+ ref.decrementReferenceCount();
+ underTest.remove();
+ LOG.info("Received message: {} with body: {}",
+ ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+ assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+ }
+ underTest.release();
+ assertEquals(count, dequeueCount);
+ }
+
+ @Test
+ public void testSetBatchWithFuture() throws Exception {
+ final int count = 4;
+ final Message[] messages = new Message[count];
+ final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+ final ConsumerInfo consumerInfo = new ConsumerInfo();
+ final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ consumerInfo.setExclusive(true);
+
+ final Queue queue = new Queue(brokerService, destination,
+ queueMessageStore, destinationStatistics, null);
+
+ queueMessageStore.start();
+ queueMessageStore.registerIndexListener(null);
+
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ SystemUsage systemUsage = new SystemUsage();
+ // ensure memory limit is reached
+ systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6));
+ underTest.setSystemUsage(systemUsage);
+ underTest.setEnableAudit(false);
+ underTest.start();
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ ActiveMQTextMessage msg = getMessage(0);
+ messages[0] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.setRecievedByDFBridge(true);
+ final ActiveMQTextMessage msgRef = msg;
+ FutureTask<Long> future0 = new FutureTask<Long>(new Runnable() {
+ @Override
+ public void run() {
+ msgRef.getMessageId().setFutureOrSequenceLong(0l);
+ }
+ }, 0l) {};
+ msg.getMessageId().setFutureOrSequenceLong(future0);
+ underTest.addMessageLast(msg);
+ Executors.newSingleThreadExecutor().submit(future0);
+
+
+ msg = getMessage(1);
+ messages[3] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.setRecievedByDFBridge(true);
+ final ActiveMQTextMessage msgRef1 = msg;
+ FutureTask<Long> future1 = new FutureTask<Long>(new Runnable() {
+ @Override
+ public void run() {
+ msgRef1.getMessageId().setFutureOrSequenceLong(3l);
+ }
+ }, 3l) {};
+ msg.getMessageId().setFutureOrSequenceLong(future1);
+ underTest.addMessageLast(msg);
+
+
+ msg = getMessage(2);
+ messages[1] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.getMessageId().setFutureOrSequenceLong(1l);
+ underTest.addMessageLast(msg);
+
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ // out of order future
+ Executors.newSingleThreadExecutor().submit(future1);
+
+ // sync add to flip cache
+ msg = getMessage(3);
+ messages[2] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.getMessageId().setFutureOrSequenceLong(3l);
+ underTest.addMessageLast(msg);
+
+
+ assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+ assertEquals("setBatch set", 2l, queueMessageStore.batch.get());
+
+ int dequeueCount = 0;
+
+ underTest.setMaxBatchSize(count);
+ underTest.reset();
+ while (underTest.hasNext() && dequeueCount < count) {
+ MessageReference ref = underTest.next();
+ ref.decrementReferenceCount();
+ underTest.remove();
+ LOG.info("Received message: {} with body: {}",
+ ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+ assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+ }
+ underTest.release();
+ assertEquals(count, dequeueCount);
+ }
+
+ @Test
+ public void testSetBatch() throws Exception {
+ final int count = 3;
+ final Message[] messages = new Message[count];
+ final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+ final ConsumerInfo consumerInfo = new ConsumerInfo();
+ final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ consumerInfo.setExclusive(true);
+
+ final Queue queue = new Queue(brokerService, destination,
+ queueMessageStore, destinationStatistics, null);
+
+ queueMessageStore.start();
+ queueMessageStore.registerIndexListener(null);
+
+ QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+ SystemUsage systemUsage = new SystemUsage();
+ // ensure memory limit is reached
+ systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5);
+ underTest.setSystemUsage(systemUsage);
+ underTest.setEnableAudit(false);
+ underTest.start();
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+
+ ActiveMQTextMessage msg = getMessage(0);
+ messages[0] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.getMessageId().setFutureOrSequenceLong(0l);
+ underTest.addMessageLast(msg);
+
+ msg = getMessage(1);
+ messages[1] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.getMessageId().setFutureOrSequenceLong(1l);
+ underTest.addMessageLast(msg);
+
+ assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+ msg = getMessage(2);
+ messages[2] = msg;
+ msg.setMemoryUsage(systemUsage.getMemoryUsage());
+ msg.getMessageId().setFutureOrSequenceLong(2l);
+ underTest.addMessageLast(msg);
+
+
+ assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
+ assertEquals("setBatch set", 2l, queueMessageStore.batch.get());
+
+ int dequeueCount = 0;
+
+ underTest.setMaxBatchSize(2);
+ underTest.reset();
+ while (underTest.hasNext() && dequeueCount < count) {
+ MessageReference ref = underTest.next();
+ ref.decrementReferenceCount();
+ underTest.remove();
+ LOG.info("Received message: {} with body: {}",
+ ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+ assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+ }
+ underTest.release();
+ assertEquals(count, dequeueCount);
+ }
+
+ private ActiveMQTextMessage getMessage(int i) throws Exception {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ MessageId id = new MessageId(mesageIdRoot + i);
+ id.setBrokerSequenceId(i);
+ id.setProducerSequenceId(i);
+ message.setMessageId(id);
+ message.setDestination(destination);
+ message.setPersistent(true);
+ message.setResponseRequired(true);
+ message.setText("Msg:" + i + " " + text);
+ assertEquals(message.getMessageId().getProducerSequenceId(), i);
+ return message;
+ }
+
+ class TestMessageStore extends AbstractMessageStore {
+ final Message[] messages;
+ public AtomicLong batch = new AtomicLong();
+
+ public TestMessageStore(Message[] messages, ActiveMQDestination dest) {
+ super(dest);
+ this.messages = messages;
+ }
+
+ @Override
+ public void addMessage(ConnectionContext context, Message message) throws IOException {
+
+ }
+
+ @Override
+ public Message getMessage(MessageId identity) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+
+ }
+
+ @Override
+ public void removeAllMessages(ConnectionContext context) throws IOException {
+
+ }
+
+ @Override
+ public void recover(MessageRecoveryListener container) throws Exception {
+
+ }
+
+ @Override
+ public int getMessageCount() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void resetBatching() {
+
+ }
+ @Override
+ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+ for (int i=batch.intValue();i<messages.length;i++) {
+ LOG.info("recovered index:" + i);
+ listener.recoverMessage(messages[i]);
+ }
+ }
+
+ @Override
+ public void setBatch(MessageId message) {
+ batch.set((Long)message.getFutureOrSequenceLong());
+ batch.incrementAndGet();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/9c2b1d25/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
new file mode 100644
index 0000000..300bec1
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+/*
+ * pause producers if consumers stall and verify broker drained before resume
+ */
+@RunWith(Parameterized.class)
+public class AMQ5266StarvedConsumerTest {
+ static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
+ String activemqURL;
+ BrokerService brokerService;
+ private EmbeddedDataSource dataSource;
+
+ public int messageSize = 1000;
+
+ @Parameterized.Parameter(0)
+ public int publisherMessagesPerThread = 1000;
+
+ @Parameterized.Parameter(1)
+ public int publisherThreadCount = 20;
+
+ @Parameterized.Parameter(2)
+ public int consumerThreadsPerQueue = 5;
+
+ @Parameterized.Parameter(3)
+ public int destMemoryLimit = 50 * 1024;
+
+ @Parameterized.Parameter(4)
+ public boolean useCache = true;
+
+ @Parameterized.Parameter(5)
+ public boolean useDefaultStore = false;
+
+ @Parameterized.Parameter(6)
+ public boolean optimizeDispatch = false;
+ private AtomicBoolean didNotReceive = new AtomicBoolean(false);
+
+ @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
+ public static Iterable<Object[]> parameters() {
+ return Arrays.asList(new Object[][]{
+ {1000, 40, 5, 1024*1024, false, false, true},
+ });
+ }
+
+ public int consumerBatchSize = 5;
+
+ @Before
+ public void startBroker() throws Exception {
+ brokerService = new BrokerService();
+
+ dataSource = new EmbeddedDataSource();
+ dataSource.setDatabaseName("target/derbyDb");
+ dataSource.setCreateDatabase("create");
+
+ JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+ jdbcPersistenceAdapter.setDataSource(dataSource);
+ jdbcPersistenceAdapter.setUseLock(false);
+
+ if (!useDefaultStore) {
+ brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
+ } else {
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+ kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
+ }
+ brokerService.setDeleteAllMessagesOnStartup(true);
+ brokerService.setUseJmx(false);
+ brokerService.setAdvisorySupport(false);
+
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
+ defaultEntry.setMaxAuditDepth(publisherThreadCount);
+ defaultEntry.setEnableAudit(true);
+ defaultEntry.setUseCache(useCache);
+ defaultEntry.setMaxPageSize(1000);
+ defaultEntry.setOptimizedDispatch(optimizeDispatch);
+ defaultEntry.setMemoryLimit(destMemoryLimit);
+ defaultEntry.setExpireMessagesPeriod(0);
+ policyMap.setDefaultEntry(defaultEntry);
+ brokerService.setDestinationPolicy(policyMap);
+
+ brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
+
+ TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
+ brokerService.start();
+ activemqURL = transportConnector.getPublishableConnectString();
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ }
+ try {
+ dataSource.setShutdownDatabase("shutdown");
+ dataSource.getConnection();
+ } catch (Exception ignored) {}
+ }
+
+ CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
+ @Override
+ public void run() {
+ // wait for queue size to go to zero
+ try {
+ while (((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount() > 0) {
+ LOG.info("Total messageCount: " + ((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+ TimeUnit.SECONDS.sleep(5);
+ }
+ } catch (Exception ignored) {
+ ignored.printStackTrace();
+ }
+ }
+ });
+
+ @Test(timeout = 30 * 60 * 1000)
+ public void test() throws Exception {
+
+ String activemqQueues = "activemq,activemq2,activemq3,activemq4";//,activemq5,activemq6,activemq7,activemq8,activemq9";
+
+ int consumerWaitForConsumption = 5 * 60 * 1000;
+
+ ExportQueuePublisher publisher = null;
+ ExportQueueConsumer consumer = null;
+
+ LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
+ LOG.info("\nBuilding Publisher...");
+
+ publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
+
+ LOG.info("Building Consumer...");
+
+ consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
+
+
+ LOG.info("Starting Publisher...");
+
+ publisher.start();
+
+ LOG.info("Starting Consumer...");
+
+ consumer.start();
+
+ int distinctPublishedCount = 0;
+
+
+ LOG.info("Waiting For Publisher Completion...");
+
+ publisher.waitForCompletion();
+
+ List publishedIds = publisher.getIDs();
+ distinctPublishedCount = new TreeSet(publishedIds).size();
+
+ LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
+
+
+ long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
+ while (!consumer.completed() && System.currentTimeMillis() < endWait) {
+ try {
+ int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
+ LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
+ if (!useDefaultStore) {
+ DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
+ }
+ Thread.sleep(10000);
+ } catch (Exception e) {
+ }
+ }
+
+ LOG.info("\nConsumer Complete: " + consumer.completed() +", Shutting Down.");
+
+ consumer.shutdown();
+
+ TimeUnit.SECONDS.sleep(2);
+ LOG.info("DB Contents START");
+ if (!useDefaultStore) {
+ DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
+ }
+ LOG.info("DB Contents END");
+
+ LOG.info("Consumer Stats:");
+
+ for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
+
+ List<String> idList = entry.getValue();
+
+ int distinctConsumed = new TreeSet<String>(idList).size();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(" Queue: " + entry.getKey() +
+ " -> Total Messages Consumed: " + idList.size() +
+ ", Distinct IDs Consumed: " + distinctConsumed);
+
+ int diff = distinctPublishedCount - distinctConsumed;
+ sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
+ LOG.info(sb.toString());
+
+ assertEquals("expect to get all messages!", 0, diff);
+
+ }
+ }
+
+ public class ExportQueuePublisher {
+
+ private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+ private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+ private ActiveMQConnectionFactory connectionFactory = null;
+ private String activemqURL = null;
+ private String activemqQueues = null;
+ // Collection of distinct IDs that the publisher has published.
+ // After a message is published, its UUID will be written to this list for tracking.
+ // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
+ //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
+ private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
+ private List<PublisherThread> threads;
+
+ public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
+
+ this.activemqURL = activemqURL;
+ this.activemqQueues = activemqQueues;
+
+ threads = new ArrayList<PublisherThread>();
+
+ // Build the threads and tell them how many messages to publish
+ for (int i = 0; i < threadCount; i++) {
+ PublisherThread pt = new PublisherThread(messagesPerThread);
+ threads.add(pt);
+ }
+ }
+
+ public List<String> getIDs() {
+ return ids;
+ }
+
+ // Kick off threads
+ public void start() throws Exception {
+
+ for (PublisherThread pt : threads) {
+ pt.start();
+ }
+ }
+
+ // Wait for threads to complete. They will complete once they've published all of their messages.
+ public void waitForCompletion() throws Exception {
+
+ for (PublisherThread pt : threads) {
+ pt.join();
+ pt.close();
+ }
+ }
+
+ private Session newSession(QueueConnection queueConnection) throws Exception {
+ return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ private synchronized QueueConnection newQueueConnection() throws Exception {
+
+ if (connectionFactory == null) {
+ connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+ connectionFactory.setWatchTopicAdvisories(false);
+ }
+
+ // Set the redelivery count to -1 (infinite), or else messages will start dropping
+ // after the queue has had a certain number of failures (default is 6)
+ RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+ policy.setMaximumRedeliveries(-1);
+
+ QueueConnection amqConnection = connectionFactory.createQueueConnection();
+ amqConnection.start();
+ return amqConnection;
+ }
+
+ private class PublisherThread extends Thread {
+
+ private int count;
+ private QueueConnection qc;
+ private Session session;
+ private MessageProducer mp;
+ private Queue q;
+
+ private PublisherThread(int count) throws Exception {
+
+ this.count = count;
+
+ // Each Thread has its own Connection and Session, so no sync worries
+ qc = newQueueConnection();
+ session = newSession(qc);
+
+ // In our code, when publishing to multiple queues,
+ // we're using composite destinations like below
+ q = new ActiveMQQueue(activemqQueues);
+ mp = session.createProducer(null);
+ }
+
+ public void run() {
+
+ try {
+
+ // Loop until we've published enough messages
+ while (count-- > 0) {
+
+ TextMessage tm = session.createTextMessage(getMessageText());
+ String id = UUID.randomUUID().toString();
+ tm.setStringProperty("KEY", id);
+ ids.add(id); // keep track of the key to compare against consumer
+
+ mp.send(q, tm);
+ session.commit();
+
+ if (didNotReceive.get()) {
+ globalProducerHalt.await();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ // Called by waitForCompletion
+ public void close() {
+
+ try {
+ mp.close();
+ } catch (Exception e) {
+ }
+
+ try {
+ session.close();
+ } catch (Exception e) {
+ }
+
+ try {
+ qc.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ }
+
+ String messageText;
+ private String getMessageText() {
+
+ if (messageText == null) {
+
+ synchronized (this) {
+
+ if (messageText == null) {
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < messageSize; i++) {
+ sb.append("X");
+ }
+ messageText = sb.toString();
+ }
+ }
+ }
+
+ return messageText;
+ }
+
+
+ public class ExportQueueConsumer {
+
+ private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+ private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+ private final int totalToExpect;
+ private ActiveMQConnectionFactory connectionFactory = null;
+ private String activemqURL = null;
+ private String activemqQueues = null;
+ private String[] queues = null;
+ // Map of IDs that were consumed, keyed by queue name.
+ // We'll compare these against what was published to know if any got stuck or dropped.
+ private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
+ private Map<String, List<ConsumerThread>> threads;
+
+ public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
+
+ this.activemqURL = activemqURL;
+ this.activemqQueues = activemqQueues;
+ this.totalToExpect = totalToExpect;
+
+ queues = this.activemqQueues.split(",");
+
+ for (int i = 0; i < queues.length; i++) {
+ queues[i] = queues[i].trim();
+ }
+
+ threads = new HashMap<String, List<ConsumerThread>>();
+
+ // For each queue, create a list of threads and set up the list of ids
+ for (String q : queues) {
+
+ List<ConsumerThread> list = new ArrayList<ConsumerThread>();
+
+ idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
+
+ for (int i = 0; i < threadsPerQueue; i++) {
+ list.add(new ConsumerThread(q, batchSize));
+ }
+
+ threads.put(q, list);
+ }
+ }
+
+ public Map<String, List<String>> getIDs() {
+ return idsByQueue;
+ }
+
+ // Start the threads
+ public void start() throws Exception {
+
+ for (List<ConsumerThread> list : threads.values()) {
+
+ for (ConsumerThread ct : list) {
+
+ ct.start();
+ }
+ }
+ }
+
+ // Tell the threads to stop
+ // Then wait for them to stop
+ public void shutdown() throws Exception {
+
+ for (List<ConsumerThread> list : threads.values()) {
+
+ for (ConsumerThread ct : list) {
+
+ ct.shutdown();
+ }
+ }
+
+ for (List<ConsumerThread> list : threads.values()) {
+
+ for (ConsumerThread ct : list) {
+
+ ct.join();
+ }
+ }
+ }
+
+ private Session newSession(QueueConnection queueConnection) throws Exception {
+ return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ private synchronized QueueConnection newQueueConnection() throws Exception {
+
+ if (connectionFactory == null) {
+ connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+ connectionFactory.setWatchTopicAdvisories(false);
+ }
+
+ // Set the redelivery count to -1 (infinite), or else messages will start dropping
+ // after the queue has had a certain number of failures (default is 6)
+ RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+ policy.setMaximumRedeliveries(-1);
+
+ QueueConnection amqConnection = connectionFactory.createQueueConnection();
+ amqConnection.start();
+ return amqConnection;
+ }
+
+ public boolean completed() {
+ for (List<ConsumerThread> list : threads.values()) {
+
+ for (ConsumerThread ct : list) {
+
+ if (ct.isAlive()) {
+ LOG.info("thread for {} is still alive.", ct.qName);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private class ConsumerThread extends Thread {
+
+ private int batchSize;
+ private QueueConnection qc;
+ private Session session;
+ private MessageConsumer mc;
+ private List<String> idList;
+ private boolean shutdown = false;
+ private String qName;
+
+ private ConsumerThread(String queueName, int batchSize) throws Exception {
+
+ this.batchSize = batchSize;
+
+ // Each thread has its own connection and session
+ qName = queueName;
+ qc = newQueueConnection();
+ session = newSession(qc);
+ Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
+ mc = session.createConsumer(q);
+
+ idList = idsByQueue.get(queueName);
+ }
+
+ public void run() {
+
+ try {
+
+ int count = 0;
+
+ // Keep reading as long as it hasn't been told to shutdown
+ while (!shutdown) {
+
+ if (idList.size() >= totalToExpect) {
+ LOG.info("Got {} for q: {}", +idList.size(), qName);
+ session.commit();
+ break;
+ }
+ Message m = mc.receive(4000);
+
+ if (m != null) {
+
+ // We received a non-null message, add the ID to our list
+
+ idList.add(m.getStringProperty("KEY"));
+
+ count++;
+
+ // If we've reached our batch size, commit the batch and reset the count
+
+ if (count == batchSize) {
+ session.commit();
+ count = 0;
+ }
+ } else {
+
+ // We didn't receive anything this time, commit any current batch and reset the count
+
+ session.commit();
+ count = 0;
+
+ // Sleep a little before trying to read after not getting a message
+
+ try {
+ if (idList.size() < totalToExpect) {
+ LOG.info("did not receive on {}, current count: {}", qName, idList.size());
+ didNotReceive.set(true);
+ }
+ //sleep(3000);
+ } catch (Exception e) {
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+
+ // Once we exit, close everything
+ close();
+ }
+ }
+
+ public void shutdown() {
+ shutdown = true;
+ }
+
+ public void close() {
+
+ try {
+ mc.close();
+ } catch (Exception e) {
+ }
+
+ try {
+ session.close();
+ } catch (Exception e) {
+ }
+
+ try {
+ qc.close();
+ } catch (Exception e) {
+
+ }
+ }
+ }
+ }
+}