You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/05 17:20:14 UTC

svn commit: r618689 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java Queue.java

Author: rajdavies
Date: Tue Feb  5 08:20:11 2008
New Revision: 618689

URL: http://svn.apache.org/viewvc?rev=618689&view=rev
Log:
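Lock dispatching (again) whilst adding a consumer: Queue.addSubscription and
Queue.doPageIn now run under a new ReentrantLock (dispatchLock), buildList is
folded into doPageIn, wakeup() is no longer synchronized, and
PrefetchSubscription.add calls dispatchPending() after releasing pendingLock.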

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=618689&r1=618688&r2=618689&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Feb  5 08:20:11 2008
@@ -128,10 +128,9 @@
     public void add(MessageReference node) throws Exception {
         synchronized (pendingLock) {
             enqueueCounter++;
-            pending.addMessageLast(node);
-            dispatchPending();
+            pending.addMessageLast(node);    
         }
-       
+        dispatchPending();
     }
 
     public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=618689&r1=618688&r2=618689&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Feb  5 08:20:11 2008
@@ -22,6 +22,7 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -83,6 +84,7 @@
     private final Object sendLock = new Object();
     private final TaskRunner taskRunner;    
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+    private final ReentrantLock dispatchLock = new ReentrantLock();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
             wakeup();
@@ -98,7 +100,6 @@
         } else {
             this.messages = new StoreQueueCursor(broker,this);
         }
-
         this.taskRunner = taskFactory.createTaskRunner(this, "Queue  " + destination.getPhysicalName());
         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
     }
@@ -172,61 +173,67 @@
         return true;
     }
 
-    public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
-        sub.add(context, this);
-        destinationStatistics.getConsumers().increment();
-        MessageEvaluationContext msgContext = new MessageEvaluationContext();
+    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+        dispatchLock.lock();
+        try {
+            sub.add(context, this);
+            destinationStatistics.getConsumers().increment();
+            MessageEvaluationContext msgContext = new MessageEvaluationContext();
 
-        // needs to be synchronized - so no contention with dispatching
-        synchronized (consumers) {
-            consumers.add(sub);
-            if (sub.getConsumerInfo().isExclusive()) {
-                LockOwner owner = (LockOwner) sub;
-                if (exclusiveOwner == null) {
-                    exclusiveOwner = owner;
-                } else {
-                    // switch the owner if the priority is higher.
-                    if (owner.getLockPriority() > exclusiveOwner
-                            .getLockPriority()) {
+            // needs to be synchronized - so no contention with dispatching
+            synchronized (consumers) {
+                consumers.add(sub);
+                if (sub.getConsumerInfo().isExclusive()) {
+                    LockOwner owner = (LockOwner) sub;
+                    if (exclusiveOwner == null) {
                         exclusiveOwner = owner;
+                    } else {
+                        // switch the owner if the priority is higher.
+                        if (owner.getLockPriority() > exclusiveOwner
+                                .getLockPriority()) {
+                            exclusiveOwner = owner;
+                        }
                     }
                 }
             }
-        }
 
-        // we hold the lock on the dispatchValue - so lets build the paged in
-        // list directly;
-        buildList(false);
+            // we hold the lock on the dispatchValue - so lets build the paged
+            // in
+            // list directly;
+            doPageIn(false);
+
+            // synchronize with dispatch method so that no new messages are sent
+            // while
+            // setting up a subscription. avoid out of order messages,
+            // duplicates
+            // etc.
 
-        // synchronize with dispatch method so that no new messages are sent
-        // while
-        // setting up a subscription. avoid out of order messages,
-        // duplicates
-        // etc.
-
-        msgContext.setDestination(destination);
-        synchronized (pagedInMessages) {
-            // Add all the matching messages in the queue to the
-            // subscription.
-            for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i
-                    .hasNext();) {
-                QueueMessageReference node = (QueueMessageReference) i.next();
-                if (node.isDropped()
-                        || (!sub.getConsumerInfo().isBrowser() && node
-                                .getLockOwner() != null)) {
-                    continue;
-                }
-                try {
-                    msgContext.setMessageReference(node);
-                    if (sub.matches(node, msgContext)) {
-                        sub.add(node);
+            msgContext.setDestination(destination);
+            synchronized (pagedInMessages) {
+                // Add all the matching messages in the queue to the
+                // subscription.
+                for (Iterator<MessageReference> i = pagedInMessages.values()
+                        .iterator(); i.hasNext();) {
+                    QueueMessageReference node = (QueueMessageReference) i
+                            .next();
+                    if (node.isDropped()
+                            || (!sub.getConsumerInfo().isBrowser() && node
+                                    .getLockOwner() != null)) {
+                        continue;
+                    }
+                    try {
+                        msgContext.setMessageReference(node);
+                        if (sub.matches(node, msgContext)) {
+                            sub.add(node);
+                        }
+                    } catch (IOException e) {
+                        log.warn("Could not load message: " + e, e);
                     }
-                } catch (IOException e) {
-                    log.warn("Could not load message: " + e, e);
                 }
             }
+        } finally {
+            dispatchLock.unlock();
         }
-
     }
 
     public void removeSubscription(ConnectionContext context, Subscription sub)
@@ -956,54 +963,51 @@
         wakeup();
     }
     
-    final synchronized void wakeup() {
+    final void wakeup() {
         try {
             taskRunner.wakeup();
-            
         } catch (InterruptedException e) {
             log.warn("Task Runner failed to wakeup ", e);
         }
     }
 
-    
-    private   List<MessageReference> doPageIn(boolean force) throws Exception {
-    	 List<MessageReference> result  = null;
-    		result  = buildList(force);
-    	return  result;
-    }
-
-    private List<MessageReference> buildList(boolean force) throws Exception {
-        final int toPageIn = getMaxPageSize() - pagedInMessages.size();
+    private List<MessageReference> doPageIn(boolean force) throws Exception {
         List<MessageReference> result = null;
-        if ((force || !consumers.isEmpty()) && toPageIn > 0) {
-            messages.setMaxBatchSize(toPageIn);
-            int count = 0;
-            result = new ArrayList<MessageReference>(toPageIn);
-            synchronized (messages) {
-                try {
-                    messages.reset();
-                    while (messages.hasNext() && count < toPageIn) {
-                        MessageReference node = messages.next();
-                        messages.remove();
-                        if (!broker.isExpired(node)) {
-                            node = createMessageReference(node.getMessage());
-                            result.add(node);
-                            count++;
-                        } else {
-                            broker.messageExpired(createConnectionContext(),
-                                    node);
-                            destinationStatistics.getMessages().decrement();
+        dispatchLock.lock();
+        try {
+            final int toPageIn = getMaxPageSize() - pagedInMessages.size();
+            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
+                messages.setMaxBatchSize(toPageIn);
+                int count = 0;
+                result = new ArrayList<MessageReference>(toPageIn);
+                synchronized (messages) {
+                    try {
+                        messages.reset();
+                        while (messages.hasNext() && count < toPageIn) {
+                            MessageReference node = messages.next();
+                            messages.remove();
+                            if (!broker.isExpired(node)) {
+                                node = createMessageReference(node.getMessage());
+                                result.add(node);
+                                count++;
+                            } else {
+                                broker.messageExpired(createConnectionContext(),
+                                        node);
+                                destinationStatistics.getMessages().decrement();
+                            }
                         }
+                    } finally {
+                        messages.release();
                     }
-                } finally {
-                    messages.release();
                 }
-            }
-            synchronized (pagedInMessages) {
-                for(MessageReference ref:result) {
-                    pagedInMessages.put(ref.getMessageId(), ref);
+                synchronized (pagedInMessages) {
+                    for(MessageReference ref:result) {
+                        pagedInMessages.put(ref.getMessageId(), ref);
+                    }
                 }
             }
+        }finally {
+            dispatchLock.unlock();
         }
         return result;
     }