You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/05/16 20:29:34 UTC

svn commit: r657155 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Author: rajdavies
Date: Fri May 16 11:29:33 2008
New Revision: 657155

URL: http://svn.apache.org/viewvc?rev=657155&view=rev
Log:
tidy up for https://issues.apache.org/activemq/browse/AMQ-1732

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=657155&r1=657154&r2=657155&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 16 11:29:33 2008
@@ -477,13 +477,20 @@
                 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
             }
         }
-        if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
+        if (session.isClientAcknowledge()) {
             m.setAcknowledgeCallback(new Callback() {
                 public void execute() throws Exception {
                     session.checkClosed();
                     session.acknowledge();
                 }
             });
+        }else if (session.isIndividualAcknowledge()) {
+            m.setAcknowledgeCallback(new Callback() {
+                public void execute() throws Exception {
+                    session.checkClosed();
+                    acknowledge(md);
+                }
+            });
         }
         return m;
     }
@@ -765,15 +772,9 @@
                 }
             } else if (session.isDupsOkAcknowledge()) {
                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
-            } else if (session.isClientAcknowledge()) {
+            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
                 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
-            } else if (session.isIndividualAcknowledge()){
-            	MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                session.asyncSendPacket(ack);
-                synchronized(deliveredMessages){
-	                deliveredMessages.remove(md);
-                }
-            }
+            } 
             else {
                 throw new IllegalStateException("Invalid session state.");
             }
@@ -855,6 +856,14 @@
             }
         }
     }
+    
+    void acknowledge(MessageDispatch md) throws JMSException {
+        MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+        session.asyncSendPacket(ack);
+        synchronized(deliveredMessages){
+            deliveredMessages.remove(md);
+        }
+    }
 
     public void commit() throws JMSException {
         synchronized (deliveredMessages) {