You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/11/03 13:45:19 UTC

qpid-proton git commit: PROTON-971: record the in-progress delivery on the transport link, don't send transfers for new deliveries until the existing one is completed

Repository: qpid-proton
Updated Branches:
  refs/heads/master cfd73cd07 -> a94e63515


PROTON-971: record the in-progress delivery on the transport link, don't send transfers for new deliveries until the existing one is completed


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a94e6351
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a94e6351
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a94e6351

Branch: refs/heads/master
Commit: a94e635152e5ab32c3dbe5ea87f88173d01f4ce1
Parents: cfd73cd
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Nov 3 12:11:51 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Nov 3 12:15:40 2015 +0000

----------------------------------------------------------------------
 .../qpid/proton/engine/impl/TransportImpl.java  | 19 +++++-
 .../proton/engine/impl/TransportSender.java     | 10 +++
 tests/python/proton_tests/engine.py             | 69 +++++++++++++++++++-
 3 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 7faadc6..f318319 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -517,7 +517,7 @@ public class TransportImpl extends EndpointImpl
     private boolean processTransportWorkSender(DeliveryImpl delivery,
                                                SenderImpl snd)
     {
-        TransportLink<SenderImpl> tpLink = snd.getTransportLink();
+        TransportSender tpLink = snd.getTransportLink();
         SessionImpl session = snd.getSession();
         TransportSession tpSession = session.getTransportSession();
 
@@ -529,6 +529,16 @@ public class TransportImpl extends EndpointImpl
            tpSession.isLocalChannelSet() &&
            tpLink.getLocalHandle() != null && !_frameWriter.isFull())
         {
+            DeliveryImpl inProgress = tpLink.getInProgressDelivery();
+            if(inProgress != null){
+                // There is an existing Delivery awaiting completion. Check it
+                // is the same Delivery object given and return if not, as we
+                // can't interleave Transfer frames for deliveries on a link.
+                if(inProgress != delivery) {
+                    return false;
+                }
+            }
+
             UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
             TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
             delivery.setTransportDelivery(tpDelivery);
@@ -575,6 +585,9 @@ public class TransportImpl extends EndpointImpl
                 delivery.setDataLength(0);
 
                 if (!transfer.getMore()) {
+                    // Clear the in-progress delivery marker
+                    tpLink.setInProgressDelivery(null);
+
                     delivery.setDone();
                     tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE));
                     tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE));
@@ -589,6 +602,10 @@ public class TransportImpl extends EndpointImpl
                 delivery.setDataOffset(delivery.getDataOffset() + delta);
                 delivery.setDataLength(payload.remaining());
                 session.incrementOutgoingBytes(-delta);
+
+                // Remember the delivery we are still processing
+                // the body transfer frames for
+                tpLink.setInProgressDelivery(delivery);
             }
 
             if (snd.getLocalState() != EndpointState.CLOSED) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java
index 26c39f5..cebe577 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java
@@ -27,6 +27,7 @@ import org.apache.qpid.proton.amqp.transport.Flow;
 class TransportSender extends TransportLink<SenderImpl>
 {
     private boolean _drain;
+    private DeliveryImpl _inProgressDelivery;
     private static final UnsignedInteger ORIGINAL_DELIVERY_COUNT = UnsignedInteger.ZERO;
 
     TransportSender(SenderImpl link)
@@ -57,4 +58,13 @@ class TransportSender extends TransportLink<SenderImpl>
         setLinkCredit(linkCredit);
     }
 
+    public void setInProgressDelivery(DeliveryImpl inProgressDelivery)
+    {
+        _inProgressDelivery = inProgressDelivery;
+    }
+
+    public DeliveryImpl getInProgressDelivery()
+    {
+        return _inProgressDelivery;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index 7946d2c..0a6eb8d 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -1100,7 +1100,74 @@ class MaxFrameTransferTest(Test):
 
     binary = self.rcv.recv(1024)
     assert binary == None
-    
+
+  def testSendQueuedMultiFrameMessages(self, sendSingleFrameMsg = False):
+    """
+    Test that multiple queued messages on the same link
+    with multi-frame content are sent correctly. Use an
+    odd max frame size, send enough data to use many.
+    """
+    self.snd, self.rcv = self.link("test-link", max_frame=[0,517])
+    self.c1 = self.snd.session.connection
+    self.c2 = self.rcv.session.connection
+    self.snd.open()
+    self.rcv.open()
+    self.pump()
+    assert self.rcv.session.connection.transport.max_frame_size == 517
+    assert self.snd.session.connection.transport.remote_max_frame_size == 517
+
+    self.rcv.flow(5)
+
+    self.pump()
+
+    # Send a delivery with 5 frames, all bytes as X1234
+    self.snd.delivery("tag")
+    msg = ("X1234" * 425).encode('utf-8')
+    assert 2125 == len(msg)
+    n = self.snd.send(msg)
+    assert n == len(msg)
+    assert self.snd.advance()
+
+    # Send a delivery with 5 frames, all bytes as Y5678
+    self.snd.delivery("tag2")
+    msg2 = ("Y5678" * 425).encode('utf-8')
+    assert 2125 == len(msg2)
+    n = self.snd.send(msg2)
+    assert n == len(msg2)
+    assert self.snd.advance()
+
+    self.pump()
+
+    if sendSingleFrameMsg:
+        # Send a delivery with 1 frame
+        self.snd.delivery("tag3")
+        msg3 = ("Z").encode('utf-8')
+        assert 1 == len(msg3)
+        n = self.snd.send(msg3)
+        assert n == len(msg3)
+        assert self.snd.advance()
+        self.pump()
+
+    binary = self.rcv.recv(5000)
+    self.assertEqual(binary, msg)
+
+    self.rcv.advance()
+
+    binary2 = self.rcv.recv(5000)
+    self.assertEqual(binary2, msg2)
+
+    self.rcv.advance()
+
+    if sendSingleFrameMsg:
+        binary3 = self.rcv.recv(5000)
+        self.assertEqual(binary3, msg3)
+        self.rcv.advance()
+
+    self.pump()
+
+  def testSendQueuedMultiFrameMessagesThenSingleFrameMessage(self):
+    self.testSendQueuedMultiFrameMessages(sendSingleFrameMsg = True)
+
   def testBigMessage(self):
     """
     Test transfering a big message.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org