You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/12/04 15:02:41 UTC

svn commit: r1416944 - in /qpid/proton/trunk: proton-c/src/engine/engine.c tests/proton_tests/engine.py

Author: gsim
Date: Tue Dec  4 14:02:39 2012
New Revision: 1416944

URL: http://svn.apache.org/viewvc?rev=1416944&view=rev
Log:
PROTON-157: Ensure deliveries on tpwork queue are treated in correct order

Modified:
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/tests/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1416944&r1=1416943&r2=1416944&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Dec  4 14:02:39 2012
@@ -2119,16 +2119,18 @@ int pn_post_disp(pn_transport_t *transpo
   return 0;
 }
 
-int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery)
+int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, bool* allocation_blocked)
 {
   pn_link_t *link = delivery->link;
   pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
   pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
   if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
     pn_delivery_state_t *state = (pn_delivery_state_t *) delivery->transport_context;
-    if (!state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
+    if (!(*allocation_blocked) && !state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
       state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
       delivery->transport_context = state;
+    } else {
+      *allocation_blocked = true;
     }
 
     if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
@@ -2200,6 +2202,7 @@ int pn_process_tpwork(pn_transport_t *tr
   {
     pn_connection_t *conn = (pn_connection_t *) endpoint;
     pn_delivery_t *delivery = conn->tpwork_head;
+    bool allocation_blocked = false;
     while (delivery)
     {
       if (!delivery->transport_context && transport->disp->available > 0) {
@@ -2208,7 +2211,7 @@ int pn_process_tpwork(pn_transport_t *tr
 
       pn_link_t *link = delivery->link;
       if (pn_link_is_sender(link)) {
-        int err = pn_process_tpwork_sender(transport, delivery);
+        int err = pn_process_tpwork_sender(transport, delivery, &allocation_blocked);
         if (err) return err;
       } else {
         int err = pn_process_tpwork_receiver(transport, delivery);

Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1416944&r1=1416943&r2=1416944&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Tue Dec  4 14:02:39 2012
@@ -29,10 +29,10 @@ from proton import *
 
 OUTPUT_SIZE = 10*1024
 
-def pump(t1, t2):
+def pump(t1, t2, buffer_size=OUTPUT_SIZE):
   while True:
-    out1 = t1.output(OUTPUT_SIZE)
-    out2 = t2.output(OUTPUT_SIZE)
+    out1 = t1.output(buffer_size)
+    out2 = t2.output(buffer_size)
 
     if out1 or out2:
       if out1:
@@ -98,9 +98,9 @@ class Test(common.Test):
   def cleanup(self):
     pass
 
-  def pump(self):
+  def pump(self, buffer_size=OUTPUT_SIZE):
     for c1, t1, c2, t2 in self._wires:
-      pump(t1, t2)
+      pump(t1, t2, buffer_size)
 
 class ConnectionTest(Test):
 
@@ -575,6 +575,71 @@ class TransferTest(Test):
 
     assert sd.local_state == rd.remote_state == Delivery.ACCEPTED
 
+  def test_delivery_id_ordering(self):
+    self.rcv.flow(1024)
+    self.pump(buffer_size=64*1024)
+
+    #fill up delivery buffer on sender
+    for m in range(1024):
+      sd = self.snd.delivery("tag%s" % m)
+      msg = "message %s" % m
+      n = self.snd.send(msg)
+      assert n == len(msg)
+      assert self.snd.advance()
+
+    self.pump(buffer_size=64*1024)
+
+    #receive a session-windows worth of messages and accept them
+    for m in range(1024):
+      rd = self.rcv.current
+      assert rd is not None, m
+      assert rd.tag == ("tag%s" % m), (rd.tag, m)
+      msg = self.rcv.recv(1024)
+      assert msg == ("message %s" % m), (msg, m)
+      rd.update(Delivery.ACCEPTED)
+      rd.settle()
+
+    self.pump(buffer_size=64*1024)
+
+    #add some new deliveries
+    for m in range(1024, 1450):
+      sd = self.snd.delivery("tag%s" % m)
+      msg = "message %s" % m
+      n = self.snd.send(msg)
+      assert n == len(msg)
+      assert self.snd.advance()
+
+    #handle all disposition changes to sent messages
+    d = self.c1.work_head
+    while d:
+      if d.updated:
+        d.update(Delivery.ACCEPTED)
+        d.settle()
+      d = d.work_next
+
+    #submit some more deliveries
+    for m in range(1450, 1500):
+      sd = self.snd.delivery("tag%s" % m)
+      msg = "message %s" % m
+      n = self.snd.send(msg)
+      assert n == len(msg)
+      assert self.snd.advance()
+
+    self.pump(buffer_size=64*1024)
+    self.rcv.flow(1024)
+    self.pump(buffer_size=64*1024)
+
+    #verify remaining messages can be received and accepted
+    for m in range(1024, 1500):
+      rd = self.rcv.current
+      assert rd is not None, m
+      assert rd.tag == ("tag%s" % m), (rd.tag, m)
+      msg = self.rcv.recv(1024)
+      assert msg == ("message %s" % m), (msg, m)
+      rd.update(Delivery.ACCEPTED)
+      rd.settle()
+
+
 class MaxFrameTransferTest(Test):
 
   def setup(self):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org