You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/14 10:38:52 UTC

svn commit: r394050 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/region/ command/

Author: rajdavies
Date: Fri Apr 14 01:38:50 2006
New Revision: 394050

URL: http://svn.apache.org/viewcvs?rev=394050&view=rev
Log:
For optimized acknowledge, eagerly get acknowledgements from consumers
when the dispatched list gets too big.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Apr 14 01:38:50 2006
@@ -46,6 +46,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.*;
 
 /**
  * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@@ -111,7 +112,9 @@
     private MessageAvailableListener availableListener;
 
     private RedeliveryPolicy redeliveryPolicy;
-    private AtomicBoolean optimizeAcknowledge = new AtomicBoolean();
+    private boolean optimizeAcknowledge;
+    private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
+    private ExecutorService executorService = null;
    
     /**
      * Create a MessageConsumer
@@ -160,6 +163,7 @@
         this.info = new ConsumerInfo(consumerId);
         this.info.setSubcriptionName(name);
         this.info.setPrefetchSize(prefetch);
+        this.info.setCurrentPrefetchSize(prefetch);
         this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
         this.info.setNoLocal(noLocal);
         this.info.setDispatchAsync(dispatchAsync);
@@ -182,9 +186,9 @@
         }
 
         this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
-        this.optimizeAcknowledge.set(session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
-                        &&!info.isBrowser());
-        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge.get());
+        this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
+                        &&!info.isBrowser();
+        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
         try {
             this.session.addConsumer(this);
             this.session.syncSendPacket(info);
@@ -516,20 +520,35 @@
     }
     
     void deliverAcks(){
-        synchronized(optimizeAcknowledge){
-            if(this.optimizeAcknowledge.get()){
+        MessageAck ack=null;
+        if(deliveryingAcknowledgements.compareAndSet(false,true)){
+            if(this.optimizeAcknowledge){
                 if(!deliveredMessages.isEmpty()){
                     MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
-                    MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                    try{
-                        session.asyncSendPacket(ack);
-                    }catch(JMSException e){
-                        log.error("Failed to delivered acknowledgements",e);
-                    }
+                    ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
                     deliveredMessages.clear();
                     ackCounter=0;
                 }
             }
+            if(ack!=null){
+                final MessageAck ackToSend=ack;
+                if(executorService==null){
+                    executorService=Executors.newSingleThreadExecutor();
+                }
+                executorService.submit(new Runnable(){
+                    public void run(){
+                        try{
+                            session.asyncSendPacket(ackToSend);
+                        }catch(JMSException e){
+                            log.error("Failed to delivered acknowledgements",e);
+                        }finally{
+                            deliveryingAcknowledgements.set(false);
+                        }
+                    }
+                });
+            }else{
+                deliveryingAcknowledgements.set(false);
+            }
         }
     }
 
@@ -539,6 +558,9 @@
             // Ack any delivered messages now. (session may still
             // commit/rollback the acks).
             deliverAcks();//only processes optimized acknowledgements
+            if (executorService!=null){
+                executorService.shutdown();
+            }
             if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
                 acknowledge();
             }
@@ -562,15 +584,15 @@
     }
     
     protected void setOptimizeAcknowledge(boolean value){
-        synchronized(optimizeAcknowledge){
+        if (optimizeAcknowledge && !value){
             deliverAcks();
-            optimizeAcknowledge.set(value);
         }
+        optimizeAcknowledge=value;
     }
     
     protected void setPrefetchSize(int prefetch){
         deliverAcks();
-        this.info.setPrefetchSize(prefetch);
+        this.info.setCurrentPrefetchSize(prefetch);
     }
 
     private void beforeMessageIsConsumed(MessageDispatch md) {
@@ -590,20 +612,21 @@
                 ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
             }else if(session.isAutoAcknowledge()){
                 if(!deliveredMessages.isEmpty()){
-                    synchronized(optimizeAcknowledge){
-                        if(this.optimizeAcknowledge.get()){
+                    if(optimizeAcknowledge){
+                        if(deliveryingAcknowledgements.compareAndSet(false,true)){
                             ackCounter++;
-                            if(ackCounter>=(info.getPrefetchSize()*.75)){
-                                MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
+                            if(ackCounter>=(info.getCurrentPrefetchSize()*.75)){
+                                MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
                                 session.asyncSendPacket(ack);
                                 ackCounter=0;
                                 deliveredMessages.clear();
                             }
-                        }else{
-                            MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                            session.asyncSendPacket(ack);
-                            deliveredMessages.clear();
+                            deliveryingAcknowledgements.set(false);
                         }
+                    }else{
+                        MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+                        session.asyncSendPacket(ack);
+                        deliveredMessages.clear();
                     }
                 }
             }else if(session.isDupsOkAcknowledge()){
@@ -697,12 +720,10 @@
 
     public void rollback() throws JMSException{
         synchronized(unconsumedMessages.getMutex()){
-            synchronized(optimizeAcknowledge){
-                if(optimizeAcknowledge.get()){
-                    // remove messages read but not acked at the broker yet through optimizeAcknowledge
-                    for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
-                        deliveredMessages.removeLast();
-                    }
+            if(optimizeAcknowledge){
+                // remove messages read but not acked at the broker yet through optimizeAcknowledge
+                for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
+                    deliveredMessages.removeLast();
                 }
             }
             if(deliveredMessages.isEmpty())

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr 14 01:38:50 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -62,6 +63,7 @@
         if(!isFull()&&!isSlaveBroker()){
             dispatch(node);
         }else{
+            optimizePrefetch();
             synchronized(pending){
                 if( pending.isEmpty() )
                     if (log.isDebugEnabled()){
@@ -210,6 +212,20 @@
         return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
     }
     
+    /**
+     * @return true when 60% or more room is left for dispatching messages
+     */
+    public boolean isLowWaterMark(){
+        return (dispatched.size()-prefetchExtension) <= (info.getPrefetchSize() *.4);
+    }
+    
+    /**
+     * @return true when 10% or less room is left for dispatching messages
+     */
+    public boolean isHighWaterMark(){
+        return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
+    }
+    
     synchronized public int getPendingQueueSize(){
         return pending.size();
     }
