You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Matthias Wessel (Issue Comment Edited) (JIRA)" <ji...@apache.org> on 2012/01/18 13:15:39 UTC

[jira] [Issue Comment Edited] (AMQ-3664) Not all messages will be acknowledged when optimizeAcknowledge is true

    [ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13188421#comment-13188421 ] 

Matthias Wessel edited comment on AMQ-3664 at 1/18/12 12:15 PM:
----------------------------------------------------------------

When I switch to another cache than CACHE_AUTO or CACHE CONSUMER, I get a dramatic deterioration of performance. Then I am faster when I do not use ptimizeAcknowledge = true. This cannot be the goal of this setting!
The other problem is I do not want to close the consumer. Yes, I want to close the producer, but the consumer should consume all existing messages in the queue. The consumer should not care about how many producers produce messages to the queue.
                
      was (Author: matw):
    When I switch to another cache than CACHE_AUTO or CACHE CONSUMER, I get a dramatic deterioration of performance. Then I am faster when I do not use ptimizeAcknowledge = true. This cannot be the goal of this setting!
                  
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>
> I make performance test with activemq. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer the consumer does not acknowledge all messages! If I stop the consumer and then I start the consumer a second time the consumer recieves messages again and again not all messages will be acknoledged in the queue.
> I am using camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
>     <camel:camelContext id="camelContext">
>     	<camel:template id="producerTemplate"/>
>     	<camel:consumerTemplate id="consumerTemplate"/>
>     	<camel:route>
>     		<camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>     		<camel:to uri="beanConsumer"/>
>     	</camel:route>
>     </camel:camelContext>
> The config for the ActiveMQComponent:
>     <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
> 		<property name="connectionFactory">		
> 			<bean class="org.apache.activemq.pool.PooledConnectionFactory">
>    				<property name="connectionFactory">
>   					<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>    						<property name="optimizeAcknowledge" value="true"/>
>    						<property name="dispatchAsync" value="true"/>
>   						<property name="sendAcksAsync" value="true"/>
>   						<property name="useAsyncSend" value="true"/>
> 				 		<property name="brokerURL" value="nio://138-ham-de:61616"/>				 		
> 				 		<property name="useDedicatedTaskRunner" value="false"/> 
>     				</bean>	
>       			</property>
>       		</bean>
>       	</property>
>     </bean>
> I think, the problem is here:
> Class ActiveMQMessageConsumer:
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
>                                 	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 	if (ack != null) {
>                             		    deliveredMessages.clear();
>                             		    ackCounter = 0;
>                             		    session.sendAck(ack);
>                             		    optimizeAckTimestamp = System.currentTimeMillis();
>                                 	}
>                                 }
>                             } else {
>                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack!=null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } 
>             else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> What will happen when no producer will send a message to this queue so that no message will pass this method? When will the deliveredMessages been acked?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira