You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/11/16 03:45:46 UTC
svn commit: r1410172 - in /qpid/proton/trunk: proton-c/src/messenger.c tests/proton_tests/messenger.py
Author: rhs
Date: Fri Nov 16 02:45:46 2012
New Revision: 1410172
URL: http://svn.apache.org/viewvc?rev=1410172&view=rev
Log:
fixed bug in non-cumulative acks
Modified:
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/tests/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1410172&r1=1410171&r2=1410172&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Fri Nov 16 02:45:46 2012
@@ -36,7 +36,6 @@ typedef struct {
size_t capacity;
int window;
pn_sequence_t lwm;
- pn_sequence_t mwm;
pn_sequence_t hwm;
pn_delivery_t **deliveries;
} pn_queue_t;
@@ -73,7 +72,6 @@ void pn_queue_init(pn_queue_t *queue)
queue->capacity = 1024;
queue->window = 0;
queue->lwm = 0;
- queue->mwm = 0;
queue->hwm = 0;
queue->deliveries = calloc(queue->capacity, sizeof(pn_delivery_t *));
}
@@ -110,9 +108,6 @@ void pn_queue_gc(pn_queue_t *queue)
memmove(queue->deliveries, queue->deliveries + delta, (count - delta)*sizeof(pn_delivery_t *));
queue->lwm += delta;
- if (queue->mwm - queue->lwm < 0) {
- queue->mwm = queue->lwm;
- }
}
void pn_incref(pn_connection_t *conn)
@@ -161,11 +156,15 @@ pn_sequence_t pn_queue_add(pn_queue_t *q
void pn_queue_slide(pn_queue_t *queue)
{
if (queue->window >= 0) {
- while (queue->hwm - queue->mwm > queue->window) {
+ while (queue->hwm - queue->lwm > queue->window) {
pn_delivery_t *d = pn_queue_get(queue, queue->lwm);
if (d) {
- pn_delivery_settle(d);
- pn_queue_del(queue, d);
+ if (pn_delivery_local_state(d)) {
+ pn_delivery_settle(d);
+ pn_queue_del(queue, d);
+ } else {
+ break;
+ }
} else {
pn_queue_gc(queue);
}
@@ -182,7 +181,7 @@ int pn_queue_update(pn_queue_t *queue, p
}
size_t start;
- if (PN_CUMULATIVE | flags) {
+ if (PN_CUMULATIVE & flags) {
start = queue->lwm;
} else {
start = id;
@@ -191,23 +190,22 @@ int pn_queue_update(pn_queue_t *queue, p
for (pn_sequence_t i = start; i <= id; i++) {
pn_delivery_t *d = pn_queue_get(queue, i);
if (d) {
- if (match) {
- pn_delivery_update(d, pn_delivery_remote_state(d));
- } else {
- switch (status) {
- case PN_STATUS_ACCEPTED:
- pn_delivery_update(d, PN_ACCEPTED);
- break;
- case PN_STATUS_REJECTED:
- pn_delivery_update(d, PN_REJECTED);
- break;
- default:
- break;
+ if (!pn_delivery_local_state(d)) {
+ if (match) {
+ pn_delivery_update(d, pn_delivery_remote_state(d));
+ } else {
+ switch (status) {
+ case PN_STATUS_ACCEPTED:
+ pn_delivery_update(d, PN_ACCEPTED);
+ break;
+ case PN_STATUS_REJECTED:
+ pn_delivery_update(d, PN_REJECTED);
+ break;
+ default:
+ break;
+ }
}
}
- if (pn_delivery_local_state(d) && i - queue->mwm < 0) {
- queue->mwm = i;
- }
if (settle) {
pn_delivery_settle(d);
pn_queue_del(queue, d);
Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1410172&r1=1410171&r2=1410172&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Fri Nov 16 02:45:46 2012
@@ -60,21 +60,27 @@ class MessengerTest(Test):
try:
while self.running:
self.server.recv(10)
- while self.server.incoming:
- self.server.get(msg)
- if msg.body == REJECT_ME:
- self.server.reject()
- else:
- self.server.accept()
- if msg.reply_to:
- msg.address = msg.reply_to
- self.server.put(msg)
- self.server.settle()
+ self.process_incoming(msg)
except Timeout:
print "server timed out"
self.server.stop()
self.running = False
+ def process_incoming(self, msg):
+ while self.server.incoming:
+ self.server.get(msg)
+ if msg.body == REJECT_ME:
+ self.server.reject()
+ else:
+ self.server.accept()
+ self.dispatch(msg)
+
+ def dispatch(self, msg):
+ if msg.reply_to:
+ msg.address = msg.reply_to
+ self.server.put(msg)
+ self.server.settle()
+
def _testSendReceive(self, size=None):
self.start()
msg = Message()
@@ -164,7 +170,9 @@ class MessengerTest(Test):
else:
assert self.client.status(t) is None
- def testReject(self):
+ def testReject(self, process_incoming=None):
+ if process_incoming:
+ self.process_incoming = process_incoming
self.server.accept_mode = MANUAL
self.start()
msg = Message()
@@ -187,9 +195,23 @@ class MessengerTest(Test):
for t in trackers:
if t in rejected:
- assert self.client.status(t) is REJECTED
+ assert self.client.status(t) is REJECTED, (t, self.client.status(t))
else:
- assert self.client.status(t) is ACCEPTED
+ assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
+
+ def testRejectIndividual(self):
+ self.testReject(self.reject_individual)
+
+ def reject_individual(self, msg):
+ if self.server.incoming < 10:
+ return
+ while self.server.incoming:
+ t = self.server.get(msg)
+ if msg.body == REJECT_ME:
+ self.server.reject(t)
+ self.dispatch(msg)
+ self.server.accept()
+
def testIncomingWindow(self):
self.server.accept_mode = MANUAL
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org