You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/12/04 15:02:41 UTC
svn commit: r1416944 - in /qpid/proton/trunk: proton-c/src/engine/engine.c
tests/proton_tests/engine.py
Author: gsim
Date: Tue Dec 4 14:02:39 2012
New Revision: 1416944
URL: http://svn.apache.org/viewvc?rev=1416944&view=rev
Log:
PROTON-157: Ensure deliveries on tpwork queue are treated in correct order
Modified:
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/tests/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1416944&r1=1416943&r2=1416944&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Dec 4 14:02:39 2012
@@ -2119,16 +2119,18 @@ int pn_post_disp(pn_transport_t *transpo
return 0;
}
-int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery)
+int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, bool* allocation_blocked)
{
pn_link_t *link = delivery->link;
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
pn_delivery_state_t *state = (pn_delivery_state_t *) delivery->transport_context;
- if (!state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
+ if (!(*allocation_blocked) && !state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
delivery->transport_context = state;
+ } else {
+ *allocation_blocked = true;
}
if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
@@ -2200,6 +2202,7 @@ int pn_process_tpwork(pn_transport_t *tr
{
pn_connection_t *conn = (pn_connection_t *) endpoint;
pn_delivery_t *delivery = conn->tpwork_head;
+ bool allocation_blocked = false;
while (delivery)
{
if (!delivery->transport_context && transport->disp->available > 0) {
@@ -2208,7 +2211,7 @@ int pn_process_tpwork(pn_transport_t *tr
pn_link_t *link = delivery->link;
if (pn_link_is_sender(link)) {
- int err = pn_process_tpwork_sender(transport, delivery);
+ int err = pn_process_tpwork_sender(transport, delivery, &allocation_blocked);
if (err) return err;
} else {
int err = pn_process_tpwork_receiver(transport, delivery);
Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1416944&r1=1416943&r2=1416944&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Tue Dec 4 14:02:39 2012
@@ -29,10 +29,10 @@ from proton import *
OUTPUT_SIZE = 10*1024
-def pump(t1, t2):
+def pump(t1, t2, buffer_size=OUTPUT_SIZE):
while True:
- out1 = t1.output(OUTPUT_SIZE)
- out2 = t2.output(OUTPUT_SIZE)
+ out1 = t1.output(buffer_size)
+ out2 = t2.output(buffer_size)
if out1 or out2:
if out1:
@@ -98,9 +98,9 @@ class Test(common.Test):
def cleanup(self):
pass
- def pump(self):
+ def pump(self, buffer_size=OUTPUT_SIZE):
for c1, t1, c2, t2 in self._wires:
- pump(t1, t2)
+ pump(t1, t2, buffer_size)
class ConnectionTest(Test):
@@ -575,6 +575,71 @@ class TransferTest(Test):
assert sd.local_state == rd.remote_state == Delivery.ACCEPTED
+ def test_delivery_id_ordering(self):
+ self.rcv.flow(1024)
+ self.pump(buffer_size=64*1024)
+
+ #fill up delivery buffer on sender
+ for m in range(1024):
+ sd = self.snd.delivery("tag%s" % m)
+ msg = "message %s" % m
+ n = self.snd.send(msg)
+ assert n == len(msg)
+ assert self.snd.advance()
+
+ self.pump(buffer_size=64*1024)
+
+ #receive a session-windows worth of messages and accept them
+ for m in range(1024):
+ rd = self.rcv.current
+ assert rd is not None, m
+ assert rd.tag == ("tag%s" % m), (rd.tag, m)
+ msg = self.rcv.recv(1024)
+ assert msg == ("message %s" % m), (msg, m)
+ rd.update(Delivery.ACCEPTED)
+ rd.settle()
+
+ self.pump(buffer_size=64*1024)
+
+ #add some new deliveries
+ for m in range(1024, 1450):
+ sd = self.snd.delivery("tag%s" % m)
+ msg = "message %s" % m
+ n = self.snd.send(msg)
+ assert n == len(msg)
+ assert self.snd.advance()
+
+ #handle all disposition changes to sent messages
+ d = self.c1.work_head
+ while d:
+ if d.updated:
+ d.update(Delivery.ACCEPTED)
+ d.settle()
+ d = d.work_next
+
+ #submit some more deliveries
+ for m in range(1450, 1500):
+ sd = self.snd.delivery("tag%s" % m)
+ msg = "message %s" % m
+ n = self.snd.send(msg)
+ assert n == len(msg)
+ assert self.snd.advance()
+
+ self.pump(buffer_size=64*1024)
+ self.rcv.flow(1024)
+ self.pump(buffer_size=64*1024)
+
+ #verify remaining messages can be received and accepted
+ for m in range(1024, 1500):
+ rd = self.rcv.current
+ assert rd is not None, m
+ assert rd.tag == ("tag%s" % m), (rd.tag, m)
+ msg = self.rcv.recv(1024)
+ assert msg == ("message %s" % m), (msg, m)
+ rd.update(Delivery.ACCEPTED)
+ rd.settle()
+
+
class MaxFrameTransferTest(Test):
def setup(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org