You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Matthias Wessel (Updated) (JIRA)" <ji...@apache.org> on 2012/01/17 10:42:40 UTC

[jira] [Updated] (AMQ-3664) Not all messages will be consumed when optimizeAcknowledge is true

     [ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Wessel updated AMQ-3664:
---------------------------------

          Component/s: Broker
          Description: 
I make performance test with activemq. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer the consumer does not acknowledge all messages! If I stop the consumer and then I start the consumer a second time the consumer recieves messages again and again not all messages will be acknoledged in the queue.

I am using camel 2.9.0 to produce and consume the messages.
I am using the consumer Template with asyncSendBody.
The following route is configured in the camelContext:

    <camel:camelContext id="camelContext">
    	<camel:template id="producerTemplate"/>
    	<camel:consumerTemplate id="consumerTemplate"/>
    	<camel:route>
    		<camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
    		<camel:to uri="beanConsumer"/>
    	</camel:route>
    </camel:camelContext>

The config for the ActiveMQComponent:
    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
		<property name="connectionFactory">		
			<bean class="org.apache.activemq.pool.PooledConnectionFactory">
   				<property name="connectionFactory">
  					<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
   						<property name="optimizeAcknowledge" value="true"/>
   						<property name="dispatchAsync" value="true"/>
  						<property name="sendAcksAsync" value="true"/>
  						<property name="useAsyncSend" value="true"/>
				 		<property name="brokerURL" value="nio://138-ham-de:61616"/>				 		
				 		<property name="useDedicatedTaskRunner" value="false"/> 
    				</bean>	
      			</property>
      		</bean>
      	</property>
    </bean>

I think, the problem is here:
    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            synchronized (deliveredMessages) {
                deliveredMessages.remove(md);
            }
            stats.getExpiredMessageCount().increment();
            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;
                                if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
                                	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                	if (ack != null) {
                            		    deliveredMessages.clear();
                            		    ackCounter = 0;
                            		    session.sendAck(ack);
                            		    optimizeAckTimestamp = System.currentTimeMillis();
                                	}
                                }
                            } else {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack!=null) {
                                    deliveredMessages.clear();
                                    session.sendAck(ack);
                                }
                            }
                        }
                    }
                    deliveryingAcknowledgements.set(false);
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            } 
            else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
    }
What will happen when no producer will send a message to this queue so that no message will pass this method? When will the deliveredMessages been acked?




  was:
I make performance test with activemq. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer the consumer does not acknowledge all messages! If I stop the consumer and then I start the consumer a second time the consumer recieves messages again and again not all messages will be acknoledged in the queue.

I am using camel 2.9.0 to produce and consume the messages.
I am using the consumer Template with asyncSendBody.
The following route is configured in the camelContext:

    <camel:camelContext id="camelContext">
    	<camel:template id="producerTemplate"/>
    	<camel:consumerTemplate id="consumerTemplate"/>
    	<camel:route>
    		<camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
    		<camel:to uri="beanConsumer"/>
    	</camel:route>
    </camel:camelContext>

The config for the ActiveMQComponent:
    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
		<property name="connectionFactory">		
			<bean class="org.apache.activemq.pool.PooledConnectionFactory">
   				<property name="connectionFactory">
  					<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
   						<property name="optimizeAcknowledge" value="true"/>
   						<property name="dispatchAsync" value="true"/>
  						<property name="sendAcksAsync" value="true"/>
  						<property name="useAsyncSend" value="true"/>
				 		<property name="brokerURL" value="nio://138-ham-de:61616"/>				 		
				 		<property name="useDedicatedTaskRunner" value="false"/> 
    				</bean>	
      			</property>
      		</bean>
      	</property>
    </bean>



          Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
    Affects Version/s: 5.5.1
    
> Not all messages will be consumed when optimizeAcknowledge is true
> ------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>
> I make performance test with activemq. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer the consumer does not acknowledge all messages! If I stop the consumer and then I start the consumer a second time the consumer recieves messages again and again not all messages will be acknoledged in the queue.
> I am using camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
>     <camel:camelContext id="camelContext">
>     	<camel:template id="producerTemplate"/>
>     	<camel:consumerTemplate id="consumerTemplate"/>
>     	<camel:route>
>     		<camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>     		<camel:to uri="beanConsumer"/>
>     	</camel:route>
>     </camel:camelContext>
> The config for the ActiveMQComponent:
>     <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
> 		<property name="connectionFactory">		
> 			<bean class="org.apache.activemq.pool.PooledConnectionFactory">
>    				<property name="connectionFactory">
>   					<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>    						<property name="optimizeAcknowledge" value="true"/>
>    						<property name="dispatchAsync" value="true"/>
>   						<property name="sendAcksAsync" value="true"/>
>   						<property name="useAsyncSend" value="true"/>
> 				 		<property name="brokerURL" value="nio://138-ham-de:61616"/>				 		
> 				 		<property name="useDedicatedTaskRunner" value="false"/> 
>     				</bean>	
>       			</property>
>       		</bean>
>       	</property>
>     </bean>
> I think, the problem is here:
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
>                                 	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 	if (ack != null) {
>                             		    deliveredMessages.clear();
>                             		    ackCounter = 0;
>                             		    session.sendAck(ack);
>                             		    optimizeAckTimestamp = System.currentTimeMillis();
>                                 	}
>                                 }
>                             } else {
>                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack!=null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } 
>             else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> What will happen when no producer will send a message to this queue so that no message will pass this method? When will the deliveredMessages been acked?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira