You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/29 19:41:44 UTC
svn commit: r1403459 -
/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Author: chirino
Date: Mon Oct 29 18:41:44 2012
New Revision: 1403459
URL: http://svn.apache.org/viewvc?rev=1403459&view=rev
Log:
Fixes problem /w amqp impl of message acks.
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1403459&r1=1403458&r2=1403459&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Mon Oct 29 18:41:44 2012
@@ -482,7 +482,7 @@ class AmqpProtocolConverter {
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
current = null;
- if( message.getDestination()==null ) {
+ if( destination!=null ) {
message.setJMSDestination(destination);
}
message.setProducerId(producerId);
@@ -777,6 +777,7 @@ class AmqpProtocolConverter {
ack.setLastMessageId(md.getMessage().getMessageId());
ack.setMessageCount(1);
ack.setAckType((byte)ackType);
+ ack.setDestination(md.getDestination());
DeliveryState remoteState = delivery.getRemoteState();
if( remoteState!=null && remoteState instanceof TransactionalState) {
@@ -788,7 +789,15 @@ class AmqpProtocolConverter {
sendToActiveMQ(ack, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
- delivery.settle();
+ if( response.isException() ) {
+ if (response.isException()) {
+ Throwable exception = ((ExceptionResponse) response).getException();
+ exception.printStackTrace();
+ sender.close();
+ }
+ } else {
+ delivery.settle();
+ }
pumpProtonToSocket();
}
});
@@ -885,6 +894,7 @@ class AmqpProtocolConverter {
sender.open();
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
+ exception.printStackTrace();
sender.close();
}
pumpProtonToSocket();