You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/05 17:20:14 UTC
svn commit: r618689 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
PrefetchSubscription.java Queue.java
Author: rajdavies
Date: Tue Feb 5 08:20:11 2008
New Revision: 618689
URL: http://svn.apache.org/viewvc?rev=618689&view=rev
Log:
lock dispatching (again) whilst adding a consumer
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=618689&r1=618688&r2=618689&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Feb 5 08:20:11 2008
@@ -128,10 +128,9 @@
public void add(MessageReference node) throws Exception {
synchronized (pendingLock) {
enqueueCounter++;
- pending.addMessageLast(node);
- dispatchPending();
+ pending.addMessageLast(node);
}
-
+ dispatchPending();
}
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=618689&r1=618688&r2=618689&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Feb 5 08:20:11 2008
@@ -22,6 +22,7 @@
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -83,6 +84,7 @@
private final Object sendLock = new Object();
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+ private final ReentrantLock dispatchLock = new ReentrantLock();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
wakeup();
@@ -98,7 +100,6 @@
} else {
this.messages = new StoreQueueCursor(broker,this);
}
-
this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName());
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
}
@@ -172,61 +173,67 @@
return true;
}
- public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
- sub.add(context, this);
- destinationStatistics.getConsumers().increment();
- MessageEvaluationContext msgContext = new MessageEvaluationContext();
+ public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+ dispatchLock.lock();
+ try {
+ sub.add(context, this);
+ destinationStatistics.getConsumers().increment();
+ MessageEvaluationContext msgContext = new MessageEvaluationContext();
- // needs to be synchronized - so no contention with dispatching
- synchronized (consumers) {
- consumers.add(sub);
- if (sub.getConsumerInfo().isExclusive()) {
- LockOwner owner = (LockOwner) sub;
- if (exclusiveOwner == null) {
- exclusiveOwner = owner;
- } else {
- // switch the owner if the priority is higher.
- if (owner.getLockPriority() > exclusiveOwner
- .getLockPriority()) {
+ // needs to be synchronized - so no contention with dispatching
+ synchronized (consumers) {
+ consumers.add(sub);
+ if (sub.getConsumerInfo().isExclusive()) {
+ LockOwner owner = (LockOwner) sub;
+ if (exclusiveOwner == null) {
exclusiveOwner = owner;
+ } else {
+ // switch the owner if the priority is higher.
+ if (owner.getLockPriority() > exclusiveOwner
+ .getLockPriority()) {
+ exclusiveOwner = owner;
+ }
}
}
}
- }
- // we hold the lock on the dispatchValue - so lets build the paged in
- // list directly;
- buildList(false);
+ // we hold the lock on the dispatchValue - so lets build the paged
+ // in
+ // list directly;
+ doPageIn(false);
+
+ // synchronize with dispatch method so that no new messages are sent
+ // while
+ // setting up a subscription. avoid out of order messages,
+ // duplicates
+ // etc.
- // synchronize with dispatch method so that no new messages are sent
- // while
- // setting up a subscription. avoid out of order messages,
- // duplicates
- // etc.
-
- msgContext.setDestination(destination);
- synchronized (pagedInMessages) {
- // Add all the matching messages in the queue to the
- // subscription.
- for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i
- .hasNext();) {
- QueueMessageReference node = (QueueMessageReference) i.next();
- if (node.isDropped()
- || (!sub.getConsumerInfo().isBrowser() && node
- .getLockOwner() != null)) {
- continue;
- }
- try {
- msgContext.setMessageReference(node);
- if (sub.matches(node, msgContext)) {
- sub.add(node);
+ msgContext.setDestination(destination);
+ synchronized (pagedInMessages) {
+ // Add all the matching messages in the queue to the
+ // subscription.
+ for (Iterator<MessageReference> i = pagedInMessages.values()
+ .iterator(); i.hasNext();) {
+ QueueMessageReference node = (QueueMessageReference) i
+ .next();
+ if (node.isDropped()
+ || (!sub.getConsumerInfo().isBrowser() && node
+ .getLockOwner() != null)) {
+ continue;
+ }
+ try {
+ msgContext.setMessageReference(node);
+ if (sub.matches(node, msgContext)) {
+ sub.add(node);
+ }
+ } catch (IOException e) {
+ log.warn("Could not load message: " + e, e);
}
- } catch (IOException e) {
- log.warn("Could not load message: " + e, e);
}
}
+ } finally {
+ dispatchLock.unlock();
}
-
}
public void removeSubscription(ConnectionContext context, Subscription sub)
@@ -956,54 +963,51 @@
wakeup();
}
- final synchronized void wakeup() {
+ final void wakeup() {
try {
taskRunner.wakeup();
-
} catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e);
}
}
-
- private List<MessageReference> doPageIn(boolean force) throws Exception {
- List<MessageReference> result = null;
- result = buildList(force);
- return result;
- }
-
- private List<MessageReference> buildList(boolean force) throws Exception {
- final int toPageIn = getMaxPageSize() - pagedInMessages.size();
+ private List<MessageReference> doPageIn(boolean force) throws Exception {
List<MessageReference> result = null;
- if ((force || !consumers.isEmpty()) && toPageIn > 0) {
- messages.setMaxBatchSize(toPageIn);
- int count = 0;
- result = new ArrayList<MessageReference>(toPageIn);
- synchronized (messages) {
- try {
- messages.reset();
- while (messages.hasNext() && count < toPageIn) {
- MessageReference node = messages.next();
- messages.remove();
- if (!broker.isExpired(node)) {
- node = createMessageReference(node.getMessage());
- result.add(node);
- count++;
- } else {
- broker.messageExpired(createConnectionContext(),
- node);
- destinationStatistics.getMessages().decrement();
+ dispatchLock.lock();
+ try {
+ final int toPageIn = getMaxPageSize() - pagedInMessages.size();
+ if ((force || !consumers.isEmpty()) && toPageIn > 0) {
+ messages.setMaxBatchSize(toPageIn);
+ int count = 0;
+ result = new ArrayList<MessageReference>(toPageIn);
+ synchronized (messages) {
+ try {
+ messages.reset();
+ while (messages.hasNext() && count < toPageIn) {
+ MessageReference node = messages.next();
+ messages.remove();
+ if (!broker.isExpired(node)) {
+ node = createMessageReference(node.getMessage());
+ result.add(node);
+ count++;
+ } else {
+ broker.messageExpired(createConnectionContext(),
+ node);
+ destinationStatistics.getMessages().decrement();
+ }
}
+ } finally {
+ messages.release();
}
- } finally {
- messages.release();
}
- }
- synchronized (pagedInMessages) {
- for(MessageReference ref:result) {
- pagedInMessages.put(ref.getMessageId(), ref);
+ synchronized (pagedInMessages) {
+ for(MessageReference ref:result) {
+ pagedInMessages.put(ref.getMessageId(), ref);
+ }
}
}
+ }finally {
+ dispatchLock.unlock();
}
return result;
}