You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/02 18:37:15 UTC
svn commit: r1142245 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Author: tabish
Date: Sat Jul 2 16:37:14 2011
New Revision: 1142245
URL: http://svn.apache.org/viewvc?rev=1142245&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3319
Concurrent reads and writes to the browserDispatches list are not properly protected: both access points only take a read lock, which is shared and so does not prevent simultaneous modification.
Switch to a ConcurrentLinkedQueue and remove the read lock around that code.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1142245&r1=1142244&r2=1142245&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sat Jul 2 16:37:14 2011
@@ -25,11 +25,11 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
@@ -58,11 +58,9 @@ import org.apache.activemq.command.*;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
-import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -349,7 +347,7 @@ public class Queue extends BaseDestinati
}
}
- LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
+ ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
if (LOG.isDebugEnabled()) {
@@ -403,13 +401,8 @@ public class Queue extends BaseDestinati
if (sub instanceof QueueBrowserSubscription) {
// tee up for dispatch in next iterate
QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
- pagedInMessagesLock.readLock().lock();
- try{
- BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
- browserDispatches.addLast(browserDispatch);
- }finally {
- pagedInMessagesLock.readLock().unlock();
- }
+ BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
+ browserDispatches.add(browserDispatch);
}
if (!(this.optimizedDispatch || isSlave())) {
@@ -1357,19 +1350,6 @@ public class Queue extends BaseDestinati
return movedCounter;
}
- BrowserDispatch getNextBrowserDispatch() {
- pagedInMessagesLock.readLock().lock();
- try{
- if (browserDispatches.isEmpty()) {
- return null;
- }
- return browserDispatches.removeFirst();
- }finally {
- pagedInMessagesLock.readLock().unlock();
- }
-
- }
-
/**
* @return true if we would like to iterate again
* @see org.apache.activemq.thread.Task#iterate()
@@ -1425,7 +1405,7 @@ public class Queue extends BaseDestinati
}
}
- BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
+ BrowserDispatch pendingBrowserDispatch = browserDispatches.poll();
messagesLock.readLock().lock();
try{
@@ -1486,7 +1466,7 @@ public class Queue extends BaseDestinati
LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
}
- } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
+ } while ((pendingBrowserDispatch = browserDispatches.poll()) != null);
}
if (pendingWakeups.get() > 0) {
@@ -2074,7 +2054,7 @@ public class Queue extends BaseDestinati
return sub;
}
- public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
+ public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage) {
asyncWakeup();
}