You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/18 07:50:41 UTC
svn commit: r394851 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Author: chirino
Date: Mon Apr 17 22:50:40 2006
New Revision: 394851
URL: http://svn.apache.org/viewcvs?rev=394851&view=rev
Log:
Properly handle a rollback() when called from an onMessage()
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=394851&r1=394850&r2=394851&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Apr 17 22:50:40 2006
@@ -602,10 +602,14 @@
this.info.setCurrentPrefetchSize(prefetch);
}
- private void beforeMessageIsConsumed(MessageDispatch md) {
+ private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId());
- if (!session.isDupsOkAcknowledge())
+ if (!session.isDupsOkAcknowledge()) {
deliveredMessages.addFirst(md);
+ if( session.isTransacted() ) {
+ ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
+ }
+ }
}
private void afterMessageIsConsumed(MessageDispatch md,boolean messageExpired) throws JMSException{
@@ -615,9 +619,8 @@
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
}else{
stats.onMessage();
- if(session.isTransacted()){
- ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
- }else if(session.isAutoAcknowledge()){
+ if( session.isTransacted() ) {
+ } else if(session.isAutoAcknowledge()) {
if(!deliveredMessages.isEmpty()){
if(optimizeAcknowledge){
if(deliveryingAcknowledgements.compareAndSet(false,true)){
@@ -636,11 +639,11 @@
deliveredMessages.clear();
}
}
- }else if(session.isDupsOkAcknowledge()){
+ } else if(session.isDupsOkAcknowledge()){
ackLater(md,MessageAck.STANDARD_ACK_TYPE);
- }else if(session.isClientAcknowledge()){
+ } else if(session.isClientAcknowledge()){
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
- }else{
+ } else{
throw new IllegalStateException("Invalid session state.");
}
}