You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/05/16 20:29:34 UTC
svn commit: r657155 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Author: rajdavies
Date: Fri May 16 11:29:33 2008
New Revision: 657155
URL: http://svn.apache.org/viewvc?rev=657155&view=rev
Log:
tidy up for https://issues.apache.org/activemq/browse/AMQ-1732
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=657155&r1=657154&r2=657155&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 16 11:29:33 2008
@@ -477,13 +477,20 @@
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
}
}
- if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
+ if (session.isClientAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
session.acknowledge();
}
});
+ }else if (session.isIndividualAcknowledge()) {
+ m.setAcknowledgeCallback(new Callback() {
+ public void execute() throws Exception {
+ session.checkClosed();
+ acknowledge(md);
+ }
+ });
}
return m;
}
@@ -765,15 +772,9 @@
}
} else if (session.isDupsOkAcknowledge()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
- } else if (session.isClientAcknowledge()) {
+ } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
- } else if (session.isIndividualAcknowledge()){
- MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
- session.asyncSendPacket(ack);
- synchronized(deliveredMessages){
- deliveredMessages.remove(md);
- }
- }
+ }
else {
throw new IllegalStateException("Invalid session state.");
}
@@ -855,6 +856,14 @@
}
}
}
+
+ void acknowledge(MessageDispatch md) throws JMSException { // Acknowledges one dispatched message: builds a STANDARD_ACK_TYPE MessageAck for md, hands it to the session, then drops md from deliveredMessages.
+ MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); // NOTE(review): deliveredMessages.size() is read here outside the synchronized block below; the third argument is presumably the message count - confirm that the full list size is intended when acking a single message.
+ session.asyncSendPacket(ack); // the ack is passed to the session before the local bookkeeping is updated
+ synchronized(deliveredMessages){ // removal is done while holding the deliveredMessages monitor
+ deliveredMessages.remove(md);
+ }
+ }
public void commit() throws JMSException {
synchronized (deliveredMessages) {