You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/13 21:51:14 UTC

svn commit: r1408952 - in /activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp: AmqpProtocolHandler.scala hawtdispatch/api/AmqpSender.java hawtdispatch/api/MessageDelivery.java

Author: chirino
Date: Tue Nov 13 20:51:13 2012
New Revision: 1408952

URL: http://svn.apache.org/viewvc?rev=1408952&view=rev
Log:
Fixing AMQP implementation bugs.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Nov 13 20:51:13 2012
@@ -949,6 +949,10 @@ class AmqpProtocolHandler extends Protoc
       val state = proton_delivery.getRemoteState();
       state match {
         case null =>
+          if( !proton_delivery.remotelySettled() ) {
+              proton_delivery.disposition(new Accepted());
+          }
+          settle(proton_delivery, Consumed, false);
         case accepted:Accepted =>
           if( !proton_delivery.remotelySettled() ) {
               proton_delivery.disposition(new Accepted());

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java Tue Nov 13 20:51:13 2012
@@ -162,12 +162,13 @@ public class AmqpSender extends AmqpLink
     @Override
     protected void processDelivery(Delivery delivery) {
         final MessageDelivery md  = (MessageDelivery) delivery.getContext();
-        if( delivery.remotelySettled() && delivery.getTag().length > 0 ) {
-            checkinTag(delivery.getTag());
-        }
-        final DeliveryState state = delivery.getRemoteState();
-        if( state!=null ) {
-            if( state instanceof Accepted) {
+        if( delivery.remotelySettled() ) {
+            if( delivery.getTag().length > 0 ) {
+                checkinTag(delivery.getTag());
+            }
+
+            final DeliveryState state = delivery.getRemoteState();
+            if( state==null || state instanceof Accepted) {
                 if( !delivery.remotelySettled() ) {
                     delivery.disposition(new Accepted());
                 }

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java Tue Nov 13 20:51:13 2012
@@ -201,7 +201,7 @@ public abstract class MessageDelivery ex
         addWatch(new Watch() {
             @Override
             public boolean execute() {
-                if( delivery!=null && (delivery.isSettled() || delivery.remotelySettled()) ) {
+                if( delivery!=null && delivery.isSettled() ) {
                     cb.onSuccess(delivery.getRemoteState());
                     return true;
                 }