@@ -229,6 +245,26 @@
     synchronized public long getEnqueueCounter() {
         return enqueueCounter;
     }
+    
+    /**
+     * optimize message consumer prefetch if the consumer supports it
+     *
+     */
+    public void optimizePrefetch(){
+        if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
+                        &&context.getConnection().isManageable()){
+            if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
+                info.setCurrentPrefetchSize(info.getPrefetchSize());
+                updateConsumerPrefetch(info.getPrefetchSize());
+            }else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
+                // want to purge any outstanding acks held by the consumer
+                info.setCurrentPrefetchSize(1);
+                updateConsumerPrefetch(1);
+            }
+        }
+    }
+    
+    
 
 
     protected void dispatchMatched() throws IOException{
@@ -287,6 +323,19 @@
                     context.getConnection().serviceException(e);
                 }
             }
+        }
+    }
+    
+    /**
+     * inform the MessageConsumer on the client to change it's prefetch
+     * @param newPrefetch
+     */
+    public void updateConsumerPrefetch(int newPrefetch){
+        if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
+            ConsumerControl cc = new ConsumerControl();
+            cc.setConsumerId(info.getConsumerId());
+            cc.setPrefetch(newPrefetch);
+            context.getConnection().dispatchAsync(cc);
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Fri Apr 14 01:38:50 2006
@@ -148,4 +148,26 @@
      * Set when the subscription is registered in JMX
      */
     public void setObjectName(ObjectName objectName);
+    
+    /**
+     * @return true when 60% or more room is left for dispatching messages
+     */
+    public boolean isLowWaterMark();
+    
+    /**
+     * @return true when 10% or less room is left for dispatching messages
+     */
+    public boolean isHighWaterMark();
+    
+    /**
+     * inform the MessageConsumer on the client to change it's prefetch
+     * @param newPrefetch
+     */
+    public void updateConsumerPrefetch(int newPrefetch);
+    
+    /**
+     * optimize message consumer prefetch if the consumer supports it
+     *
+     */
+    public void optimizePrefetch();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Apr 14 01:38:50 2006
@@ -26,6 +26,7 @@
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -64,6 +65,7 @@
         enqueueCounter++;
         node.incrementReferenceCount();
         if(!isFull()&&!isSlaveBroker()){
+            optimizePrefetch();
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
             dispatch(node);
@@ -230,6 +232,51 @@
 
     private boolean isFull(){
         return dispatched.get()-delivered.get()>=info.getPrefetchSize();
+    }
+    
+    /**
+     * @return true when 60% or more room is left for dispatching messages
+     */
+    public boolean isLowWaterMark(){
+        return (dispatched.get()-delivered.get()) <= (info.getPrefetchSize() *.4);
+    }
+    
+    /**
+     * @return true when 10% or less room is left for dispatching messages
+     */
+    public boolean isHighWaterMark(){
+        return (dispatched.get()-delivered.get()) >= (info.getPrefetchSize() *.9);
+    }
+    
+    /**
+     * inform the MessageConsumer on the client to change it's prefetch
+     * @param newPrefetch
+     */
+    public void updateConsumerPrefetch(int newPrefetch){
+        if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
+            ConsumerControl cc = new ConsumerControl();
+            cc.setConsumerId(info.getConsumerId());
+            cc.setPrefetch(newPrefetch);
+            context.getConnection().dispatchAsync(cc);
+        }
+    }
+    
+    /**
+     * optimize message consumer prefetch if the consumer supports it
+     *
+     */
+    public void optimizePrefetch(){
+        if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
+                        &&context.getConnection().isManageable()){
+            if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
+                info.setCurrentPrefetchSize(info.getPrefetchSize());
+                updateConsumerPrefetch(info.getPrefetchSize());
+            }else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
+                // want to purge any outstanding acks held by the consumer
+                info.setCurrentPrefetchSize(1);
+                updateConsumerPrefetch(1);
+            }
+        }
     }
 
     private void dispatchMatched() throws IOException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java Fri Apr 14 01:38:50 2006
@@ -49,6 +49,7 @@
     protected byte priority;
     protected BrokerId[] brokerPath;
     protected boolean optimizedAcknowledge;
+    protected transient int currentPrefetchSize;//used by the broker
     
     protected BooleanExpression additionalPredicate;
     protected transient boolean networkSubscription; //this subscription originated from a network connection
@@ -144,6 +145,7 @@
 
     public void setPrefetchSize(int prefetchSize) {
         this.prefetchSize = prefetchSize;
+        this.currentPrefetchSize = prefetchSize;
     }
 
     /**
@@ -320,6 +322,20 @@
      */
     public void setOptimizedAcknowledge(boolean optimizedAcknowledge){
         this.optimizedAcknowledge=optimizedAcknowledge;
+    }
+
+    /**
+     * @return Returns the currentPrefetchSize.
+     */
+    public int getCurrentPrefetchSize(){
+        return currentPrefetchSize;
+    }
+
+    /**
+     * @param currentPrefetchSize The currentPrefetchSize to set.
+     */
+    public void setCurrentPrefetchSize(int currentPrefetchSize){
+        this.currentPrefetchSize=currentPrefetchSize;
     }
 
 }