You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Matthias Wessel (Updated) (JIRA)" <ji...@apache.org> on 2012/01/17 10:56:40 UTC
[jira] [Updated] (AMQ-3664) Not all messages will be acknowledged
when optimizeAcknowledge is true
[ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias Wessel updated AMQ-3664:
---------------------------------
Summary: Not all messages will be acknowledged when optimizeAcknowledge is true (was: Not all messages will be consumed when optimizeAcknowledge is true)
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
> Key: AMQ-3664
> URL: https://issues.apache.org/jira/browse/AMQ-3664
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.1
> Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
> Reporter: Matthias Wessel
> Priority: Critical
>
> I make performance test with activemq. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer the consumer does not acknowledge all messages! If I stop the consumer and then I start the consumer a second time the consumer recieves messages again and again not all messages will be acknoledged in the queue.
> I am using camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
> <camel:camelContext id="camelContext">
> <camel:template id="producerTemplate"/>
> <camel:consumerTemplate id="consumerTemplate"/>
> <camel:route>
> <camel:from uri="jms:queue0?concurrentConsumers=3&maxConcurrentConsumers=10&asyncConsumer=true"/>
> <camel:to uri="beanConsumer"/>
> </camel:route>
> </camel:camelContext>
> The config for the ActiveMQComponent:
> <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
> <property name="connectionFactory">
> <bean class="org.apache.activemq.pool.PooledConnectionFactory">
> <property name="connectionFactory">
> <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
> <property name="optimizeAcknowledge" value="true"/>
> <property name="dispatchAsync" value="true"/>
> <property name="sendAcksAsync" value="true"/>
> <property name="useAsyncSend" value="true"/>
> <property name="brokerURL" value="nio://138-ham-de:61616"/>
> <property name="useDedicatedTaskRunner" value="false"/>
> </bean>
> </property>
> </bean>
> </property>
> </bean>
> I think, the problem is here:
> Class ActiveMQMessageConsumer:
> private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
> if (unconsumedMessages.isClosed()) {
> return;
> }
> if (messageExpired) {
> synchronized (deliveredMessages) {
> deliveredMessages.remove(md);
> }
> stats.getExpiredMessageCount().increment();
> ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> } else {
> stats.onMessage();
> if (session.getTransacted()) {
> // Do nothing.
> } else if (isAutoAcknowledgeEach()) {
> if (deliveryingAcknowledgements.compareAndSet(false, true)) {
> synchronized (deliveredMessages) {
> if (!deliveredMessages.isEmpty()) {
> if (optimizeAcknowledge) {
> ackCounter++;
> if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
> MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
> if (ack != null) {
> deliveredMessages.clear();
> ackCounter = 0;
> session.sendAck(ack);
> optimizeAckTimestamp = System.currentTimeMillis();
> }
> }
> } else {
> MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
> if (ack!=null) {
> deliveredMessages.clear();
> session.sendAck(ack);
> }
> }
> }
> }
> deliveryingAcknowledgements.set(false);
> }
> } else if (isAutoAcknowledgeBatch()) {
> ackLater(md, MessageAck.STANDARD_ACK_TYPE);
> } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
> boolean messageUnackedByConsumer = false;
> synchronized (deliveredMessages) {
> messageUnackedByConsumer = deliveredMessages.contains(md);
> }
> if (messageUnackedByConsumer) {
> ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> }
> }
> else {
> throw new IllegalStateException("Invalid session state.");
> }
> }
> }
> What will happen when no producer will send a message to this queue so that no message will pass this method? When will the deliveredMessages been acked?
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira