You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/18 07:50:41 UTC

svn commit: r394851 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Author: chirino
Date: Mon Apr 17 22:50:40 2006
New Revision: 394851

URL: http://svn.apache.org/viewcvs?rev=394851&view=rev
Log:
Properly handle a rollback() when called from an onMessage()


Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=394851&r1=394850&r2=394851&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Apr 17 22:50:40 2006
@@ -602,10 +602,14 @@
         this.info.setCurrentPrefetchSize(prefetch);
     }
 
-    private void beforeMessageIsConsumed(MessageDispatch md) {
+    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
         md.setDeliverySequenceId(session.getNextDeliveryId());
-        if (!session.isDupsOkAcknowledge())
+        if (!session.isDupsOkAcknowledge()) {
             deliveredMessages.addFirst(md);
+            if( session.isTransacted() ) {
+                ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
+            }
+        }
     }
 
     private void afterMessageIsConsumed(MessageDispatch md,boolean messageExpired) throws JMSException{
@@ -615,9 +619,8 @@
             ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
         }else{
             stats.onMessage();
-            if(session.isTransacted()){
-                ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
-            }else if(session.isAutoAcknowledge()){
+            if( session.isTransacted() ) {
+            } else if(session.isAutoAcknowledge()) {
                 if(!deliveredMessages.isEmpty()){
                     if(optimizeAcknowledge){
                         if(deliveryingAcknowledgements.compareAndSet(false,true)){
@@ -636,11 +639,11 @@
                         deliveredMessages.clear();
                     }
                 }
-            }else if(session.isDupsOkAcknowledge()){
+            } else if(session.isDupsOkAcknowledge()){
                 ackLater(md,MessageAck.STANDARD_ACK_TYPE);
-            }else if(session.isClientAcknowledge()){
+            } else if(session.isClientAcknowledge()){
                 ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
-            }else{
+            } else{
                 throw new IllegalStateException("Invalid session state.");
             }
         }