You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/07 07:10:18 UTC
svn commit: r961245 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Author: rajdavies
Date: Wed Jul 7 05:10:17 2010
New Revision: 961245
URL: http://svn.apache.org/viewvc?rev=961245&view=rev
Log:
Improve concurrency by using read/write locks
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=961245&r1=961244&r2=961245&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Jul 7 05:10:17 2010
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
@@ -92,11 +93,15 @@ public class Queue extends BaseDestinati
protected static final Log LOG = LogFactory.getLog(Queue.class);
protected final TaskRunnerFactory taskFactory;
protected TaskRunner taskRunner;
+ private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
+ private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
protected PendingMessageCursor messages;
+ private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a
// subscription
+ private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
private MessageGroupMap messageGroupOwners;
@@ -106,7 +111,6 @@ public class Queue extends BaseDestinati
private ExecutorService executor;
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
- private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
private final QueueDispatchSelector dispatchSelector;
@@ -219,8 +223,11 @@ public class Queue extends BaseDestinati
}
public List<Subscription> getConsumers() {
- synchronized (consumers) {
+ consumersLock.readLock().lock();
+ try {
return new ArrayList<Subscription>(consumers);
+ }finally {
+ consumersLock.readLock().unlock();
}
}
@@ -284,12 +291,15 @@ public class Queue extends BaseDestinati
}
if (hasSpace()) {
message.setRegionDestination(Queue.this);
- synchronized (messages) {
+ messagesLock.writeLock().lock();
+ try{
try {
messages.addMessageLast(message);
} catch (Exception e) {
LOG.fatal("Failed to add message to cursor", e);
}
+ }finally {
+ messagesLock.writeLock().unlock();
}
destinationStatistics.getMessages().increment();
return true;
@@ -348,13 +358,16 @@ public class Queue extends BaseDestinati
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
// duplicates, etc.
- synchronized (dispatchMutex) {
+ pagedInPendingDispatchLock.writeLock().lock();
+ try {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
// needs to be synchronized - so no contention with dispatching
- synchronized (consumers) {
+ // consumersLock.
+ consumersLock.writeLock().lock();
+ try {
// set a flag if this is a first consumer
if (consumers.size() == 0) {
@@ -378,20 +391,27 @@ public class Queue extends BaseDestinati
}
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
+ }finally {
+ consumersLock.writeLock().unlock();
}
if (sub instanceof QueueBrowserSubscription) {
// tee up for dispatch in next iterate
QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.readLock().lock();
+ try{
BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
browserDispatches.addLast(browserDispatch);
+ }finally {
+ pagedInMessagesLock.readLock().unlock();
}
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
+ }finally {
+ pagedInPendingDispatchLock.writeLock().unlock();
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
@@ -406,14 +426,16 @@ public class Queue extends BaseDestinati
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
- synchronized (dispatchMutex) {
+ pagedInPendingDispatchLock.writeLock().lock();
+ try {
if (LOG.isDebugEnabled()) {
LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
+ getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+ getDestinationStatistics().getDispatched().getCount() + ", inflight: "
+ getDestinationStatistics().getInflight().getCount());
}
- synchronized (consumers) {
+ consumersLock.writeLock().lock();
+ try {
removeFromConsumerList(sub);
if (sub.getConsumerInfo().isExclusive()) {
Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
@@ -461,10 +483,14 @@ public class Queue extends BaseDestinati
if (!redeliveredWaitingDispatch.isEmpty()) {
doDispatch(new ArrayList<QueueMessageReference>());
}
+ }finally {
+ consumersLock.writeLock().unlock();
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
+ }finally {
+ pagedInPendingDispatchLock.writeLock().unlock();
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
@@ -758,8 +784,11 @@ public class Queue extends BaseDestinati
@Override
public String toString() {
int size = 0;
- synchronized (messages) {
+ messagesLock.readLock().lock();
+ try{
size = messages.size();
+ }finally {
+ messagesLock.readLock().unlock();
}
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
+ ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
@@ -919,58 +948,70 @@ public class Queue extends BaseDestinati
try {
pageInMessages(false);
List<MessageReference> toExpire = new ArrayList<MessageReference>();
- synchronized (dispatchMutex) {
- synchronized (pagedInPendingDispatch) {
- addAll(pagedInPendingDispatch, browseList, max, toExpire);
- for (MessageReference ref : toExpire) {
- pagedInPendingDispatch.remove(ref);
- if (broker.isExpired(ref)) {
- LOG.debug("expiring from pagedInPending: " + ref);
- messageExpired(connectionContext, ref);
- }
- }
- }
- toExpire.clear();
- synchronized (pagedInMessages) {
- addAll(pagedInMessages.values(), browseList, max, toExpire);
- }
+
+ pagedInPendingDispatchLock.writeLock().lock();
+ try {
+ addAll(pagedInPendingDispatch, browseList, max, toExpire);
for (MessageReference ref : toExpire) {
+ pagedInPendingDispatch.remove(ref);
if (broker.isExpired(ref)) {
- LOG.debug("expiring from pagedInMessages: " + ref);
+ LOG.debug("expiring from pagedInPending: " + ref);
messageExpired(connectionContext, ref);
- } else {
- synchronized (pagedInMessages) {
- pagedInMessages.remove(ref.getMessageId());
- }
}
}
+ } finally {
+ pagedInPendingDispatchLock.writeLock().unlock();
+ }
+ toExpire.clear();
+ pagedInMessagesLock.readLock().lock();
+ try {
+ addAll(pagedInMessages.values(), browseList, max, toExpire);
+ } finally {
+ pagedInMessagesLock.readLock().unlock();
+ }
+ for (MessageReference ref : toExpire) {
+ if (broker.isExpired(ref)) {
+ LOG.debug("expiring from pagedInMessages: " + ref);
+ messageExpired(connectionContext, ref);
+ } else {
+ pagedInMessagesLock.writeLock().lock();
+ try {
+ pagedInMessages.remove(ref.getMessageId());
+ } finally {
+ pagedInMessagesLock.writeLock().unlock();
+ }
+ }
+ }
- if (browseList.size() < getMaxBrowsePageSize()) {
- synchronized (messages) {
- try {
- messages.reset();
- while (messages.hasNext() && browseList.size() < max) {
- MessageReference node = messages.next();
- if (node.isExpired()) {
- if (broker.isExpired(node)) {
- LOG.debug("expiring from messages: " + node);
- messageExpired(connectionContext, createMessageReference(node.getMessage()));
- }
- messages.remove();
- } else {
- messages.rollback(node.getMessageId());
- if (browseList.contains(node.getMessage()) == false) {
- browseList.add(node.getMessage());
- }
+ if (browseList.size() < getMaxBrowsePageSize()) {
+ messagesLock.writeLock().lock();
+ try {
+ try {
+ messages.reset();
+ while (messages.hasNext() && browseList.size() < max) {
+ MessageReference node = messages.next();
+ if (node.isExpired()) {
+ if (broker.isExpired(node)) {
+ LOG.debug("expiring from messages: " + node);
+ messageExpired(connectionContext, createMessageReference(node.getMessage()));
+ }
+ messages.remove();
+ } else {
+ messages.rollback(node.getMessageId());
+ if (browseList.contains(node.getMessage()) == false) {
+ browseList.add(node.getMessage());
}
- node.decrementReferenceCount();
}
- } finally {
- messages.release();
+ node.decrementReferenceCount();
}
+ } finally {
+ messages.release();
}
+ } finally {
+ messagesLock.writeLock().unlock();
}
}
+
} catch (Exception e) {
LOG.error("Problem retrieving message for browse", e);
}
@@ -990,13 +1031,17 @@ public class Queue extends BaseDestinati
public Message getMessage(String id) {
MessageId msgId = new MessageId(id);
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.readLock().lock();
+ try{
QueueMessageReference r = this.pagedInMessages.get(msgId);
if (r != null) {
return r.getMessage();
}
+ }finally {
+ pagedInMessagesLock.readLock().unlock();
}
- synchronized (messages) {
+ messagesLock.readLock().lock();
+ try{
try {
messages.reset();
while (messages.hasNext()) {
@@ -1014,6 +1059,8 @@ public class Queue extends BaseDestinati
} finally {
messages.release();
}
+ }finally {
+ messagesLock.readLock().unlock();
}
return null;
}
@@ -1023,8 +1070,11 @@ public class Queue extends BaseDestinati
List<MessageReference> list = null;
do {
doPageIn(true);
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.readLock().lock();
+ try {
list = new ArrayList<MessageReference>(pagedInMessages.values());
+ }finally {
+ pagedInMessagesLock.readLock().unlock();
}
for (MessageReference ref : list) {
@@ -1085,8 +1135,11 @@ public class Queue extends BaseDestinati
ConnectionContext context = createConnectionContext();
do {
doPageIn(true);
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.readLock().lock();
+ try{
set.addAll(pagedInMessages.values());
+ }finally {
+ pagedInMessagesLock.readLock().unlock();
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
@@ -1149,8 +1202,11 @@ public class Queue extends BaseDestinati
setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
doPageIn(true);
setMaxPageSize(oldMaxSize);
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.readLock().lock();
+ try {
set.addAll(pagedInMessages.values());
+ }finally {
+ pagedInMessagesLock.readLock().unlock();
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
@@ -1189,8 +1245,11 @@ public class Queue extends BaseDestinati
QueueMessageReference r = createMessageReference(m);
BrokerSupport.resend(context, m, dest);
removeMessage(context, r);
- synchronized (messages) {
+ messagesLock.writeLock().lock();
+ try{
messages.rollback(r.getMessageId());
+ }finally {
+ messagesLock.writeLock().unlock();
}
return true;
}
@@ -1232,8 +1291,11 @@ public class Queue extends BaseDestinati
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
doPageIn(true);
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.readLock().lock();
+ try{
set.addAll(pagedInMessages.values());
+ }finally {
+ pagedInMessagesLock.readLock().unlock();
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
@@ -1252,11 +1314,14 @@ public class Queue extends BaseDestinati
}
BrowserDispatch getNextBrowserDispatch() {
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.readLock().lock();
+ try{
if (browserDispatches.isEmpty()) {
return null;
}
return browserDispatches.removeFirst();
+ }finally {
+ pagedInMessagesLock.readLock().unlock();
}
}
@@ -1317,15 +1382,18 @@ public class Queue extends BaseDestinati
BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
- synchronized (messages) {
+ messagesLock.readLock().lock();
+ try{
pageInMoreMessages |= !messages.isEmpty();
+ }finally {
+ messagesLock.readLock().unlock();
}
- // Kinda ugly.. but I think dispatchLock is the only mutex
- // protecting the
- // pagedInPendingDispatch variable.
- synchronized (dispatchMutex) {
+ pagedInPendingDispatchLock.readLock().lock();
+ try {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
+ }finally {
+ pagedInPendingDispatchLock.readLock().unlock();
}
// Perhaps we should page always into the pagedInPendingDispatch
@@ -1344,8 +1412,11 @@ public class Queue extends BaseDestinati
if (pendingBrowserDispatch != null) {
ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.readLock().lock();
+ try{
alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
+ }finally {
+ pagedInMessagesLock.readLock().unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
@@ -1412,11 +1483,13 @@ public class Queue extends BaseDestinati
protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
removeMessage(c, null, r);
- synchronized (dispatchMutex) {
- synchronized (pagedInPendingDispatch) {
- pagedInPendingDispatch.remove(r);
- }
+ pagedInPendingDispatchLock.writeLock().lock();
+ try {
+ pagedInPendingDispatch.remove(r);
+ } finally {
+ pagedInPendingDispatchLock.writeLock().unlock();
}
+
}
protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
@@ -1457,8 +1530,11 @@ public class Queue extends BaseDestinati
}
if (ack.isPoisonAck()) {
// message gone to DLQ, is ok to allow redelivery
- synchronized (messages) {
+ messagesLock.writeLock().lock();
+ try{
messages.rollback(reference.getMessageId());
+ }finally {
+ messagesLock.writeLock().unlock();
}
}
@@ -1467,8 +1543,11 @@ public class Queue extends BaseDestinati
private void dropMessage(QueueMessageReference reference) {
reference.drop();
destinationStatistics.getMessages().decrement();
- synchronized (pagedInMessages) {
+ pagedInMessagesLock.writeLock().lock();
+ try{
pagedInMessages.remove(reference.getMessageId());
+ }finally {
+ pagedInMessagesLock.writeLock().unlock();
}
}
@@ -1498,8 +1577,11 @@ public class Queue extends BaseDestinati
}
final void sendMessage(final Message msg) throws Exception {
- synchronized (messages) {
+ messagesLock.writeLock().lock();
+ try{
messages.addMessageLast(msg);
+ }finally {
+ messagesLock.writeLock().unlock();
}
}
@@ -1507,10 +1589,13 @@ public class Queue extends BaseDestinati
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);
- synchronized (consumers) {
+ consumersLock.readLock().lock();
+ try {
if (consumers.isEmpty()) {
onMessageWithNoConsumers(context, msg);
}
+ }finally {
+ consumersLock.readLock().unlock();
}
wakeup();
}
@@ -1540,99 +1625,118 @@ public class Queue extends BaseDestinati
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
List<QueueMessageReference> result = null;
List<QueueMessageReference> resultList = null;
- synchronized (dispatchMutex) {
- int toPageIn = Math.min(getMaxPageSize(), messages.size());
- if (LOG.isDebugEnabled()) {
- LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
- + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
- + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
- }
- if (isLazyDispatch() && !force) {
- // Only page in the minimum number of messages which can be
- // dispatched immediately.
- toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
- }
- if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
- int count = 0;
- result = new ArrayList<QueueMessageReference>(toPageIn);
- synchronized (messages) {
- try {
- messages.setMaxBatchSize(toPageIn);
- messages.reset();
- while (messages.hasNext() && count < toPageIn) {
- MessageReference node = messages.next();
- messages.remove();
-
- QueueMessageReference ref = createMessageReference(node.getMessage());
- if (ref.isExpired()) {
- if (broker.isExpired(ref)) {
- messageExpired(createConnectionContext(), ref);
- } else {
- ref.decrementReferenceCount();
- }
+ int toPageIn = Math.min(getMaxPageSize(), messages.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
+ + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
+ }
+
+ if (isLazyDispatch() && !force) {
+ // Only page in the minimum number of messages which can be
+ // dispatched immediately.
+ toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
+ }
+ int pagedInPendingSize = 0;
+ pagedInPendingDispatchLock.readLock().lock();
+ try {
+ pagedInPendingSize = pagedInPendingDispatch.size();
+ } finally {
+ pagedInPendingDispatchLock.readLock().unlock();
+ }
+ if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
+ int count = 0;
+ result = new ArrayList<QueueMessageReference>(toPageIn);
+ messagesLock.writeLock().lock();
+ try {
+ try {
+ messages.setMaxBatchSize(toPageIn);
+ messages.reset();
+ while (messages.hasNext() && count < toPageIn) {
+ MessageReference node = messages.next();
+ messages.remove();
+
+ QueueMessageReference ref = createMessageReference(node.getMessage());
+ if (ref.isExpired()) {
+ if (broker.isExpired(ref)) {
+ messageExpired(createConnectionContext(), ref);
} else {
- result.add(ref);
- count++;
+ ref.decrementReferenceCount();
}
+ } else {
+ result.add(ref);
+ count++;
}
- } finally {
- messages.release();
}
+ } finally {
+ messages.release();
}
- // Only add new messages, not already pagedIn to avoid multiple
- // dispatch attempts
- synchronized (pagedInMessages) {
- resultList = new ArrayList<QueueMessageReference>(result.size());
- for (QueueMessageReference ref : result) {
- if (!pagedInMessages.containsKey(ref.getMessageId())) {
- pagedInMessages.put(ref.getMessageId(), ref);
- resultList.add(ref);
- } else {
- ref.decrementReferenceCount();
- }
+ } finally {
+ messagesLock.writeLock().unlock();
+ }
+ // Only add new messages, not already pagedIn to avoid multiple
+ // dispatch attempts
+ pagedInMessagesLock.readLock().lock();
+ try {
+ resultList = new ArrayList<QueueMessageReference>(result.size());
+ for (QueueMessageReference ref : result) {
+ if (!pagedInMessages.containsKey(ref.getMessageId())) {
+ pagedInMessagesLock.readLock().unlock();
+ pagedInMessagesLock.writeLock().lock();
+ pagedInMessages.put(ref.getMessageId(), ref);
+ pagedInMessagesLock.readLock().lock();
+ pagedInMessagesLock.writeLock().unlock();
+ resultList.add(ref);
+ } else {
+ ref.decrementReferenceCount();
}
}
- } else {
- // Avoid return null list, if condition is not validated
- resultList = new ArrayList<QueueMessageReference>();
+ } finally {
+ pagedInMessagesLock.readLock().unlock();
}
+ } else {
+ // Avoid return null list, if condition is not validated
+ resultList = new ArrayList<QueueMessageReference>();
}
+
return resultList;
}
private void doDispatch(List<QueueMessageReference> list) throws Exception {
boolean doWakeUp = false;
- synchronized (dispatchMutex) {
- synchronized (pagedInPendingDispatch) {
- if (!redeliveredWaitingDispatch.isEmpty()) {
- // Try first to dispatch redelivered messages to keep an
- // proper order
- redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
- }
- if (!pagedInPendingDispatch.isEmpty()) {
- // Next dispatch anything that had not been
- // dispatched before.
- pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
- }
- // and now see if we can dispatch the new stuff.. and append to
- // the pending
- // list anything that does not actually get dispatched.
- if (list != null && !list.isEmpty()) {
- if (pagedInPendingDispatch.isEmpty()) {
- pagedInPendingDispatch.addAll(doActualDispatch(list));
- } else {
- for (QueueMessageReference qmr : list) {
- if (!pagedInPendingDispatch.contains(qmr)) {
- pagedInPendingDispatch.add(qmr);
- }
+ pagedInPendingDispatchLock.writeLock().lock();
+ try {
+ if (!redeliveredWaitingDispatch.isEmpty()) {
+ // Try first to dispatch redelivered messages to keep an
+ // proper order
+ redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
+ }
+ if (!pagedInPendingDispatch.isEmpty()) {
+ // Next dispatch anything that had not been
+ // dispatched before.
+ pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
+ }
+ // and now see if we can dispatch the new stuff.. and append to
+ // the pending
+ // list anything that does not actually get dispatched.
+ if (list != null && !list.isEmpty()) {
+ if (pagedInPendingDispatch.isEmpty()) {
+ pagedInPendingDispatch.addAll(doActualDispatch(list));
+ } else {
+ for (QueueMessageReference qmr : list) {
+ if (!pagedInPendingDispatch.contains(qmr)) {
+ pagedInPendingDispatch.add(qmr);
}
- doWakeUp = true;
}
+ doWakeUp = true;
}
}
+ } finally {
+ pagedInPendingDispatchLock.writeLock().unlock();
}
+
if (doWakeUp) {
// avoid lock order contention
asyncWakeup();
@@ -1645,13 +1749,15 @@ public class Queue extends BaseDestinati
*/
private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
List<Subscription> consumers;
-
- synchronized (this.consumers) {
+ consumersLock.writeLock().lock();
+ try {
if (this.consumers.isEmpty() || isSlave()) {
// slave dispatch happens in processDispatchNotification
return list;
}
consumers = new ArrayList<Subscription>(this.consumers);
+ }finally {
+ consumersLock.writeLock().unlock();
}
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
@@ -1698,11 +1804,14 @@ public class Queue extends BaseDestinati
// distribution.
if (target != null && !strictOrderDispatch && consumers.size() > 1
&& !dispatchSelector.isExclusiveConsumer(target)) {
- synchronized (this.consumers) {
+ consumersLock.writeLock().lock();
+ try {
if (removeFromConsumerList(target)) {
addToConsumerList(target);
consumers = new ArrayList<Subscription>(this.consumers);
}
+ }finally {
+ consumersLock.writeLock().unlock();
}
}
}
@@ -1730,12 +1839,15 @@ public class Queue extends BaseDestinati
private int getConsumerMessageCountBeforeFull() throws Exception {
int total = 0;
boolean zeroPrefetch = false;
- synchronized (consumers) {
+ consumersLock.readLock().lock();
+ try{
for (Subscription s : consumers) {
zeroPrefetch |= s.getPrefetchSize() == 0;
int countBeforeFull = s.countBeforeFull();
total += countBeforeFull;
}
+ }finally {
+ consumersLock.readLock().unlock();
}
if (total == 0 && zeroPrefetch) {
total = 1;
@@ -1768,50 +1880,57 @@ public class Queue extends BaseDestinati
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();
- synchronized (dispatchMutex) {
- synchronized (pagedInPendingDispatch) {
- for (QueueMessageReference ref : pagedInPendingDispatch) {
- if (messageId.equals(ref.getMessageId())) {
- message = ref;
- pagedInPendingDispatch.remove(ref);
- break;
- }
+ pagedInPendingDispatchLock.writeLock().lock();
+ try {
+ for (QueueMessageReference ref : pagedInPendingDispatch) {
+ if (messageId.equals(ref.getMessageId())) {
+ message = ref;
+ pagedInPendingDispatch.remove(ref);
+ break;
}
}
+ } finally {
+ pagedInPendingDispatchLock.writeLock().unlock();
+ }
- if (message == null) {
- synchronized (pagedInMessages) {
- message = pagedInMessages.get(messageId);
- }
+ if (message == null) {
+ pagedInMessagesLock.readLock().lock();
+ try {
+ message = pagedInMessages.get(messageId);
+ } finally {
+ pagedInMessagesLock.readLock().unlock();
}
+ }
- if (message == null) {
- synchronized (messages) {
- try {
- messages.setMaxBatchSize(getMaxPageSize());
- messages.reset();
- while (messages.hasNext()) {
- MessageReference node = messages.next();
- messages.remove();
- if (messageId.equals(node.getMessageId())) {
- message = this.createMessageReference(node.getMessage());
- break;
- }
+ if (message == null) {
+ messagesLock.writeLock().lock();
+ try {
+ try {
+ messages.setMaxBatchSize(getMaxPageSize());
+ messages.reset();
+ while (messages.hasNext()) {
+ MessageReference node = messages.next();
+ messages.remove();
+ if (messageId.equals(node.getMessageId())) {
+ message = this.createMessageReference(node.getMessage());
+ break;
}
- } finally {
- messages.release();
}
+ } finally {
+ messages.release();
}
+ } finally {
+ messagesLock.writeLock().unlock();
}
+ }
- if (message == null) {
- Message msg = loadMessage(messageId);
- if (msg != null) {
- message = this.createMessageReference(msg);
- }
+ if (message == null) {
+ Message msg = loadMessage(messageId);
+ if (msg != null) {
+ message = this.createMessageReference(msg);
}
-
}
+
if (message == null) {
throw new JMSException("Slave broker out of sync with master - Message: "
+ messageDispatchNotification.getMessageId() + " on "
@@ -1832,13 +1951,16 @@ public class Queue extends BaseDestinati
private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
throws JMSException {
Subscription sub = null;
- synchronized (consumers) {
+ consumersLock.readLock().lock();
+ try {
for (Subscription s : consumers) {
if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
sub = s;
break;
}
}
+ }finally {
+ consumersLock.readLock().unlock();
}
return sub;
}