You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/13 21:51:14 UTC
svn commit: r1408952 - in
/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp:
AmqpProtocolHandler.scala hawtdispatch/api/AmqpSender.java
hawtdispatch/api/MessageDelivery.java
Author: chirino
Date: Tue Nov 13 20:51:13 2012
New Revision: 1408952
URL: http://svn.apache.org/viewvc?rev=1408952&view=rev
Log:
Fixing AMQP implementation bugs.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Nov 13 20:51:13 2012
@@ -949,6 +949,10 @@ class AmqpProtocolHandler extends Protoc
val state = proton_delivery.getRemoteState();
state match {
case null =>
+ if( !proton_delivery.remotelySettled() ) {
+ proton_delivery.disposition(new Accepted());
+ }
+ settle(proton_delivery, Consumed, false);
case accepted:Accepted =>
if( !proton_delivery.remotelySettled() ) {
proton_delivery.disposition(new Accepted());
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java Tue Nov 13 20:51:13 2012
@@ -162,12 +162,13 @@ public class AmqpSender extends AmqpLink
@Override
protected void processDelivery(Delivery delivery) {
final MessageDelivery md = (MessageDelivery) delivery.getContext();
- if( delivery.remotelySettled() && delivery.getTag().length > 0 ) {
- checkinTag(delivery.getTag());
- }
- final DeliveryState state = delivery.getRemoteState();
- if( state!=null ) {
- if( state instanceof Accepted) {
+ if( delivery.remotelySettled() ) {
+ if( delivery.getTag().length > 0 ) {
+ checkinTag(delivery.getTag());
+ }
+
+ final DeliveryState state = delivery.getRemoteState();
+ if( state==null || state instanceof Accepted) {
if( !delivery.remotelySettled() ) {
delivery.disposition(new Accepted());
}
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java Tue Nov 13 20:51:13 2012
@@ -201,7 +201,7 @@ public abstract class MessageDelivery ex
addWatch(new Watch() {
@Override
public boolean execute() {
- if( delivery!=null && (delivery.isSettled() || delivery.remotelySettled()) ) {
+ if( delivery!=null && delivery.isSettled() ) {
cb.onSuccess(delivery.getRemoteState());
return true;
}