You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/28 21:13:57 UTC
svn commit: r397985 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
DurableTopicSubscription.java PrefetchSubscription.java
QueueBrowserSubscription.java QueueSubscription.java
Author: chirino
Date: Fri Apr 28 12:13:54 2006
New Revision: 397985
URL: http://svn.apache.org/viewcvs?rev=397985&view=rev
Log:
Guard access to the pending list better.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Fri Apr 28 12:13:54 2006
@@ -63,9 +63,7 @@
Topic topic = (Topic) destination;
topic.activate(context, this);
}
- if( !isFull() ) {
- dispatchMatched();
- }
+ dispatchMatched();
}
synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
@@ -79,9 +77,7 @@
topic.activate(context, this);
}
}
- if( !isFull() ) {
- dispatchMatched();
- }
+ dispatchMatched();
}
}
@@ -104,7 +100,9 @@
redeliveredMessages.put(node.getMessageId(), new Integer(1));
}
if( keepDurableSubsActive ) {
- pending.addFirst(node);
+ synchronized(pending) {
+ pending.addFirst(node);
+ }
} else {
node.decrementReferenceCount();
}
@@ -112,11 +110,13 @@
}
if( !keepDurableSubsActive ) {
- for (Iterator iter = pending.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
- node.decrementReferenceCount();
- iter.remove();
- }
+ synchronized(pending) {
+ for (Iterator iter = pending.iterator(); iter.hasNext();) {
+ MessageReference node = (MessageReference) iter.next();
+ node.decrementReferenceCount();
+ iter.remove();
+ }
+ }
}
prefetchExtension=0;
}
@@ -171,7 +171,7 @@
", destinations="+destinations.size()+
", dispatched="+dispatched.size()+
", delivered="+this.prefetchExtension+
- ", pending="+this.pending.size();
+ ", pending="+getPendingQueueSize();
}
public String getClientId() {
@@ -186,13 +186,15 @@
* Release any references that we are holding.
*/
synchronized public void destroy() {
-
- for (Iterator iter = pending.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
- node.decrementReferenceCount();
- }
- pending.clear();
-
+
+ synchronized(pending) {
+ for (Iterator iter = pending.iterator(); iter.hasNext();) {
+ MessageReference node = (MessageReference) iter.next();
+ node.decrementReferenceCount();
+ }
+ pending.clear();
+ }
+
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
node.decrementReferenceCount();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr 28 12:13:54 2006
@@ -94,7 +94,6 @@
synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case.
- boolean wasFull=isFull();
if(ack.isStandardAck()){
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0;
@@ -129,9 +128,7 @@
prefetchExtension=Math.max(prefetchExtension,index+1);
else
prefetchExtension=Math.max(0,prefetchExtension-(index+1));
- if(wasFull&&!isFull()){
- dispatchMatched();
- }
+ dispatchMatched();
return;
}else{
// System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
@@ -147,9 +144,7 @@
final MessageReference node=(MessageReference) iter.next();
if(ack.getLastMessageId().equals(node.getMessageId())){
prefetchExtension=Math.max(prefetchExtension,index+1);
- if(wasFull&&!isFull()){
- dispatchMatched();
- }
+ dispatchMatched();
return;
}
}
@@ -176,9 +171,7 @@
acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){
prefetchExtension=Math.max(0,prefetchExtension-(index+1));
- if(wasFull&&!isFull()){
- dispatchMatched();
- }
+ dispatchMatched();
return;
}
}
@@ -226,8 +219,10 @@
return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
}
- synchronized public int getPendingQueueSize(){
- return pending.size();
+ public int getPendingQueueSize(){
+ synchronized(pending) {
+ return pending.size();
+ }
}
synchronized public int getDispatchedQueueSize(){
@@ -312,16 +307,13 @@
}
synchronized protected void onDispatch(final MessageReference node,final Message message){
- boolean wasFull=isFull();
if(node.getRegionDestination()!=null){
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message);
- if(wasFull&&!isFull()){
- try{
- dispatchMatched();
- }catch(IOException e){
- context.getConnection().serviceException(e);
- }
+ try{
+ dispatchMatched();
+ }catch(IOException e){
+ context.getConnection().serviceException(e);
}
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Fri Apr 28 12:13:54 2006
@@ -47,7 +47,7 @@
", destinations="+destinations.size()+
", dispatched="+dispatched.size()+
", delivered="+this.prefetchExtension+
- ", pending="+this.pending.size();
+ ", pending="+getPendingQueueSize();
}
public void browseDone() throws Exception {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Apr 28 12:13:54 2006
@@ -125,7 +125,7 @@
", destinations="+destinations.size()+
", dispatched="+dispatched.size()+
", delivered="+this.prefetchExtension+
- ", pending="+this.pending.size();
+ ", pending="+getPendingQueueSize();
}
public int getLockPriority() {