You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/14 10:38:52 UTC
svn commit: r394050 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
./ broker/region/ command/
Author: rajdavies
Date: Fri Apr 14 01:38:50 2006
New Revision: 394050
URL: http://svn.apache.org/viewcvs?rev=394050&view=rev
Log:
For optimized acknowledge, eagerly get acknowledgements from consumers
when the dispatched list gets too big.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Apr 14 01:38:50 2006
@@ -46,6 +46,7 @@
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.*;
/**
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@@ -111,7 +112,9 @@
private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy;
- private AtomicBoolean optimizeAcknowledge = new AtomicBoolean();
+ private boolean optimizeAcknowledge;
+ private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
+ private ExecutorService executorService = null;
/**
* Create a MessageConsumer
@@ -160,6 +163,7 @@
this.info = new ConsumerInfo(consumerId);
this.info.setSubcriptionName(name);
this.info.setPrefetchSize(prefetch);
+ this.info.setCurrentPrefetchSize(prefetch);
this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
this.info.setNoLocal(noLocal);
this.info.setDispatchAsync(dispatchAsync);
@@ -182,9 +186,9 @@
}
this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
- this.optimizeAcknowledge.set(session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
- &&!info.isBrowser());
- this.info.setOptimizedAcknowledge(this.optimizeAcknowledge.get());
+ this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&session.isAutoAcknowledge()
+ &&!info.isBrowser();
+ this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
@@ -516,20 +520,35 @@
}
void deliverAcks(){
- synchronized(optimizeAcknowledge){
- if(this.optimizeAcknowledge.get()){
+ MessageAck ack=null;
+ if(deliveryingAcknowledgements.compareAndSet(false,true)){
+ if(this.optimizeAcknowledge){
if(!deliveredMessages.isEmpty()){
MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
- MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
- try{
- session.asyncSendPacket(ack);
- }catch(JMSException e){
- log.error("Failed to delivered acknowledgements",e);
- }
+ ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
deliveredMessages.clear();
ackCounter=0;
}
}
+ if(ack!=null){
+ final MessageAck ackToSend=ack;
+ if(executorService==null){
+ executorService=Executors.newSingleThreadExecutor();
+ }
+ executorService.submit(new Runnable(){
+ public void run(){
+ try{
+ session.asyncSendPacket(ackToSend);
+ }catch(JMSException e){
+ log.error("Failed to delivered acknowledgements",e);
+ }finally{
+ deliveryingAcknowledgements.set(false);
+ }
+ }
+ });
+ }else{
+ deliveryingAcknowledgements.set(false);
+ }
}
}
@@ -539,6 +558,9 @@
// Ack any delivered messages now. (session may still
// commit/rollback the acks).
deliverAcks();//only processes optimized acknowledgements
+ if (executorService!=null){
+ executorService.shutdown();
+ }
if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
acknowledge();
}
@@ -562,15 +584,15 @@
}
protected void setOptimizeAcknowledge(boolean value){
- synchronized(optimizeAcknowledge){
+ if (optimizeAcknowledge && !value){
deliverAcks();
- optimizeAcknowledge.set(value);
}
+ optimizeAcknowledge=value;
}
protected void setPrefetchSize(int prefetch){
deliverAcks();
- this.info.setPrefetchSize(prefetch);
+ this.info.setCurrentPrefetchSize(prefetch);
}
private void beforeMessageIsConsumed(MessageDispatch md) {
@@ -590,20 +612,21 @@
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
}else if(session.isAutoAcknowledge()){
if(!deliveredMessages.isEmpty()){
- synchronized(optimizeAcknowledge){
- if(this.optimizeAcknowledge.get()){
+ if(optimizeAcknowledge){
+ if(deliveryingAcknowledgements.compareAndSet(false,true)){
ackCounter++;
- if(ackCounter>=(info.getPrefetchSize()*.75)){
- MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
+ if(ackCounter>=(info.getCurrentPrefetchSize()*.75)){
+ MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
ackCounter=0;
deliveredMessages.clear();
}
- }else{
- MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
- session.asyncSendPacket(ack);
- deliveredMessages.clear();
+ deliveryingAcknowledgements.set(false);
}
+ }else{
+ MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+ session.asyncSendPacket(ack);
+ deliveredMessages.clear();
}
}
}else if(session.isDupsOkAcknowledge()){
@@ -697,12 +720,10 @@
public void rollback() throws JMSException{
synchronized(unconsumedMessages.getMutex()){
- synchronized(optimizeAcknowledge){
- if(optimizeAcknowledge.get()){
- // remove messages read but not acked at the broker yet through optimizeAcknowledge
- for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
- deliveredMessages.removeLast();
- }
+ if(optimizeAcknowledge){
+ // remove messages read but not acked at the broker yet through optimizeAcknowledge
+ for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
+ deliveredMessages.removeLast();
}
}
if(deliveredMessages.isEmpty())
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr 14 01:38:50 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -62,6 +63,7 @@
if(!isFull()&&!isSlaveBroker()){
dispatch(node);
}else{
+ optimizePrefetch();
synchronized(pending){
if( pending.isEmpty() )
if (log.isDebugEnabled()){
@@ -210,6 +212,20 @@
return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
}
+ /**
+ * @return true when 60% or more room is left for dispatching messages
+ */
+ public boolean isLowWaterMark(){
+ return (dispatched.size()-prefetchExtension) <= (info.getPrefetchSize() *.4);
+ }
+
+ /**
+ * @return true when 10% or less room is left for dispatching messages
+ */
+ public boolean isHighWaterMark(){
+ return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
+ }
+
synchronized public int getPendingQueueSize(){
return pending.size();
}
@@ -229,6 +245,26 @@
synchronized public long getEnqueueCounter() {
return enqueueCounter;
}
+
+ /**
+ * optimize message consumer prefetch if the consumer supports it
+ *
+ */
+ public void optimizePrefetch(){
+ if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
+ &&context.getConnection().isManageable()){
+ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
+ info.setCurrentPrefetchSize(info.getPrefetchSize());
+ updateConsumerPrefetch(info.getPrefetchSize());
+ }else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
+ // want to purge any outstanding acks held by the consumer
+ info.setCurrentPrefetchSize(1);
+ updateConsumerPrefetch(1);
+ }
+ }
+ }
+
+
protected void dispatchMatched() throws IOException{
@@ -287,6 +323,19 @@
context.getConnection().serviceException(e);
}
}
+ }
+ }
+
+ /**
+ * inform the MessageConsumer on the client to change its prefetch
+ * @param newPrefetch
+ */
+ public void updateConsumerPrefetch(int newPrefetch){
+ if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
+ ConsumerControl cc = new ConsumerControl();
+ cc.setConsumerId(info.getConsumerId());
+ cc.setPrefetch(newPrefetch);
+ context.getConnection().dispatchAsync(cc);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Fri Apr 14 01:38:50 2006
@@ -148,4 +148,26 @@
* Set when the subscription is registered in JMX
*/
public void setObjectName(ObjectName objectName);
+
+ /**
+ * @return true when 60% or more room is left for dispatching messages
+ */
+ public boolean isLowWaterMark();
+
+ /**
+ * @return true when 10% or less room is left for dispatching messages
+ */
+ public boolean isHighWaterMark();
+
+ /**
+ * inform the MessageConsumer on the client to change its prefetch
+ * @param newPrefetch
+ */
+ public void updateConsumerPrefetch(int newPrefetch);
+
+ /**
+ * optimize message consumer prefetch if the consumer supports it
+ *
+ */
+ public void optimizePrefetch();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Apr 14 01:38:50 2006
@@ -26,6 +26,7 @@
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -64,6 +65,7 @@
enqueueCounter++;
node.incrementReferenceCount();
if(!isFull()&&!isSlaveBroker()){
+ optimizePrefetch();
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
@@ -230,6 +232,51 @@
private boolean isFull(){
return dispatched.get()-delivered.get()>=info.getPrefetchSize();
+ }
+
+ /**
+ * @return true when 60% or more room is left for dispatching messages
+ */
+ public boolean isLowWaterMark(){
+ return (dispatched.get()-delivered.get()) <= (info.getPrefetchSize() *.4);
+ }
+
+ /**
+ * @return true when 10% or less room is left for dispatching messages
+ */
+ public boolean isHighWaterMark(){
+ return (dispatched.get()-delivered.get()) >= (info.getPrefetchSize() *.9);
+ }
+
+ /**
+ * inform the MessageConsumer on the client to change its prefetch
+ * @param newPrefetch
+ */
+ public void updateConsumerPrefetch(int newPrefetch){
+ if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
+ ConsumerControl cc = new ConsumerControl();
+ cc.setConsumerId(info.getConsumerId());
+ cc.setPrefetch(newPrefetch);
+ context.getConnection().dispatchAsync(cc);
+ }
+ }
+
+ /**
+ * optimize message consumer prefetch if the consumer supports it
+ *
+ */
+ public void optimizePrefetch(){
+ if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
+ &&context.getConnection().isManageable()){
+ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
+ info.setCurrentPrefetchSize(info.getPrefetchSize());
+ updateConsumerPrefetch(info.getPrefetchSize());
+ }else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
+ // want to purge any outstanding acks held by the consumer
+ info.setCurrentPrefetchSize(1);
+ updateConsumerPrefetch(1);
+ }
+ }
}
private void dispatchMatched() throws IOException{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=394050&r1=394049&r2=394050&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java Fri Apr 14 01:38:50 2006
@@ -49,6 +49,7 @@
protected byte priority;
protected BrokerId[] brokerPath;
protected boolean optimizedAcknowledge;
+ protected transient int currentPrefetchSize;//used by the broker
protected BooleanExpression additionalPredicate;
protected transient boolean networkSubscription; //this subscription originated from a network connection
@@ -144,6 +145,7 @@
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
+ this.currentPrefetchSize = prefetchSize;
}
/**
@@ -320,6 +322,20 @@
*/
public void setOptimizedAcknowledge(boolean optimizedAcknowledge){
this.optimizedAcknowledge=optimizedAcknowledge;
+ }
+
+ /**
+ * @return Returns the currentPrefetchSize.
+ */
+ public int getCurrentPrefetchSize(){
+ return currentPrefetchSize;
+ }
+
+ /**
+ * @param currentPrefetchSize The currentPrefetchSize to set.
+ */
+ public void setCurrentPrefetchSize(int currentPrefetchSize){
+ this.currentPrefetchSize=currentPrefetchSize;
}
}