You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/03/03 11:06:43 UTC
svn commit: r382746 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Author: rajdavies
Date: Fri Mar 3 02:06:41 2006
New Revision: 382746
URL: http://svn.apache.org/viewcvs?rev=382746&view=rev
Log:
tidied up some of the synchronization
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=382746&r1=382745&r2=382746&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Mar 3 02:06:41 2006
@@ -1,18 +1,15 @@
/**
- *
+ *
* Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region;
@@ -34,165 +31,157 @@
import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
-public class TopicSubscription extends AbstractSubscription {
- private static final Log log = LogFactory.getLog(TopicSubscription.class);
- final protected LinkedList matched = new LinkedList();
- final protected ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
+
+public class TopicSubscription extends AbstractSubscription{
+ private static final Log log=LogFactory.getLog(TopicSubscription.class);
+ final protected LinkedList matched=new LinkedList();
+ final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ");
final protected UsageManager usageManager;
- protected int dispatched=0;
- protected int delivered=0;
- private int maximumPendingMessages = -1;
-
- public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException {
- super(broker,context, info);
+ protected AtomicInteger dispatched=new AtomicInteger();
+ protected AtomicInteger delivered=new AtomicInteger();
+ private int maximumPendingMessages=-1;
+ private final Object matchedListMutex=new Object();
+
+ public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
+ throws InvalidSelectorException{
+ super(broker,context,info);
this.usageManager=usageManager;
}
- public void add(MessageReference node) throws InterruptedException, IOException {
+ public void add(MessageReference node) throws InterruptedException,IOException{
node.incrementReferenceCount();
- if( !isFull() && !isSlaveBroker()) {
+ if(!isFull()&&!isSlaveBroker()){
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
- } else {
- if (maximumPendingMessages != 0) {
- synchronized (matched) {
+ }else{
+ if(maximumPendingMessages!=0){
+ synchronized(matchedListMutex){
matched.addLast(node);
-
// NOTE - be careful about the slaveBroker!
- if (maximumPendingMessages > 0) {
- log.warn("discarding " + (matched.size() - maximumPendingMessages) + " messages for slow consumer");
+ if(maximumPendingMessages>0){
// lets discard old messages as we are a slow consumer
- while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
- MessageReference oldMessage = (MessageReference) matched.removeFirst();
+ while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){
+ MessageReference oldMessage=(MessageReference) matched.removeFirst();
oldMessage.decrementReferenceCount();
+ if (log.isDebugEnabled()){
+ log.debug("Discarding message " + oldMessage);
+ }
}
}
}
}
- }
+ }
}
-
- public void processMessageDispatchNotification(MessageDispatchNotification mdn){
- synchronized(matched){
- for (Iterator i = matched.iterator(); i.hasNext();){
- MessageReference node = (MessageReference)i.next();
- if (node.getMessageId().equals(mdn.getMessageId())){
+
+ public void processMessageDispatchNotification(MessageDispatchNotification mdn){
+ synchronized(matchedListMutex){
+ for(Iterator i=matched.iterator();i.hasNext();){
+ MessageReference node=(MessageReference) i.next();
+ if(node.getMessageId().equals(mdn.getMessageId())){
i.remove();
- dispatched++;
+ dispatched.incrementAndGet();
node.decrementReferenceCount();
break;
}
}
}
}
-
- public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable {
-
+
+ public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Throwable{
// Handle the standard acknowledgment case.
- boolean wasFull = isFull();
- if( ack.isStandardAck() || ack.isPoisonAck() ) {
- if ( context.isInTransaction() ) {
- delivered += ack.getMessageCount();
+ boolean wasFull=isFull();
+ if(ack.isStandardAck()||ack.isPoisonAck()){
+ if(context.isInTransaction()){
+ delivered.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization(){
- public void afterCommit() throws Throwable {
- synchronized(TopicSubscription.this) {
- dispatched -= ack.getMessageCount();
- delivered = Math.max(0, delivered - ack.getMessageCount());
- }
+ public void afterCommit() throws Throwable{
+ dispatched.addAndGet(-ack.getMessageCount());
+ delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
}
});
- } else {
- dispatched -= ack.getMessageCount();
- delivered = Math.max(0, delivered - ack.getMessageCount());
+ }else{
+ dispatched.addAndGet(-ack.getMessageCount());
+ delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
}
-
- if( wasFull && !isFull() ) {
+ if(wasFull&&!isFull()){
dispatchMatched();
}
return;
-
- } else if( ack.isDeliveredAck() ) {
+ }else if(ack.isDeliveredAck()){
// Message was delivered but not acknowledged: update pre-fetch counters.
- delivered += ack.getMessageCount();
- if( wasFull && !isFull() ) {
+ delivered.addAndGet(ack.getMessageCount());
+ if(wasFull&&!isFull()){
dispatchMatched();
}
return;
}
-
throw new JMSException("Invalid acknowledgment: "+ack);
}
-
+
public int pending(){
- return matched.size() - dispatched;
+ return matched.size()-dispatched.get();
}
-
+
public int dispatched(){
- return dispatched;
+ return dispatched.get();
}
-
+
public int delivered(){
- return delivered;
+ return delivered.get();
}
-
- public int getMaximumPendingMessages() {
+
+ public int getMaximumPendingMessages(){
return maximumPendingMessages;
}
/**
- * Sets the maximum number of pending messages that can be matched against this consumer
- * before old messages are discarded.
+ * Sets the maximum number of pending messages that can be matched against this consumer before old messages are
+ * discarded.
*/
- public void setMaximumPendingMessages(int maximumPendingMessages) {
- this.maximumPendingMessages = maximumPendingMessages;
+ public void setMaximumPendingMessages(int maximumPendingMessages){
+ this.maximumPendingMessages=maximumPendingMessages;
}
- private boolean isFull() {
- return dispatched-delivered >= info.getPrefetchSize();
+ private boolean isFull(){
+ return dispatched.get()-delivered.get()>=info.getPrefetchSize();
}
-
- private void dispatchMatched() throws IOException {
- for (Iterator iter = matched.iterator(); iter.hasNext() && !isFull();) {
- MessageReference message = (MessageReference) iter.next();
- iter.remove();
- dispatch(message);
+
+ private void dispatchMatched() throws IOException{
+ synchronized(matchedListMutex){
+ for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
+ MessageReference message=(MessageReference) iter.next();
+ iter.remove();
+ dispatch(message);
+ }
}
}
- private void dispatch(final MessageReference node) throws IOException {
-
- Message message = (Message) node;
-
- // Make sure we can dispatch a message.
- MessageDispatch md = new MessageDispatch();
+ private void dispatch(final MessageReference node) throws IOException{
+ Message message=(Message) node;
+ // Make sure we can dispatch a message.
+ MessageDispatch md=new MessageDispatch();
md.setMessage(message);
- md.setConsumerId( info.getConsumerId() );
- md.setDestination( node.getRegionDestination().getActiveMQDestination() );
-
- dispatched++;
- if( info.isDispatchAsync() ) {
+ md.setConsumerId(info.getConsumerId());
+ md.setDestination(node.getRegionDestination().getActiveMQDestination());
+ dispatched.incrementAndGet();
+ if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){
- public void run() {
+ public void run(){
node.decrementReferenceCount();
}
});
context.getConnection().dispatchAsync(md);
- } else {
- context.getConnection().dispatchSync(md);
+ }else{
+ context.getConnection().dispatchSync(md);
node.decrementReferenceCount();
- }
- }
-
- public String toString() {
- return
- "TopicSubscription:" +
- " consumer="+info.getConsumerId()+
- ", destinations="+destinations.size()+
- ", dispatched="+dispatched+
- ", delivered="+this.delivered+
- ", matched="+this.matched.size();
+ }
}
+ public String toString(){
+ return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
+ +", dispatched="+dispatched+", delivered="+this.delivered+", matched="+this.matched.size();
+ }
}