Posted to dev@activemq.apache.org by "Matthias Wessel (Updated) (JIRA)" <ji...@apache.org> on 2012/01/17 10:56:40 UTC

[jira] [Updated] (AMQ-3664) Not all messages will be acknowledged when optimizeAcknowledge is true

     [ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Wessel updated AMQ-3664:
---------------------------------

    Summary: Not all messages will be acknowledged when optimizeAcknowledge is true  (was: Not all messages will be consumed when optimizeAcknowledge is true)
    
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>
> I am running performance tests with ActiveMQ. Setting optimizeAcknowledge = true gives a dramatic performance improvement, but when I shut down the producer, the consumer does not acknowledge all messages. If I stop the consumer and start it a second time, it receives messages again, and once more not all of them are acknowledged on the queue.
> I am using Camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
>     <camel:camelContext id="camelContext">
>         <camel:template id="producerTemplate"/>
>         <camel:consumerTemplate id="consumerTemplate"/>
>         <camel:route>
>             <camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>             <camel:to uri="beanConsumer"/>
>         </camel:route>
>     </camel:camelContext>
> The config for the ActiveMQComponent:
>     <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
>         <property name="connectionFactory">
>             <bean class="org.apache.activemq.pool.PooledConnectionFactory">
>                 <property name="connectionFactory">
>                     <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>                         <property name="optimizeAcknowledge" value="true"/>
>                         <property name="dispatchAsync" value="true"/>
>                         <property name="sendAcksAsync" value="true"/>
>                         <property name="useAsyncSend" value="true"/>
>                         <property name="brokerURL" value="nio://138-ham-de:61616"/>
>                         <property name="useDedicatedTaskRunner" value="false"/>
>                     </bean>
>                 </property>
>             </bean>
>         </property>
>     </bean>
> I think the problem is here, in class ActiveMQMessageConsumer:
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
>                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = System.currentTimeMillis();
>                                     }
>                                 }
>                             } else {
>                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack!=null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } 
>             else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> What happens when no producer sends further messages to this queue, so that no message passes through this method? When will the remaining deliveredMessages be acked?
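
The question above can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not ActiveMQ code: the class OptimizeAckSketch, its methods, and the injected clock values are hypothetical, and it folds "message delivered" and "message consumed" into a single call. It keeps only the two conditions from the quoted optimizeAcknowledge branch (65% of the prefetch size, or the ack timeout) and shows that both are evaluated only when another message is consumed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified model of the quoted optimizeAcknowledge branch. Not ActiveMQ code. */
class OptimizeAckSketch {
    private final int prefetchSize;
    private final long ackTimeoutMillis;
    private final Deque<Integer> delivered = new ArrayDeque<>(); // stands in for deliveredMessages
    private int ackCounter = 0;
    private long lastAckTimestamp;
    private int ackedTotal = 0;

    OptimizeAckSketch(int prefetchSize, long ackTimeoutMillis, long now) {
        this.prefetchSize = prefetchSize;
        this.ackTimeoutMillis = ackTimeoutMillis;
        this.lastAckTimestamp = now;
    }

    /** The only place where the ack condition is checked, as in the quoted method. */
    void onMessageConsumed(int messageId, long now) {
        delivered.add(messageId);
        ackCounter++;
        if (ackCounter >= prefetchSize * .65 || now >= lastAckTimestamp + ackTimeoutMillis) {
            ackedTotal += delivered.size(); // one batched ack for everything delivered so far
            delivered.clear();
            ackCounter = 0;
            lastAckTimestamp = now;
        }
    }

    int pending() { return delivered.size(); }

    int acked() { return ackedTotal; }

    public static void main(String[] args) {
        // Assumed values: prefetch of 10, ack timeout of 300 ms, one message per millisecond.
        OptimizeAckSketch consumer = new OptimizeAckSketch(10, 300, 0);
        for (int i = 0; i < 10; i++) {
            consumer.onMessageConsumed(i, i);
        }
        // The producer stops here. Nothing calls onMessageConsumed again,
        // so the timeout is never evaluated and the tail of the batch stays unacked.
        System.out.println("acked=" + consumer.acked() + " pending=" + consumer.pending());
        // prints "acked=7 pending=3"
    }
}
```

In this model the three trailing messages are flushed only if one more message arrives after the timeout has elapsed, which matches the behaviour described in the report: once the producer is shut down, nothing triggers the check again.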
