You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/11/16 03:45:46 UTC

svn commit: r1410172 - in /qpid/proton/trunk: proton-c/src/messenger.c tests/proton_tests/messenger.py

Author: rhs
Date: Fri Nov 16 02:45:46 2012
New Revision: 1410172

URL: http://svn.apache.org/viewvc?rev=1410172&view=rev
Log:
fixed bug in non-cumulative acks

Modified:
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/tests/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1410172&r1=1410171&r2=1410172&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Fri Nov 16 02:45:46 2012
@@ -36,7 +36,6 @@ typedef struct {
   size_t capacity;
   int window;
   pn_sequence_t lwm;
-  pn_sequence_t mwm;
   pn_sequence_t hwm;
   pn_delivery_t **deliveries;
 } pn_queue_t;
@@ -73,7 +72,6 @@ void pn_queue_init(pn_queue_t *queue)
   queue->capacity = 1024;
   queue->window = 0;
   queue->lwm = 0;
-  queue->mwm = 0;
   queue->hwm = 0;
   queue->deliveries = calloc(queue->capacity, sizeof(pn_delivery_t *));
 }
@@ -110,9 +108,6 @@ void pn_queue_gc(pn_queue_t *queue)
 
   memmove(queue->deliveries, queue->deliveries + delta, (count - delta)*sizeof(pn_delivery_t *));
   queue->lwm += delta;
-  if (queue->mwm - queue->lwm < 0) {
-    queue->mwm = queue->lwm;
-  }
 }
 
 void pn_incref(pn_connection_t *conn)
@@ -161,11 +156,15 @@ pn_sequence_t pn_queue_add(pn_queue_t *q
 void pn_queue_slide(pn_queue_t *queue)
 {
   if (queue->window >= 0) {
-    while (queue->hwm - queue->mwm > queue->window) {
+    while (queue->hwm - queue->lwm > queue->window) {
       pn_delivery_t *d = pn_queue_get(queue, queue->lwm);
       if (d) {
-        pn_delivery_settle(d);
-        pn_queue_del(queue, d);
+        if (pn_delivery_local_state(d)) {
+          pn_delivery_settle(d);
+          pn_queue_del(queue, d);
+        } else {
+          break;
+        }
       } else {
         pn_queue_gc(queue);
       }
@@ -182,7 +181,7 @@ int pn_queue_update(pn_queue_t *queue, p
   }
 
   size_t start;
-  if (PN_CUMULATIVE | flags) {
+  if (PN_CUMULATIVE & flags) {
     start = queue->lwm;
   } else {
     start = id;
@@ -191,23 +190,22 @@ int pn_queue_update(pn_queue_t *queue, p
   for (pn_sequence_t i = start; i <= id; i++) {
     pn_delivery_t *d = pn_queue_get(queue, i);
     if (d) {
-      if (match) {
-        pn_delivery_update(d, pn_delivery_remote_state(d));
-      } else {
-        switch (status) {
-        case PN_STATUS_ACCEPTED:
-          pn_delivery_update(d, PN_ACCEPTED);
-          break;
-        case PN_STATUS_REJECTED:
-          pn_delivery_update(d, PN_REJECTED);
-          break;
-        default:
-          break;
+      if (!pn_delivery_local_state(d)) {
+        if (match) {
+          pn_delivery_update(d, pn_delivery_remote_state(d));
+        } else {
+          switch (status) {
+          case PN_STATUS_ACCEPTED:
+            pn_delivery_update(d, PN_ACCEPTED);
+            break;
+          case PN_STATUS_REJECTED:
+            pn_delivery_update(d, PN_REJECTED);
+            break;
+          default:
+            break;
+          }
         }
       }
-      if (pn_delivery_local_state(d) && i - queue->mwm < 0) {
-        queue->mwm = i;
-      }
       if (settle) {
         pn_delivery_settle(d);
         pn_queue_del(queue, d);

Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1410172&r1=1410171&r2=1410172&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Fri Nov 16 02:45:46 2012
@@ -60,21 +60,27 @@ class MessengerTest(Test):
     try:
       while self.running:
         self.server.recv(10)
-        while self.server.incoming:
-          self.server.get(msg)
-          if msg.body == REJECT_ME:
-            self.server.reject()
-          else:
-            self.server.accept()
-            if msg.reply_to:
-              msg.address = msg.reply_to
-              self.server.put(msg)
-              self.server.settle()
+        self.process_incoming(msg)
     except Timeout:
       print "server timed out"
     self.server.stop()
     self.running = False
 
+  def process_incoming(self, msg):
+    while self.server.incoming:
+      self.server.get(msg)
+      if msg.body == REJECT_ME:
+        self.server.reject()
+      else:
+        self.server.accept()
+      self.dispatch(msg)
+
+  def dispatch(self, msg):
+    if msg.reply_to:
+      msg.address = msg.reply_to
+      self.server.put(msg)
+      self.server.settle()
+
   def _testSendReceive(self, size=None):
     self.start()
     msg = Message()
@@ -164,7 +170,9 @@ class MessengerTest(Test):
       else:
         assert self.client.status(t) is None
 
-  def testReject(self):
+  def testReject(self, process_incoming=None):
+    if process_incoming:
+      self.process_incoming = process_incoming
     self.server.accept_mode = MANUAL
     self.start()
     msg = Message()
@@ -187,9 +195,23 @@ class MessengerTest(Test):
 
     for t in trackers:
       if t in rejected:
-        assert self.client.status(t) is REJECTED
+        assert self.client.status(t) is REJECTED, (t, self.client.status(t))
       else:
-        assert self.client.status(t) is ACCEPTED
+        assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
+
+  def testRejectIndividual(self):
+    self.testReject(self.reject_individual)
+
+  def reject_individual(self, msg):
+    if self.server.incoming < 10:
+      return
+    while self.server.incoming:
+      t = self.server.get(msg)
+      if msg.body == REJECT_ME:
+        self.server.reject(t)
+      self.dispatch(msg)
+    self.server.accept()
+
 
   def testIncomingWindow(self):
     self.server.accept_mode = MANUAL



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org