You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/11/03 13:45:19 UTC
qpid-proton git commit: PROTON-971: record the in-progress delivery on the transport link, don't send transfers for new deliveries until the existing one is completed
Repository: qpid-proton
Updated Branches:
refs/heads/master cfd73cd07 -> a94e63515
PROTON-971: record the in-progress delivery on the transport link, don't send transfers for new deliveries until the existing one is completed
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a94e6351
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a94e6351
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a94e6351
Branch: refs/heads/master
Commit: a94e635152e5ab32c3dbe5ea87f88173d01f4ce1
Parents: cfd73cd
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Nov 3 12:11:51 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Nov 3 12:15:40 2015 +0000
----------------------------------------------------------------------
.../qpid/proton/engine/impl/TransportImpl.java | 19 +++++-
.../proton/engine/impl/TransportSender.java | 10 +++
tests/python/proton_tests/engine.py | 69 +++++++++++++++++++-
3 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 7faadc6..f318319 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -517,7 +517,7 @@ public class TransportImpl extends EndpointImpl
private boolean processTransportWorkSender(DeliveryImpl delivery,
SenderImpl snd)
{
- TransportLink<SenderImpl> tpLink = snd.getTransportLink();
+ TransportSender tpLink = snd.getTransportLink();
SessionImpl session = snd.getSession();
TransportSession tpSession = session.getTransportSession();
@@ -529,6 +529,16 @@ public class TransportImpl extends EndpointImpl
tpSession.isLocalChannelSet() &&
tpLink.getLocalHandle() != null && !_frameWriter.isFull())
{
+ DeliveryImpl inProgress = tpLink.getInProgressDelivery();
+ if(inProgress != null){
+ // There is an existing Delivery awaiting completion. Check it
+ // is the same Delivery object given and return if not, as we
+ // can't interleave Transfer frames for deliveries on a link.
+ if(inProgress != delivery) {
+ return false;
+ }
+ }
+
UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
delivery.setTransportDelivery(tpDelivery);
@@ -575,6 +585,9 @@ public class TransportImpl extends EndpointImpl
delivery.setDataLength(0);
if (!transfer.getMore()) {
+ // Clear the in-progress delivery marker
+ tpLink.setInProgressDelivery(null);
+
delivery.setDone();
tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE));
tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE));
@@ -589,6 +602,10 @@ public class TransportImpl extends EndpointImpl
delivery.setDataOffset(delivery.getDataOffset() + delta);
delivery.setDataLength(payload.remaining());
session.incrementOutgoingBytes(-delta);
+
+ // Remember the delivery we are still processing
+ // the body transfer frames for
+ tpLink.setInProgressDelivery(delivery);
}
if (snd.getLocalState() != EndpointState.CLOSED) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java
index 26c39f5..cebe577 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java
@@ -27,6 +27,7 @@ import org.apache.qpid.proton.amqp.transport.Flow;
class TransportSender extends TransportLink<SenderImpl>
{
private boolean _drain;
+ private DeliveryImpl _inProgressDelivery;
private static final UnsignedInteger ORIGINAL_DELIVERY_COUNT = UnsignedInteger.ZERO;
TransportSender(SenderImpl link)
@@ -57,4 +58,13 @@ class TransportSender extends TransportLink<SenderImpl>
setLinkCredit(linkCredit);
}
+ public void setInProgressDelivery(DeliveryImpl inProgressDelivery)
+ {
+ _inProgressDelivery = inProgressDelivery;
+ }
+
+ public DeliveryImpl getInProgressDelivery()
+ {
+ return _inProgressDelivery;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index 7946d2c..0a6eb8d 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -1100,7 +1100,74 @@ class MaxFrameTransferTest(Test):
binary = self.rcv.recv(1024)
assert binary == None
-
+
+ def testSendQueuedMultiFrameMessages(self, sendSingleFrameMsg = False):
+ """
+ Test that multiple queued messages on the same link
+ with multi-frame content are sent correctly. Use an
+ odd max frame size, send enough data to use many.
+ """
+ self.snd, self.rcv = self.link("test-link", max_frame=[0,517])
+ self.c1 = self.snd.session.connection
+ self.c2 = self.rcv.session.connection
+ self.snd.open()
+ self.rcv.open()
+ self.pump()
+ assert self.rcv.session.connection.transport.max_frame_size == 517
+ assert self.snd.session.connection.transport.remote_max_frame_size == 517
+
+ self.rcv.flow(5)
+
+ self.pump()
+
+ # Send a delivery with 5 frames, all bytes as X1234
+ self.snd.delivery("tag")
+ msg = ("X1234" * 425).encode('utf-8')
+ assert 2125 == len(msg)
+ n = self.snd.send(msg)
+ assert n == len(msg)
+ assert self.snd.advance()
+
+ # Send a delivery with 5 frames, all bytes as Y5678
+ self.snd.delivery("tag2")
+ msg2 = ("Y5678" * 425).encode('utf-8')
+ assert 2125 == len(msg2)
+ n = self.snd.send(msg2)
+ assert n == len(msg2)
+ assert self.snd.advance()
+
+ self.pump()
+
+ if sendSingleFrameMsg:
+ # Send a delivery with 1 frame
+ self.snd.delivery("tag3")
+ msg3 = ("Z").encode('utf-8')
+ assert 1 == len(msg3)
+ n = self.snd.send(msg3)
+ assert n == len(msg3)
+ assert self.snd.advance()
+ self.pump()
+
+ binary = self.rcv.recv(5000)
+ self.assertEqual(binary, msg)
+
+ self.rcv.advance()
+
+ binary2 = self.rcv.recv(5000)
+ self.assertEqual(binary2, msg2)
+
+ self.rcv.advance()
+
+ if sendSingleFrameMsg:
+ binary3 = self.rcv.recv(5000)
+ self.assertEqual(binary3, msg3)
+ self.rcv.advance()
+
+ self.pump()
+
+ def testSendQueuedMultiFrameMessagesThenSingleFrameMessage(self):
+ self.testSendQueuedMultiFrameMessages(sendSingleFrameMsg = True)
+
def testBigMessage(self):
"""
Test transfering a big message.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org