You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/02 18:37:15 UTC

svn commit: r1142245 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Author: tabish
Date: Sat Jul  2 16:37:14 2011
New Revision: 1142245

URL: http://svn.apache.org/viewvc?rev=1142245&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3319

Concurrent reads and writes to the list are not protected: both access points take only a read lock, which multiple threads can hold at the same time.

Switch to using a ConcurrentLinkedQueue instead and remove the read lock around that code.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1142245&r1=1142244&r2=1142245&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sat Jul  2 16:37:14 2011
@@ -25,11 +25,11 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
@@ -58,11 +58,9 @@ import org.apache.activemq.command.*;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
-import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -349,7 +347,7 @@ public class Queue extends BaseDestinati
         }
     }
 
-    LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
+    ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
         if (LOG.isDebugEnabled()) {
@@ -403,13 +401,8 @@ public class Queue extends BaseDestinati
             if (sub instanceof QueueBrowserSubscription) {
                 // tee up for dispatch in next iterate
                 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
-                pagedInMessagesLock.readLock().lock();
-                try{
-                    BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
-                    browserDispatches.addLast(browserDispatch);
-                }finally {
-                    pagedInMessagesLock.readLock().unlock();
-                }
+                BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
+                browserDispatches.add(browserDispatch);
             }
 
             if (!(this.optimizedDispatch || isSlave())) {
@@ -1357,19 +1350,6 @@ public class Queue extends BaseDestinati
         return movedCounter;
     }
 
-    BrowserDispatch getNextBrowserDispatch() {
-        pagedInMessagesLock.readLock().lock();
-        try{
-            if (browserDispatches.isEmpty()) {
-                return null;
-            }
-            return browserDispatches.removeFirst();
-        }finally {
-            pagedInMessagesLock.readLock().unlock();
-        }
-
-    }
-
     /**
      * @return true if we would like to iterate again
      * @see org.apache.activemq.thread.Task#iterate()
@@ -1425,7 +1405,7 @@ public class Queue extends BaseDestinati
                 }
             }
 
-            BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
+            BrowserDispatch pendingBrowserDispatch = browserDispatches.poll();
 
             messagesLock.readLock().lock();
             try{
@@ -1486,7 +1466,7 @@ public class Queue extends BaseDestinati
                         LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
                     }
 
-                } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
+                } while ((pendingBrowserDispatch = browserDispatches.poll()) != null);
             }
 
             if (pendingWakeups.get() > 0) {
@@ -2074,7 +2054,7 @@ public class Queue extends BaseDestinati
         return sub;
     }
 
-    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
+    public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage) {
             asyncWakeup();
         }