You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/09/23 19:58:35 UTC

svn commit: r1627102 - /qpid/proton/trunk/proton-c/src/messenger/messenger.c

Author: rhs
Date: Tue Sep 23 17:58:34 2014
New Revision: 1627102

URL: http://svn.apache.org/r1627102
Log:
PROTON-679: patch from dominic for manual credit mode

Modified:
    qpid/proton/trunk/proton-c/src/messenger/messenger.c

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1627102&r1=1627101&r2=1627102&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Sep 23 17:58:34 2014
@@ -54,10 +54,11 @@ typedef struct {
 } pn_address_t;
 
 // algorithm for granting credit to receivers
-typedef  enum {
+typedef enum {
   // pn_messenger_recv( X ), where:
-  LINK_CREDIT_EXPLICIT,  // X > 0
-  LINK_CREDIT_AUTO   // X == -1
+  LINK_CREDIT_EXPLICIT, // X > 0
+  LINK_CREDIT_AUTO,     // X == -1
+  LINK_CREDIT_MANUAL    // X == -2
 } pn_link_credit_mode_t;
 
 struct pn_messenger_t {
@@ -854,6 +855,8 @@ bool pn_messenger_flow(pn_messenger_t *m
     const int used = messenger->distributed + pn_messenger_incoming(messenger);
     if (max > used)
       messenger->credit = max - used;
+  } else if (messenger->credit_mode == LINK_CREDIT_MANUAL) {
+    return false;
   }
 
   const int batch = per_link_credit(messenger);
@@ -1002,33 +1005,38 @@ int pni_pump_in(pn_messenger_t *messenge
   n = pn_link_recv(receiver, encoded + pending, 1);
   pn_link_advance(receiver);
 
-  // account for the used credit
-  assert( ctx );
-  assert( messenger->distributed );
-  messenger->distributed--;
-
   pn_link_t *link = receiver;
 
-  // replenish if low (< 20% maximum batch) and credit available
-  if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 && messenger->credit > 0) {
-    const int max = per_link_credit(messenger);
-    const int lo_thresh = (int)(max * 0.2 + 0.5);
-    if (pn_link_remote_credit(link) < lo_thresh) {
-      const int more = pn_min(messenger->credit, max - pn_link_remote_credit(link));
-      messenger->credit -= more;
-      messenger->distributed += more;
-      pn_link_flow(link, more);
-    }
-  }
-  // check if blocked
-  if (pn_list_index(messenger->blocked, link) < 0 && pn_link_remote_credit(link) == 0) {
-    pn_list_remove(messenger->credited, link);
-    if (pn_link_get_drain(link)) {
-      pn_link_set_drain(link, false);
-      assert( messenger->draining > 0 );
-      messenger->draining--;
+  if (messenger->credit_mode != LINK_CREDIT_MANUAL) {
+    // account for the used credit
+    assert(ctx);
+    assert(messenger->distributed);
+    messenger->distributed--;
+
+    // replenish if low (< 20% maximum batch) and credit available
+    if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 &&
+        messenger->credit > 0) {
+      const int max = per_link_credit(messenger);
+      const int lo_thresh = (int)(max * 0.2 + 0.5);
+      if (pn_link_remote_credit(link) < lo_thresh) {
+        const int more =
+            pn_min(messenger->credit, max - pn_link_remote_credit(link));
+        messenger->credit -= more;
+        messenger->distributed += more;
+        pn_link_flow(link, more);
+      }
+    }
+    // check if blocked
+    if (pn_list_index(messenger->blocked, link) < 0 &&
+        pn_link_remote_credit(link) == 0) {
+      pn_list_remove(messenger->credited, link);
+      if (pn_link_get_drain(link)) {
+        pn_link_set_drain(link, false);
+        assert(messenger->draining > 0);
+        messenger->draining--;
+      }
+      pn_list_add(messenger->blocked, link);
     }
-    pn_list_add(messenger->blocked, link);
   }
 
   if (n != PN_EOS) {
@@ -2126,7 +2134,9 @@ int pn_messenger_recv(pn_messenger_t *me
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
 
   // re-compute credit, and update credit scheduler
-  if (n == -1) {
+  if (n == -2) {
+    messenger->credit_mode = LINK_CREDIT_MANUAL;
+  } else if (n == -1) {
     messenger->credit_mode = LINK_CREDIT_AUTO;
   } else {
     messenger->credit_mode = LINK_CREDIT_EXPLICIT;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org