You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/10/06 16:11:51 UTC

svn commit: r702152 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/region/

Author: chirino
Date: Mon Oct  6 07:11:51 2008
New Revision: 702152

URL: http://svn.apache.org/viewvc?rev=702152&view=rev
Log:
Applying AMQ-1957.. Thanks for the patch.

Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=702152&r1=702151&r2=702152&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Mon Oct  6 07:11:51 2008
@@ -88,7 +88,7 @@
 
     public boolean lock(LockOwner subscription) {
         synchronized (this) {
-            if (dropped || (lockOwner != null && lockOwner != subscription)) {
+            if (dropped || lockOwner != null) {
                 return false;
             }
             lockOwner = subscription;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=702152&r1=702151&r2=702152&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Oct  6 07:11:51 2008
@@ -157,9 +157,11 @@
                 while (pending.hasNext()) {
                     MessageReference node = pending.next();
                     if (node.getMessageId().equals(mdn.getMessageId())) {
-                        pending.remove();
-                        createMessageDispatch(node, node.getMessage());
+                        // Synchronize between dispatched list and removal of messages from pending list
+                        // related to remove subscription action
                         synchronized(dispatchLock) {
+                            pending.remove();
+                            createMessageDispatch(node, node.getMessage());
                             dispatched.add(node);
                         }
                         return;
@@ -532,11 +534,18 @@
         List<MessageReference> rc = new ArrayList<MessageReference>();
         synchronized(pendingLock) {
             super.remove(context, destination);
-            for (MessageReference r : dispatched) {
-                if( r.getRegionDestination() == destination ) {
-                    rc.add((QueueMessageReference)r);
-                }
+            // Synchronized to DispatchLock
+            synchronized(dispatchLock) {
+	            for (MessageReference r : dispatched) {
+	                if( r.getRegionDestination() == destination) {
+	                	rc.add((QueueMessageReference)r);
+	                }
+	            }
             }
+            // TODO Dispatched messages should be decremented from Inflight stat 
+            // Here is a potential problem concerning Inflight stat:
+            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
+            // Except if each commit or rollback callback action comes before remove of subscriber.
             rc.addAll(pending.remove(context, destination));
         }
         return rc;
@@ -559,19 +568,23 @@
                                 break;
                             }
                             
-                            pending.remove();
-                            if( !isDropped(node) && canDispatch(node)) {
-
-                                // Message may have been sitting in the pending
-                                // list a while waiting for the consumer to ak the message.
-                                if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
-                                    //increment number to dispatch
-                                    numberToDispatch++;
-                                    node.getRegionDestination().messageExpired(context, this, node);
-                                    continue;
+                            // Synchronize between dispatched list and remove of messageg from pending list
+                            // related to remove subscription action
+                            synchronized(dispatchLock) {
+                                pending.remove();
+                                if( !isDropped(node) && canDispatch(node)) {
+
+                                    // Message may have been sitting in the pending
+                                    // list a while waiting for the consumer to ak the message.
+                                    if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
+                                        //increment number to dispatch
+                                        numberToDispatch++;
+                                        node.getRegionDestination().messageExpired(context, this, node);
+                                        continue;
+                                    }
+                                    dispatch(node);
+                                    count++;
                                 }
-                                dispatch(node);
-                                count++;
                             }
                         }
                     }else {
@@ -596,10 +609,10 @@
         final Message message = node.getMessage();
         if (message == null) {
             return false;
-        }         
-        // Make sure we can dispatch a message.
-        if (canDispatch(node) && !isSlave()) {
-            
+        }
+        // No reentrant lock - Patch needed to IndirectMessageReference on method lock
+        if (!isSlave()) {
+
             MessageDispatch md = createMessageDispatch(node, message);
             // NULL messages don't count... they don't get Acked.
             if (node != QueueMessageReference.NULL_MESSAGE) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=702152&r1=702151&r2=702152&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Oct  6 07:11:51 2008
@@ -959,6 +959,10 @@
 	                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
 	                        msgContext.setMessageReference(node);
 	                        if (rd.subscription.matches(node, msgContext)) {
+ 	                            // Log showing message dispatching
+ 	                            if (LOG.isDebugEnabled()) {
+ 	                                LOG.debug(destination.getQualifiedName() + " - Recovery - Message pushed '" + node.hashCode() + " - " + node + "' to subscription: '" + rd.subscription + "'");
+ 	                            }
 	                            rd.subscription.add(node);
 	                        } else {
 	                            // make sure it gets queued for dispatched again
@@ -1063,23 +1067,26 @@
     protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
         reference.setAcked(true);
         // This sends the ack the the journal..
-        acknowledge(context, sub, ack, reference);
-
         if (!ack.isInTransaction()) {
+            acknowledge(context, sub, ack, reference);
             dropMessage(reference);
             wakeup();
         } else {
-            context.getTransaction().addSynchronization(new Synchronization() {
+            try {
+                acknowledge(context, sub, ack, reference);
+            } finally {
+                context.getTransaction().addSynchronization(new Synchronization() {
                 
-                public void afterCommit() throws Exception {
-                    dropMessage(reference);
-                    wakeup();
-                }
+                    public void afterCommit() throws Exception {
+                        dropMessage(reference);
+                        wakeup();
+                    }
                 
-                public void afterRollback() throws Exception {
-                    reference.setAcked(false);
-                }
-            });
+                    public void afterRollback() throws Exception {
+                        reference.setAcked(false);
+                    }
+                });
+            }
         }
 
     }
@@ -1153,18 +1160,11 @@
 
     private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
         List<QueueMessageReference> result = null;
+        List<QueueMessageReference> resultList = null;
         dispatchLock.lock();
         try{
-           
-            int toPageIn = 0;
-            if (force) {
-                toPageIn = getMaxPageSize();
-            } else {
-                toPageIn = (getMaxPageSize() + (int) destinationStatistics
-                        .getInflight().getCount())
-                        - pagedInMessages.size();
-                toPageIn = Math.min(toPageIn, getMaxPageSize());
-            }
+            int toPageIn = getMaxPageSize() + Math.max(0, (int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
+            toPageIn = Math.max(0, Math.min(toPageIn, getMaxPageSize()));
             if (isLazyDispatch()&& !force) {
                 // Only page in the minimum number of messages which can be dispatched immediately.
                 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@@ -1193,16 +1193,24 @@
                         messages.release();
                     }
                 }
+                // Only add new messages, not already pagedIn to avoid multiple dispatch attempts
                 synchronized (pagedInMessages) {
-                    for(QueueMessageReference ref:result) {
-                        pagedInMessages.put(ref.getMessageId(), ref);
+                    resultList = new ArrayList<QueueMessageReference>(result.size());
+                    for(QueueMessageReference ref : result) {
+                        if (!pagedInMessages.containsKey(ref.getMessageId())) {
+                            pagedInMessages.put(ref.getMessageId(), ref);
+                            resultList.add(ref);
+                        }
                     }
                 }
+            } else {
+                // Avoid return null list, if condition is not validated
+                resultList = new ArrayList<QueueMessageReference>();
             }
         }finally {
             dispatchLock.unlock();
         }
-        return result;
+        return resultList;
     }
     
     private void doDispatch(List<QueueMessageReference> list) throws Exception {