You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/10/06 16:11:51 UTC
svn commit: r702152 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/broker/region/
Author: chirino
Date: Mon Oct 6 07:11:51 2008
New Revision: 702152
URL: http://svn.apache.org/viewvc?rev=702152&view=rev
Log:
Applying AMQ-1957.. Thanks for the patch.
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=702152&r1=702151&r2=702152&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Mon Oct 6 07:11:51 2008
@@ -88,7 +88,7 @@
public boolean lock(LockOwner subscription) {
synchronized (this) {
- if (dropped || (lockOwner != null && lockOwner != subscription)) {
+ if (dropped || lockOwner != null) {
return false;
}
lockOwner = subscription;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=702152&r1=702151&r2=702152&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Oct 6 07:11:51 2008
@@ -157,9 +157,11 @@
while (pending.hasNext()) {
MessageReference node = pending.next();
if (node.getMessageId().equals(mdn.getMessageId())) {
- pending.remove();
- createMessageDispatch(node, node.getMessage());
+ // Synchronize between dispatched list and removal of messages from pending list
+ // related to remove subscription action
synchronized(dispatchLock) {
+ pending.remove();
+ createMessageDispatch(node, node.getMessage());
dispatched.add(node);
}
return;
@@ -532,11 +534,18 @@
List<MessageReference> rc = new ArrayList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
- for (MessageReference r : dispatched) {
- if( r.getRegionDestination() == destination ) {
- rc.add((QueueMessageReference)r);
- }
+ // Synchronized to DispatchLock
+ synchronized(dispatchLock) {
+ for (MessageReference r : dispatched) {
+ if( r.getRegionDestination() == destination) {
+ rc.add((QueueMessageReference)r);
+ }
+ }
}
+ // TODO Dispatched messages should be decremented from Inflight stat
+ // Potential problem concerning the Inflight stat:
+ // messages that are not yet committed or rolled back may still be in the dispatched list at this point,
+ // unless every commit or rollback callback action runs before the subscriber is removed.
rc.addAll(pending.remove(context, destination));
}
return rc;
@@ -559,19 +568,23 @@
break;
}
- pending.remove();
- if( !isDropped(node) && canDispatch(node)) {
-
- // Message may have been sitting in the pending
- // list a while waiting for the consumer to ak the message.
- if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
- //increment number to dispatch
- numberToDispatch++;
- node.getRegionDestination().messageExpired(context, this, node);
- continue;
+ // Synchronize between dispatched list and removal of messages from pending list
+ // related to remove subscription action
+ synchronized(dispatchLock) {
+ pending.remove();
+ if( !isDropped(node) && canDispatch(node)) {
+
+ // Message may have been sitting in the pending
+ // list a while waiting for the consumer to ak the message.
+ if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
+ //increment number to dispatch
+ numberToDispatch++;
+ node.getRegionDestination().messageExpired(context, this, node);
+ continue;
+ }
+ dispatch(node);
+ count++;
}
- dispatch(node);
- count++;
}
}
}else {
@@ -596,10 +609,10 @@
final Message message = node.getMessage();
if (message == null) {
return false;
- }
- // Make sure we can dispatch a message.
- if (canDispatch(node) && !isSlave()) {
-
+ }
+ // No reentrant lock - Patch needed to IndirectMessageReference on method lock
+ if (!isSlave()) {
+
MessageDispatch md = createMessageDispatch(node, message);
// NULL messages don't count... they don't get Acked.
if (node != QueueMessageReference.NULL_MESSAGE) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=702152&r1=702151&r2=702152&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Oct 6 07:11:51 2008
@@ -959,6 +959,10 @@
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
if (rd.subscription.matches(node, msgContext)) {
+ // Log showing message dispatching
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(destination.getQualifiedName() + " - Recovery - Message pushed '" + node.hashCode() + " - " + node + "' to subscription: '" + rd.subscription + "'");
+ }
rd.subscription.add(node);
} else {
// make sure it gets queued for dispatched again
@@ -1063,23 +1067,26 @@
protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
reference.setAcked(true);
// This sends the ack the the journal..
- acknowledge(context, sub, ack, reference);
-
if (!ack.isInTransaction()) {
+ acknowledge(context, sub, ack, reference);
dropMessage(reference);
wakeup();
} else {
- context.getTransaction().addSynchronization(new Synchronization() {
+ try {
+ acknowledge(context, sub, ack, reference);
+ } finally {
+ context.getTransaction().addSynchronization(new Synchronization() {
- public void afterCommit() throws Exception {
- dropMessage(reference);
- wakeup();
- }
+ public void afterCommit() throws Exception {
+ dropMessage(reference);
+ wakeup();
+ }
- public void afterRollback() throws Exception {
- reference.setAcked(false);
- }
- });
+ public void afterRollback() throws Exception {
+ reference.setAcked(false);
+ }
+ });
+ }
}
}
@@ -1153,18 +1160,11 @@
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
List<QueueMessageReference> result = null;
+ List<QueueMessageReference> resultList = null;
dispatchLock.lock();
try{
-
- int toPageIn = 0;
- if (force) {
- toPageIn = getMaxPageSize();
- } else {
- toPageIn = (getMaxPageSize() + (int) destinationStatistics
- .getInflight().getCount())
- - pagedInMessages.size();
- toPageIn = Math.min(toPageIn, getMaxPageSize());
- }
+ int toPageIn = getMaxPageSize() + Math.max(0, (int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
+ toPageIn = Math.max(0, Math.min(toPageIn, getMaxPageSize()));
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@@ -1193,16 +1193,24 @@
messages.release();
}
}
+ // Only add new messages, not already pagedIn to avoid multiple dispatch attempts
synchronized (pagedInMessages) {
- for(QueueMessageReference ref:result) {
- pagedInMessages.put(ref.getMessageId(), ref);
+ resultList = new ArrayList<QueueMessageReference>(result.size());
+ for(QueueMessageReference ref : result) {
+ if (!pagedInMessages.containsKey(ref.getMessageId())) {
+ pagedInMessages.put(ref.getMessageId(), ref);
+ resultList.add(ref);
+ }
}
}
+ } else {
+ // Avoid returning a null list when the enclosing condition is not met
+ resultList = new ArrayList<QueueMessageReference>();
}
}finally {
dispatchLock.unlock();
}
- return result;
+ return resultList;
}
private void doDispatch(List<QueueMessageReference> list) throws Exception {