You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/20 16:22:26 UTC

svn commit: r1448220 - in /qpid/proton/trunk: proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/ proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/ tests/python/proton_tests/

Author: kgiusti
Date: Wed Feb 20 15:22:26 2013
New Revision: 1448220

URL: http://svn.apache.org/r1448220
Log:
PROTON-200: maintain a minimum credit level for each receive link

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Feb 20 15:22:26 2013
@@ -369,8 +369,9 @@ send. Defaults to zero.
   def recv(self, n=None):
     """
     Receives up to I{n} messages into the incoming queue of the
-    L{Messenger}. This method will block until at least one message is
-    available or the operation times out.
+    L{Messenger}. If I{n} is not specified, L{Messenger} will receive as many
+    messages as it can buffer internally. This method will block until at least
+    one message is available or the operation times out.
     """
     if n is None:
       n = -1

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Wed Feb 20 15:22:26 2013
@@ -329,13 +329,14 @@ PN_EXTERN pn_tracker_t pn_messenger_outg
  */
 PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger);
 
-/** Receives up to n message into the incoming message queue of a
- * messenger. Blocks until at least one message is available in the
- * incoming queue.
+/** Receives up to n messages into the incoming message queue of a
+ * messenger. If n is -1, Messenger will be able to receive as many
+ * messages as it can buffer internally.  Blocks until at least one
+ * message is available in the incoming queue.
  *
  * @param[in] messenger the messenger
- * @param[in] n the maximum number of messages to receive, if -1
- * allow at least one message from each peer.
+ * @param[in] n the maximum number of messages to receive or -1 to
+ * receive as many messages as it can buffer internally.
  *
  * @return an error code or zero on success
  * @see error.h

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Wed Feb 20 15:22:26 2013
@@ -49,6 +49,7 @@ struct pn_messenger_t {
   int timeout;
   pn_driver_t *driver;
   bool unlimited_credit;
+  int credit_batch;
   int credit;
   int distributed;
   uint64_t next_tag;
@@ -264,6 +265,7 @@ pn_messenger_t *pn_messenger(const char 
     m->timeout = -1;
     m->driver = pn_driver();
     m->unlimited_credit = false;
+    m->credit_batch = 10;
     m->credit = 0;
     m->distributed = 0;
     m->next_tag = 0;
@@ -387,33 +389,47 @@ const char *pn_messenger_error(pn_messen
 
 void pn_messenger_flow(pn_messenger_t *messenger)
 {
-  while (messenger->credit > 0 || messenger->unlimited_credit) {
-    int prev = messenger->credit;
-    pn_connector_t *ctor = pn_connector_head(messenger->driver);
-    while (ctor) {
-      pn_connection_t *conn = pn_connector_connection(ctor);
-
-      pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
-      while (link) {
-        if (pn_link_is_receiver(link)) {
-          if (messenger->unlimited_credit) {
-            if (!pn_link_credit(link)) {
-              pn_link_flow(link, 1);
-              messenger->distributed++;
-            }
-          } else {
-            pn_link_flow(link, 1);
-            messenger->credit--;
-            messenger->distributed++;
-            if (messenger->credit == 0) break;
-          }
+  int link_ct = 0;
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
+    pn_connection_t *conn = pn_connector_connection(ctor);
+
+    pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
+    while (link) {
+      if (pn_link_is_receiver(link)) link_ct++;
+      link = pn_link_next(link, PN_LOCAL_ACTIVE);
+    }
+    ctor = pn_connector_next(ctor);
+  }
+
+  if (link_ct == 0) return;
+
+  if (messenger->unlimited_credit)
+    messenger->credit = link_ct * messenger->credit_batch;
+
+  int batch = (messenger->credit < link_ct) ? 1
+    : (messenger->credit/link_ct);
+
+  ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
+    pn_connection_t *conn = pn_connector_connection(ctor);
+    pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
+    while (link) {
+      if (pn_link_is_receiver(link)) {
+
+        int have = pn_link_credit(link);
+        if (have < batch) {
+          int need = batch - have;
+          int amount = (messenger->credit < need) ? messenger->credit : need;
+          pn_link_flow(link, amount);
+          messenger->distributed += amount;
+          messenger->credit -= amount;
+          if (messenger->credit == 0) return;
         }
-        link = pn_link_next(link, PN_LOCAL_ACTIVE);
       }
-
-      ctor = pn_connector_next(ctor);
+      link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
-    if (messenger->unlimited_credit || messenger->credit == prev) break;
+    ctor = pn_connector_next(ctor);
   }
 }
 

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Wed Feb 20 15:22:26 2013
@@ -74,6 +74,7 @@ public class MessengerImpl implements Me
     private byte[] _buffer = new byte[5*1024];
     private Driver _driver;
     private boolean _unlimitedCredit = false;
+    private static final int _creditBatch = 10;
     private int _credit;
     private int _distributed;
     private TrackerQueue _incoming = new TrackerQueue();
@@ -571,27 +572,44 @@ public class MessengerImpl implements Me
 
     private void distributeCredit()
     {
-        int previous = 0;
-        while (_unlimitedCredit || (_credit > 0 && _credit != previous))
+        int linkCt = 0;
+        // @todo track the number of opened receive links
+        for (Connector c : _driver.connectors())
         {
-            previous = _credit;
-            for (Connector c : _driver.connectors())
+            Connection connection = c.getConnection();
+            for (Link link : new Links(connection, ACTIVE, ANY))
             {
-                Connection connection = c.getConnection();
-                for (Link link : new Links(connection, ACTIVE, ANY))
+                if (link instanceof Receiver) linkCt++;
+            }
+        }
+
+        if (linkCt == 0) return;
+
+        if (_unlimitedCredit)
+        {
+            _credit = linkCt * _creditBatch;
+        }
+
+        int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
+        for (Connector c : _driver.connectors())
+        {
+            Connection connection = c.getConnection();
+            for (Link link : new Links(connection, ACTIVE, ANY))
+            {
+                if (link instanceof Receiver)
                 {
-                    if (link instanceof Receiver)
+                    int have = ((Receiver) link).getCredit();
+                    if (have < batch)
                     {
-                        ((Receiver) link).flow(1);
-                        _distributed++;
-                        if (!_unlimitedCredit) {
-                            _credit--;
-                            if (_credit == 0) return;
-                        }
+                        int need = batch - have;
+                        int amount = (_credit < need) ? _credit : need;
+                        ((Receiver) link).flow(amount);
+                        _distributed += amount;
+                        _credit -= amount;
+                        if (_credit == 0) return;
                     }
                 }
             }
-            if (_unlimitedCredit) return;
         }
     }
 

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Wed Feb 20 15:22:26 2013
@@ -25,7 +25,7 @@ from time import sleep, time
 class Test(common.Test):
 
   def setup(self):
-    self.server_credit = -1
+    self.server_credit = 10
     self.server_received = 0
     self.server = Messenger("server")
     self.server.timeout=10000
@@ -210,7 +210,6 @@ class MessengerTest(Test):
         assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
 
   def testRejectIndividual(self):
-    self.server_credit = 10
     self.testReject(self.reject_individual)
 
   def reject_individual(self, msg):
@@ -276,6 +275,7 @@ class MessengerTest(Test):
     """ Bring up two links.  Verify credit is granted to each link by
     transferring a message over each.
     """
+    self.server_credit = -1
     self.start()
 
     msg = Message()
@@ -304,7 +304,7 @@ class MessengerTest(Test):
     self.client.send()
 
     reply = Message()
-    self.client.recv(-1)
+    self.client.recv(1)
     assert self.client.incoming == 1
     self.client.get(reply)
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org