You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/09/23 19:58:35 UTC
svn commit: r1627102 - /qpid/proton/trunk/proton-c/src/messenger/messenger.c
Author: rhs
Date: Tue Sep 23 17:58:34 2014
New Revision: 1627102
URL: http://svn.apache.org/r1627102
Log:
PROTON-679: patch from dominic for manual credit mode
Modified:
qpid/proton/trunk/proton-c/src/messenger/messenger.c
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1627102&r1=1627101&r2=1627102&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Sep 23 17:58:34 2014
@@ -54,10 +54,11 @@ typedef struct {
} pn_address_t;
// algorithm for granting credit to receivers
-typedef enum {
+typedef enum {
// pn_messenger_recv( X ), where:
- LINK_CREDIT_EXPLICIT, // X > 0
- LINK_CREDIT_AUTO // X == -1
+ LINK_CREDIT_EXPLICIT, // X > 0
+ LINK_CREDIT_AUTO, // X == -1
+ LINK_CREDIT_MANUAL // X == -2
} pn_link_credit_mode_t;
struct pn_messenger_t {
@@ -854,6 +855,8 @@ bool pn_messenger_flow(pn_messenger_t *m
const int used = messenger->distributed + pn_messenger_incoming(messenger);
if (max > used)
messenger->credit = max - used;
+ } else if (messenger->credit_mode == LINK_CREDIT_MANUAL) {
+ return false;
}
const int batch = per_link_credit(messenger);
@@ -1002,33 +1005,38 @@ int pni_pump_in(pn_messenger_t *messenge
n = pn_link_recv(receiver, encoded + pending, 1);
pn_link_advance(receiver);
- // account for the used credit
- assert( ctx );
- assert( messenger->distributed );
- messenger->distributed--;
-
pn_link_t *link = receiver;
- // replenish if low (< 20% maximum batch) and credit available
- if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 && messenger->credit > 0) {
- const int max = per_link_credit(messenger);
- const int lo_thresh = (int)(max * 0.2 + 0.5);
- if (pn_link_remote_credit(link) < lo_thresh) {
- const int more = pn_min(messenger->credit, max - pn_link_remote_credit(link));
- messenger->credit -= more;
- messenger->distributed += more;
- pn_link_flow(link, more);
- }
- }
- // check if blocked
- if (pn_list_index(messenger->blocked, link) < 0 && pn_link_remote_credit(link) == 0) {
- pn_list_remove(messenger->credited, link);
- if (pn_link_get_drain(link)) {
- pn_link_set_drain(link, false);
- assert( messenger->draining > 0 );
- messenger->draining--;
+ if (messenger->credit_mode != LINK_CREDIT_MANUAL) {
+ // account for the used credit
+ assert(ctx);
+ assert(messenger->distributed);
+ messenger->distributed--;
+
+ // replenish if low (< 20% maximum batch) and credit available
+ if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 &&
+ messenger->credit > 0) {
+ const int max = per_link_credit(messenger);
+ const int lo_thresh = (int)(max * 0.2 + 0.5);
+ if (pn_link_remote_credit(link) < lo_thresh) {
+ const int more =
+ pn_min(messenger->credit, max - pn_link_remote_credit(link));
+ messenger->credit -= more;
+ messenger->distributed += more;
+ pn_link_flow(link, more);
+ }
+ }
+ // check if blocked
+ if (pn_list_index(messenger->blocked, link) < 0 &&
+ pn_link_remote_credit(link) == 0) {
+ pn_list_remove(messenger->credited, link);
+ if (pn_link_get_drain(link)) {
+ pn_link_set_drain(link, false);
+ assert(messenger->draining > 0);
+ messenger->draining--;
+ }
+ pn_list_add(messenger->blocked, link);
}
- pn_list_add(messenger->blocked, link);
}
if (n != PN_EOS) {
@@ -2126,7 +2134,9 @@ int pn_messenger_recv(pn_messenger_t *me
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
// re-compute credit, and update credit scheduler
- if (n == -1) {
+ if (n == -2) {
+ messenger->credit_mode = LINK_CREDIT_MANUAL;
+ } else if (n == -1) {
messenger->credit_mode = LINK_CREDIT_AUTO;
} else {
messenger->credit_mode = LINK_CREDIT_EXPLICIT;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org