Posted to dev@activemq.apache.org by "Matthias Wessel (Updated) (JIRA)" <ji...@apache.org> on 2012/01/17 10:42:40 UTC
[jira] [Updated] (AMQ-3664) Not all messages will be consumed when
optimizeAcknowledge is true
[ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias Wessel updated AMQ-3664:
---------------------------------
Component/s: Broker
Description:
I am running performance tests with ActiveMQ. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer, the consumer does not acknowledge all messages! If I stop the consumer and then start it a second time, the consumer receives messages again, and again not all messages in the queue are acknowledged.
I am using Camel 2.9.0 to produce and consume the messages.
I am using the consumer Template with asyncSendBody.
The following route is configured in the camelContext:
<camel:camelContext id="camelContext">
    <camel:template id="producerTemplate"/>
    <camel:consumerTemplate id="consumerTemplate"/>
    <camel:route>
        <camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
        <camel:to uri="beanConsumer"/>
    </camel:route>
</camel:camelContext>
The config for the ActiveMQComponent:
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                    <property name="optimizeAcknowledge" value="true"/>
                    <property name="dispatchAsync" value="true"/>
                    <property name="sendAcksAsync" value="true"/>
                    <property name="useAsyncSend" value="true"/>
                    <property name="brokerURL" value="nio://138-ham-de:61616"/>
                    <property name="useDedicatedTaskRunner" value="false"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>
I think the problem is here:
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
    if (unconsumedMessages.isClosed()) {
        return;
    }
    if (messageExpired) {
        synchronized (deliveredMessages) {
            deliveredMessages.remove(md);
        }
        stats.getExpiredMessageCount().increment();
        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
    } else {
        stats.onMessage();
        if (session.getTransacted()) {
            // Do nothing.
        } else if (isAutoAcknowledgeEach()) {
            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                synchronized (deliveredMessages) {
                    if (!deliveredMessages.isEmpty()) {
                        if (optimizeAcknowledge) {
                            ackCounter++;
                            if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack != null) {
                                    deliveredMessages.clear();
                                    ackCounter = 0;
                                    session.sendAck(ack);
                                    optimizeAckTimestamp = System.currentTimeMillis();
                                }
                            }
                        } else {
                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                            if (ack != null) {
                                deliveredMessages.clear();
                                session.sendAck(ack);
                            }
                        }
                    }
                }
                deliveryingAcknowledgements.set(false);
            }
        } else if (isAutoAcknowledgeBatch()) {
            ackLater(md, MessageAck.STANDARD_ACK_TYPE);
        } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
            boolean messageUnackedByConsumer = false;
            synchronized (deliveredMessages) {
                messageUnackedByConsumer = deliveredMessages.contains(md);
            }
            if (messageUnackedByConsumer) {
                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
            }
        } else {
            throw new IllegalStateException("Invalid session state.");
        }
    }
}
What happens when no producer sends any further messages to this queue, so that no message passes through this method? When will the deliveredMessages be acked?
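To illustrate the concern, here is a minimal stand-alone sketch of the counting logic in the optimizeAcknowledge branch. It is not ActiveMQ code: the class OptimizeAckSketch and the method pendingAfter are hypothetical names, the prefetch size of 1000 is an assumed value, and the model assumes messages arrive faster than optimizeAckTimeout, so that only the 65% threshold ever triggers an ack. Both checks run only while a message is passing through the method, which is why a remainder can stay unacknowledged once the producer stops.

```java
public class OptimizeAckSketch {

    /**
     * Simplified model of the optimizeAcknowledge branch quoted above.
     * Returns how many delivered messages are still unacknowledged after
     * `consumed` messages have passed through the method and no further
     * message arrives.
     */
    public static int pendingAfter(int prefetchSize, int consumed) {
        int delivered = 0;   // stands in for deliveredMessages.size()
        int ackCounter = 0;
        for (int i = 0; i < consumed; i++) {
            delivered++;     // message handed to the listener
            ackCounter++;
            // Assumption: the timeout never elapses between two messages,
            // so only the 65% prefetch threshold can trigger an ack.
            if (ackCounter >= prefetchSize * .65) {
                delivered = 0;   // one ack covers everything delivered so far
                ackCounter = 0;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        int prefetch = 1000; // assumed prefetch size
        for (int consumed : new int[] {100, 650, 1000, 1300}) {
            System.out.println(consumed + " consumed -> "
                    + pendingAfter(prefetch, consumed) + " still unacked");
        }
    }
}
```

In this model, any message count that is not a multiple of the threshold leaves a remainder that is acknowledged only if another message arrives later, which would match the behaviour I am seeing.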
was:
I am running performance tests with ActiveMQ. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer, the consumer does not acknowledge all messages! If I stop the consumer and then start it a second time, the consumer receives messages again, and again not all messages in the queue are acknowledged.
I am using Camel 2.9.0 to produce and consume the messages.
I am using the consumer Template with asyncSendBody.
The following route is configured in the camelContext:
<camel:camelContext id="camelContext">
    <camel:template id="producerTemplate"/>
    <camel:consumerTemplate id="consumerTemplate"/>
    <camel:route>
        <camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
        <camel:to uri="beanConsumer"/>
    </camel:route>
</camel:camelContext>
The config for the ActiveMQComponent:
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                    <property name="optimizeAcknowledge" value="true"/>
                    <property name="dispatchAsync" value="true"/>
                    <property name="sendAcksAsync" value="true"/>
                    <property name="useAsyncSend" value="true"/>
                    <property name="brokerURL" value="nio://138-ham-de:61616"/>
                    <property name="useDedicatedTaskRunner" value="false"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>
Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
Affects Version/s: 5.5.1
> Not all messages will be consumed when optimizeAcknowledge is true
> ------------------------------------------------------------------
>
> Key: AMQ-3664
> URL: https://issues.apache.org/jira/browse/AMQ-3664
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.1
> Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
> Reporter: Matthias Wessel
> Priority: Critical