You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/11/13 12:39:09 UTC
svn commit: r1714190 - in
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid:
amqp_1_0/transport/SendingLinkEndpoint.java
server/protocol/v1_0/ConsumerTarget_1_0.java
server/protocol/v1_0/SendingLink_1_0.java
Author: rgodfrey
Date: Fri Nov 13 11:39:09 2015
New Revision: 1714190
URL: http://svn.apache.org/viewvc?rev=1714190&view=rev
Log:
QPID-6844 : Fix link credit assignment for AMQP 1.0
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java Fri Nov 13 11:39:09 2015
@@ -81,27 +81,18 @@ public class SendingLinkEndpoint extends
return Role.SENDER;
}
- public boolean transfer(final Transfer xfr)
+ public boolean transfer(final Transfer xfr, final boolean decrementCredit)
{
SessionEndpoint s = getSession();
- int transferCount;
- transferCount = _lastDeliveryTag == null ? 1 : 1;
xfr.setMessageFormat(UnsignedInteger.ZERO);
synchronized(getLock())
{
-
- final int currentCredit = getLinkCredit().intValue() - transferCount;
-
- if(currentCredit < 0)
- {
- return false;
- }
- else
+ if(decrementCredit)
{
- setLinkCredit(UnsignedInteger.valueOf((int)currentCredit));
+ setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
}
- setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + transferCount)));
+ setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + 1)));
xfr.setHandle(getLocalHandle());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Nov 13 11:39:09 2015
@@ -265,7 +265,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
- getEndpoint().transfer(transfer);
+ getEndpoint().transfer(transfer, false);
}
else
{
@@ -307,7 +307,8 @@ class ConsumerTarget_1_0 extends Abstrac
{
suspend();
}
-
+ SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
+ linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
return hasCredit;
}
}
@@ -324,7 +325,11 @@ class ConsumerTarget_1_0 extends Abstrac
public void restoreCredit(final ServerMessage message)
{
- //TODO
+ synchronized (_link.getLock())
+ {
+ final SendingLinkEndpoint endpoint = _link.getEndpoint();
+ endpoint.setLinkCredit(endpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
+ }
}
public void queueEmpty()
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Nov 13 11:39:09 2015
@@ -492,7 +492,7 @@ public class SendingLink_1_0 implements
xfr.setDeliveryTag(dt);
xfr.setState(accepted);
xfr.setResume(Boolean.TRUE);
- getEndpoint().transfer(xfr);
+ getEndpoint().transfer(xfr, true);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org