You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/20 16:22:08 UTC

svn commit: r1448216 - in /qpid/proton/trunk: proton-c/bindings/python/proton.py proton-c/include/proton/cproton.i proton-c/include/proton/messenger.h proton-c/src/messenger.c tests/python/proton_tests/messenger.py

Author: kgiusti
Date: Wed Feb 20 15:22:08 2013
New Revision: 1448216

URL: http://svn.apache.org/r1448216
Log:
PROTON-200: allow recv(-1) to grant unlimited credit

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/cproton.i
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Feb 20 15:22:08 2013
@@ -366,12 +366,14 @@ send. Defaults to zero.
     """
     self._check(pn_messenger_send(self._mng))
 
-  def recv(self, n):
+  def recv(self, n=None):
     """
     Receives up to I{n} messages into the incoming queue of the
     L{Messenger}. This method will block until at least one message is
     available or the operation times out.
     """
+    if n is None:
+      n = -1
     self._check(pn_messenger_recv(self._mng, n))
 
   def get(self, message=None):

Modified: qpid/proton/trunk/proton-c/include/proton/cproton.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/cproton.i?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/cproton.i (original)
+++ qpid/proton/trunk/proton-c/include/proton/cproton.i Wed Feb 20 15:22:08 2013
@@ -1284,7 +1284,6 @@ typedef long long int int64_t;
 {
  require:
   messenger != NULL;
-  n >= 0;
 }
 
 %contract pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Wed Feb 20 15:22:08 2013
@@ -334,7 +334,8 @@ PN_EXTERN int pn_messenger_send(pn_messe
  * incoming queue.
  *
  * @param[in] messenger the messenger
- * @param[in] n the maximum number of messages to receive
+ * @param[in] n the maximum number of messages to receive; if -1,
+ * allow at least one message from each peer.
  *
  * @return an error code or zero on success
  * @see error.h

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Wed Feb 20 15:22:08 2013
@@ -48,6 +48,7 @@ struct pn_messenger_t {
   char *trusted_certificates;
   int timeout;
   pn_driver_t *driver;
+  bool unlimited_credit;
   int credit;
   int distributed;
   uint64_t next_tag;
@@ -262,6 +263,7 @@ pn_messenger_t *pn_messenger(const char 
     m->trusted_certificates = NULL;
     m->timeout = -1;
     m->driver = pn_driver();
+    m->unlimited_credit = false;
     m->credit = 0;
     m->distributed = 0;
     m->next_tag = 0;
@@ -385,25 +387,33 @@ const char *pn_messenger_error(pn_messen
 
 void pn_messenger_flow(pn_messenger_t *messenger)
 {
-  while (messenger->credit > 0) {
+  while (messenger->credit > 0 || messenger->unlimited_credit) {
     int prev = messenger->credit;
     pn_connector_t *ctor = pn_connector_head(messenger->driver);
     while (ctor) {
       pn_connection_t *conn = pn_connector_connection(ctor);
 
       pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
-      while (link && messenger->credit > 0) {
+      while (link) {
         if (pn_link_is_receiver(link)) {
-          pn_link_flow(link, 1);
-          messenger->credit--;
-          messenger->distributed++;
+          if (messenger->unlimited_credit) {
+            if (!pn_link_credit(link)) {
+              pn_link_flow(link, 1);
+              messenger->distributed++;
+            }
+          } else {
+            pn_link_flow(link, 1);
+            messenger->credit--;
+            messenger->distributed++;
+            if (messenger->credit == 0) break;
+          }
         }
         link = pn_link_next(link, PN_LOCAL_ACTIVE);
       }
 
       ctor = pn_connector_next(ctor);
     }
-    if (messenger->credit == prev) break;
+    if (messenger->unlimited_credit || messenger->credit == prev) break;
   }
 }
 
@@ -1115,8 +1125,13 @@ int pn_messenger_recv(pn_messenger_t *me
   if (!pn_listener_head(messenger->driver) && !pn_connector_head(messenger->driver))
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
   int total = messenger->credit + messenger->distributed;
-  if (n > total)
-    messenger->credit += (n - total);
+  if (n == -1) {
+    messenger->unlimited_credit = true;
+  } else  {
+    messenger->unlimited_credit = false;
+    if (n > total)
+      messenger->credit += (n - total);
+  }
   pn_messenger_flow(messenger);
   return pn_messenger_sync(messenger, pn_messenger_rcvd);
 }

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Wed Feb 20 15:22:08 2013
@@ -25,7 +25,8 @@ from time import sleep, time
 class Test(common.Test):
 
   def setup(self):
-    self.server_received = 0;
+    self.server_credit = -1
+    self.server_received = 0
     self.server = Messenger("server")
     self.server.timeout=10000
     self.server.start()
@@ -61,7 +62,7 @@ class MessengerTest(Test):
     msg = Message()
     try:
       while self.running:
-        self.server.recv(10)
+        self.server.recv(self.server_credit)
         self.process_incoming(msg)
     except Timeout:
       print "server timed out"
@@ -209,6 +210,7 @@ class MessengerTest(Test):
         assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
 
   def testRejectIndividual(self):
+    self.server_credit = 10
     self.testReject(self.reject_individual)
 
   def reject_individual(self, msg):
@@ -269,3 +271,43 @@ class MessengerTest(Test):
       assert time() < deadline, "Server did not receive message!"
       sleep(.1)
     assert self.server_received == 1
+
+  def testUnlimitedCredit(self):
+    """ Bring up two links.  Verify credit is granted to each link by
+    transferring a message over each.
+    """
+    self.start()
+
+    msg = Message()
+    msg.address="amqp://0.0.0.0:12345/XXX"
+    msg.subject="Hello World!"
+    body = "First the world, then the galaxy!"
+    msg.load(body)
+    self.client.put(msg)
+    self.client.send()
+
+    reply = Message()
+    self.client.recv(1)
+    assert self.client.incoming == 1
+    self.client.get(reply)
+
+    assert reply.subject == "Hello World!"
+    rbod = reply.save()
+    assert rbod == body, (rbod, body)
+
+    msg = Message()
+    msg.address="amqp://0.0.0.0:12345/YYY"
+    msg.subject="Hello World!"
+    body = "First the world, then the galaxy!"
+    msg.load(body)
+    self.client.put(msg)
+    self.client.send()
+
+    reply = Message()
+    self.client.recv(-1)
+    assert self.client.incoming == 1
+    self.client.get(reply)
+
+    assert reply.subject == "Hello World!"
+    rbod = reply.save()
+    assert rbod == body, (rbod, body)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org