You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/29 19:41:44 UTC

svn commit: r1403459 - /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java

Author: chirino
Date: Mon Oct 29 18:41:44 2012
New Revision: 1403459

URL: http://svn.apache.org/viewvc?rev=1403459&view=rev
Log:
Fixes problem /w amqp impl of message acks.

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1403459&r1=1403458&r2=1403459&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Mon Oct 29 18:41:44 2012
@@ -482,7 +482,7 @@ class AmqpProtocolConverter {
             final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
             current = null;
 
-            if( message.getDestination()==null ) {
+            if( destination!=null ) {
                 message.setJMSDestination(destination);
             }
             message.setProducerId(producerId);
@@ -777,6 +777,7 @@ class AmqpProtocolConverter {
                 ack.setLastMessageId(md.getMessage().getMessageId());
                 ack.setMessageCount(1);
                 ack.setAckType((byte)ackType);
+                ack.setDestination(md.getDestination());
 
                 DeliveryState remoteState = delivery.getRemoteState();
                 if( remoteState!=null && remoteState instanceof TransactionalState) {
@@ -788,7 +789,15 @@ class AmqpProtocolConverter {
                 sendToActiveMQ(ack, new ResponseHandler() {
                     @Override
                     public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
-                        delivery.settle();
+                        if( response.isException() ) {
+                            if (response.isException()) {
+                                Throwable exception = ((ExceptionResponse) response).getException();
+                                exception.printStackTrace();
+                                sender.close();
+                            }
+                        } else {
+                            delivery.settle();
+                        }
                         pumpProtonToSocket();
                     }
                 });
@@ -885,6 +894,7 @@ class AmqpProtocolConverter {
                 sender.open();
                 if (response.isException()) {
                     Throwable exception = ((ExceptionResponse) response).getException();
+                    exception.printStackTrace();
                     sender.close();
                 }
                 pumpProtonToSocket();