You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/10/15 19:35:10 UTC
svn commit: r1532452 - in /qpid/proton/trunk: proton-c/bindings/python/
proton-c/include/proton/ proton-c/src/ proton-c/src/engine/
proton-c/src/messenger/ tests/python/proton_tests/
Author: rhs
Date: Tue Oct 15 17:35:09 2013
New Revision: 1532452
URL: http://svn.apache.org/r1532452
Log:
PROTON-200: modified messenger's credit distribution algorithm to cope with credit scarce scenarios
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-c/src/util.h
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Oct 15 17:35:09 2013
@@ -470,6 +470,10 @@ send. Defaults to zero.
self._check(err)
return True
+ @property
+ def receiving(self):
+ return pn_messenger_receiving(self._mng)
+
def interrupt(self):
self._check(pn_messenger_interrupt(self._mng))
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Oct 15 17:35:09 2013
@@ -475,6 +475,7 @@ PN_EXTERN pn_delivery_t *pn_link_current
PN_EXTERN bool pn_link_advance(pn_link_t *link);
PN_EXTERN int pn_link_credit(pn_link_t *link);
PN_EXTERN int pn_link_queued(pn_link_t *link);
+PN_EXTERN int pn_link_remote_credit(pn_link_t *link);
PN_EXTERN int pn_link_available(pn_link_t *link);
PN_EXTERN pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link);
PN_EXTERN pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link);
@@ -492,6 +493,7 @@ PN_EXTERN void pn_link_close(pn_link_t *
PN_EXTERN void pn_link_free(pn_link_t *sender);
PN_EXTERN void *pn_link_get_context(pn_link_t *link);
PN_EXTERN void pn_link_set_context(pn_link_t *link, void *context);
+PN_EXTERN bool pn_link_get_drain(pn_link_t *link);
// sender
PN_EXTERN void pn_link_offered(pn_link_t *sender, int credit);
@@ -502,6 +504,7 @@ PN_EXTERN int pn_link_drained(pn_link_t
// receiver
PN_EXTERN void pn_link_flow(pn_link_t *receiver, int credit);
PN_EXTERN void pn_link_drain(pn_link_t *receiver, int credit);
+PN_EXTERN void pn_link_set_drain(pn_link_t *receiver, bool drain);
PN_EXTERN ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n);
PN_EXTERN bool pn_link_draining(pn_link_t *receiver);
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Tue Oct 15 17:35:09 2013
@@ -395,7 +395,7 @@ PN_EXTERN int pn_messenger_send(pn_messe
* messenger is in blocking mode, this call will block until at least
* one message is available in the incoming queue.
*
- * Each call to pn_messenger_recv replaces the previos receive
+ * Each call to pn_messenger_recv replaces the previous receive
* operation, so pn_messenger_recv(messenger, 0) will cancel any
* outstanding receive.
*
@@ -412,8 +412,11 @@ PN_EXTERN int pn_messenger_send(pn_messe
*/
PN_EXTERN int pn_messenger_recv(pn_messenger_t *messenger, int limit);
-/** Returns the number of messages that was requested by
- * the most recent call to pn_messenger_recv.
+/** Returns the capacity of the incoming message queue of
+ * messenger. Note this count does not include those messages already
+ * available on the incoming queue (see
+ * pn_messenger_incoming()). Rather, it returns the number of incoming
+ * queue entries available for receiving messages.
*
* @param[in] messenger the messenger
*/
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Oct 15 17:35:09 2013
@@ -240,6 +240,7 @@ struct pn_link_t {
pn_sequence_t available;
pn_sequence_t credit;
pn_sequence_t queued;
+ bool drain_flag_mode; // receiver only
bool drain;
int drained; // number of drained credits
void *context;
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Oct 15 17:35:09 2013
@@ -680,6 +680,7 @@ pn_link_t *pn_link_new(int type, pn_sess
link->credit = 0;
link->queued = 0;
link->drain = false;
+ link->drain_flag_mode = true;
link->drained = 0;
link->context = 0;
link->snd_settle_mode = PN_SND_MIXED;
@@ -1179,6 +1180,18 @@ int pn_link_queued(pn_link_t *link)
return link ? link->queued : 0;
}
+int pn_link_remote_credit(pn_link_t *link)
+{
+ assert(link);
+ return link->credit - link->queued;
+}
+
+bool pn_link_get_drain(pn_link_t *link)
+{
+ assert(link);
+ return link->drain;
+}
+
pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link)
{
return link ? (pn_snd_settle_mode_t)link->snd_settle_mode
@@ -1297,10 +1310,13 @@ ssize_t pn_link_recv(pn_link_t *receiver
void pn_link_flow(pn_link_t *receiver, int credit)
{
- if (receiver && pn_link_is_receiver(receiver)) {
- receiver->credit += credit;
- receiver->drain = false;
- pn_modified(receiver->session->connection, &receiver->endpoint);
+ assert(receiver);
+ assert(pn_link_is_receiver(receiver));
+ receiver->credit += credit;
+ pn_modified(receiver->session->connection, &receiver->endpoint);
+ if (!receiver->drain_flag_mode) {
+ pn_link_set_drain(receiver, false);
+ receiver->drain_flag_mode = false;
}
}
@@ -1308,8 +1324,18 @@ void pn_link_drain(pn_link_t *receiver,
{
assert(receiver);
assert(pn_link_is_receiver(receiver));
+ pn_link_set_drain(receiver, true);
pn_link_flow(receiver, credit);
- receiver->drain = true;
+ receiver->drain_flag_mode = false;
+}
+
+void pn_link_set_drain(pn_link_t *receiver, bool drain)
+{
+ assert(receiver);
+ assert(pn_link_is_receiver(receiver));
+ receiver->drain = drain;
+ pn_modified(receiver->session->connection, &receiver->endpoint);
+ receiver->drain_flag_mode = true;
}
bool pn_link_draining(pn_link_t *receiver)
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Oct 15 17:35:09 2013
@@ -35,6 +35,8 @@
#include "store.h"
#include "transform.h"
+typedef struct pn_link_ctx_t pn_link_ctx_t;
+
typedef struct {
pn_string_t *text;
bool passive;
@@ -46,6 +48,13 @@ typedef struct {
char *name;
} pn_address_t;
+// algorithm for granting credit to receivers
+typedef enum {
+ // pn_messenger_recv( X ), where:
+ LINK_CREDIT_EXPLICIT, // X > 0
+ LINK_CREDIT_AUTO // X == -1
+} pn_link_credit_mode_t;
+
struct pn_messenger_t {
char *name;
char *certificate;
@@ -56,10 +65,15 @@ struct pn_messenger_t {
bool blocking;
pn_driver_t *driver;
int send_threshold;
- int receiving;
- int credit_batch;
- int credit;
- int distributed;
+ pn_link_credit_mode_t credit_mode;
+ int credit_batch; // when LINK_CREDIT_AUTO
+ int credit; // available
+ int distributed; // credit
+ int receivers; // # receiver links
+ int draining; // # links in drain state
+ pn_list_t *credited;
+ pn_list_t *blocked;
+ pn_timestamp_t next_drain;
uint64_t next_tag;
pni_store_t *outgoing;
pni_store_t *incoming;
@@ -125,9 +139,11 @@ typedef struct {
char *pass;
char *host;
char *port;
+ pn_connector_t *connector;
} pn_connection_ctx_t;
static pn_connection_ctx_t *pn_connection_ctx(pn_connection_t *conn,
+ pn_connector_t *connector,
const char *scheme,
const char *user,
const char *pass,
@@ -142,6 +158,7 @@ static pn_connection_ctx_t *pn_connectio
ctx->pass = pn_strdup(pass);
ctx->host = pn_strdup(host);
ctx->port = pn_strdup(port);
+ ctx->connector = connector;
pn_connection_set_context(conn, ctx);
return ctx;
}
@@ -174,6 +191,53 @@ static char *build_name(const char *name
}
}
+struct pn_link_ctx_t {
+ pn_subscription_t *subscription;
+};
+
+// compute the maximum amount of credit each receiving link is
+// entitled to. The actual credit given to the link depends on what
+// amount of credit is actually available.
+static int per_link_credit( pn_messenger_t *messenger )
+{
+ if (messenger->receivers == 0) return 0;
+ int total = messenger->credit + messenger->distributed;
+ return pn_max(total/messenger->receivers, 1);
+}
+
+static void link_ctx_setup( pn_messenger_t *messenger,
+ pn_connection_t *connection,
+ pn_link_t *link )
+{
+ if (pn_link_is_receiver(link)) {
+ messenger->receivers++;
+ pn_link_ctx_t *ctx = (pn_link_ctx_t *) calloc(1, sizeof(pn_link_ctx_t));
+ assert( ctx );
+ assert( !pn_link_get_context(link) );
+ pn_link_set_context( link, ctx );
+ pn_list_add(messenger->blocked, link);
+ }
+}
+
+static void link_ctx_release( pn_messenger_t *messenger, pn_link_t *link )
+{
+ if (pn_link_is_receiver(link)) {
+ assert( messenger->receivers > 0 );
+ messenger->receivers--;
+ pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( link );
+ assert( ctx );
+ if (pn_link_get_drain(link)) {
+ pn_link_set_drain(link, false);
+ assert( messenger->draining > 0 );
+ messenger->draining--;
+ }
+ pn_list_remove(messenger->credited, link);
+ pn_list_remove(messenger->blocked, link);
+ pn_link_set_context( link, NULL );
+ free( ctx );
+ }
+}
+
pn_messenger_t *pn_messenger(const char *name)
{
pn_messenger_t *m = (pn_messenger_t *) malloc(sizeof(pn_messenger_t));
@@ -187,10 +251,15 @@ pn_messenger_t *pn_messenger(const char
m->timeout = -1;
m->blocking = true;
m->driver = pn_driver();
- m->receiving = 0;
+ m->credit_mode = LINK_CREDIT_EXPLICIT;
m->credit_batch = 1024;
m->credit = 0;
m->distributed = 0;
+ m->receivers = 0;
+ m->draining = 0;
+ m->credited = pn_list(0, 0);
+ m->blocked = pn_list(0, 0);
+ m->next_drain = 0;
m->next_tag = 0;
m->outgoing = pni_store();
m->incoming = pni_store();
@@ -328,6 +397,8 @@ void pn_messenger_free(pn_messenger_t *m
free(messenger->subscriptions);
pn_free(messenger->rewrites);
pn_free(messenger->routes);
+ pn_free(messenger->credited);
+ pn_free(messenger->blocked);
free(messenger);
}
}
@@ -347,55 +418,99 @@ pn_error_t *pn_messenger_error(pn_messen
return messenger->error;
}
-void pn_messenger_flow(pn_messenger_t *messenger)
-{
- int link_ct = 0;
- pn_connector_t *ctor = pn_connector_head(messenger->driver);
- while (ctor) {
- pn_connection_t *conn = pn_connector_connection(ctor);
-
- pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
- while (link) {
- if (pn_link_is_receiver(link)) link_ct++;
- link = pn_link_next(link, PN_LOCAL_ACTIVE);
+// Run the credit scheduler, grant flow as needed. Return True if
+// credit allocation for any link has changed.
+bool pn_messenger_flow(pn_messenger_t *messenger)
+{
+ bool updated = false;
+ if (messenger->receivers == 0) return updated;
+
+ if (messenger->credit_mode == LINK_CREDIT_AUTO) {
+ // replenish, but limit the max total messages buffered
+ const int max = messenger->receivers * messenger->credit_batch;
+ const int used = messenger->distributed + pn_messenger_incoming(messenger);
+ if (max > used)
+ messenger->credit = max - used;
+ }
+
+ // account for any credit left over after draining links has completed
+ if (messenger->draining > 0) {
+ for (size_t i = 0; i < pn_list_size(messenger->credited); i++) {
+ pn_link_t *link = (pn_link_t *) pn_list_get(messenger->credited, i);
+ if (pn_link_get_drain(link)) {
+ if (!pn_link_draining(link)) {
+ // drain completed!
+ int drained = pn_link_drained(link);
+ // printf("%s: drained %i from %p\n", messenger->name, drained, (void *) ctx->link);
+ messenger->distributed -= drained;
+ messenger->credit += drained;
+ pn_link_set_drain(link, false);
+ messenger->draining--;
+ pn_list_remove(messenger->credited, link);
+ pn_list_add(messenger->blocked, link);
+ }
+ }
}
- ctor = pn_connector_next(ctor);
}
- if (link_ct == 0) return;
-
- if (messenger->receiving == -1) {
- messenger->credit = link_ct * messenger->credit_batch - pn_messenger_incoming(messenger);
- } else {
- int total = messenger->credit + messenger->distributed;
- if (messenger->receiving > total)
- messenger->credit += (messenger->receiving - total);
+ const int batch = per_link_credit(messenger);
+ while (messenger->credit > 0 && pn_list_size(messenger->blocked)) {
+ pn_link_t *link = (pn_link_t *) pn_list_get(messenger->blocked, 0);
+ pn_list_del(messenger->blocked, 0, 1);
+
+ const int more = pn_min( messenger->credit, batch );
+ messenger->distributed += more;
+ messenger->credit -= more;
+ // printf("%s: flowing %i to %p\n", messenger->name, more, (void *) ctx->link);
+ pn_link_flow(link, more);
+ pn_list_add(messenger->credited, link);
+ pn_connection_t *conn = pn_session_connection(pn_link_session(link));
+ pn_connection_ctx_t *cctx;
+ cctx = (pn_connection_ctx_t *)pn_connection_get_context(conn);
+ // flow changed, must process it
+ pn_connector_process( cctx->connector );
+ updated = true;
}
- int batch = (messenger->credit < link_ct) ? 1
- : (messenger->credit/link_ct);
-
- ctor = pn_connector_head(messenger->driver);
- while (ctor) {
- pn_connection_t *conn = pn_connector_connection(ctor);
- pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
- while (link) {
- if (pn_link_is_receiver(link)) {
+ if (!pn_list_size(messenger->blocked)) {
+ messenger->next_drain = 0;
+ } else {
+ // not enough credit for all links
+ if (!messenger->draining) {
+ // printf("%s: let's drain\n", messenger->name);
+ if (messenger->next_drain == 0) {
+ messenger->next_drain = pn_i_now() + 250;
+ // printf("%s: initializing next_drain\n", messenger->name);
+ } else if (messenger->next_drain <= pn_i_now()) {
+ // initiate drain, free up at most enough to satisfy blocked
+ messenger->next_drain = 0;
+ int needed = pn_list_size(messenger->blocked) * batch;
+ for (size_t i = 0; i < pn_list_size(messenger->credited); i++) {
+ pn_link_t *link = (pn_link_t *) pn_list_get(messenger->credited, i);
+ if (!pn_link_get_drain(link)) {
+ // printf("%s: initiating drain from %p\n", messenger->name, (void *) ctx->link);
+ pn_link_set_drain(link, true);
+ needed -= pn_link_remote_credit(link);
+ messenger->draining++;
+ pn_connection_t *conn =
+ pn_session_connection(pn_link_session(link));
+ pn_connection_ctx_t *cctx;
+ cctx = (pn_connection_ctx_t *)pn_connection_get_context(conn);
+ // drain requested on link, must process it
+ pn_connector_process( cctx->connector );
+ updated = true;
+ }
- int have = pn_link_credit(link);
- if (have < batch) {
- int need = batch - have;
- int amount = (messenger->credit < need) ? messenger->credit : need;
- pn_link_flow(link, amount);
- messenger->distributed += amount;
- messenger->credit -= amount;
- if (messenger->credit == 0) return;
+ if (needed <= 0) {
+ break;
+ }
}
+ } else {
+ // printf("%s: delaying\n", messenger->name);
}
- link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
- ctor = pn_connector_next(ctor);
}
+ return updated;
}
static void pn_transport_config(pn_messenger_t *messenger,
@@ -465,8 +580,8 @@ int pni_pump_in(pn_messenger_t *messenge
pn_buffer_t *buf = pni_entry_bytes(entry);
pni_entry_set_delivery(entry, d);
- pn_subscription_t *sub = (pn_subscription_t *) pn_link_get_context(receiver);
- pni_entry_set_context(entry, sub);
+ pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( receiver );
+ pni_entry_set_context(entry, ctx ? ctx->subscription : NULL);
size_t pending = pn_delivery_pending(d);
int err = pn_buffer_ensure(buf, pending + 1);
@@ -480,6 +595,35 @@ int pni_pump_in(pn_messenger_t *messenge
}
n = pn_link_recv(receiver, encoded + pending, 1);
pn_link_advance(receiver);
+
+ // account for the used credit
+ assert( ctx );
+ assert( messenger->distributed );
+ messenger->distributed--;
+
+ pn_link_t *link = receiver;
+ // replenish if low (< 20% maximum batch) and credit available
+ if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 && messenger->credit > 0) {
+ const int max = per_link_credit(messenger);
+ const int lo_thresh = (int)(max * 0.2 + 0.5);
+ if (pn_link_remote_credit(link) < lo_thresh) {
+ const int more = pn_min(messenger->credit, max - pn_link_remote_credit(link));
+ messenger->credit -= more;
+ messenger->distributed += more;
+ pn_link_flow(link, more);
+ }
+ }
+ // check if blocked
+ if (pn_list_index(messenger->blocked, link) < 0 && pn_link_remote_credit(link) == 0) {
+ pn_list_remove(messenger->credited, link);
+ if (pn_link_get_drain(link)) {
+ pn_link_set_drain(link, false);
+ assert( messenger->draining > 0 );
+ messenger->draining--;
+ }
+ pn_list_add(messenger->blocked, link);
+ }
+
if (n != PN_EOS) {
return pn_error_format(messenger->error, n, "PN_EOS expected");
}
@@ -530,11 +674,12 @@ void pn_messenger_endpoints(pn_messenger
while (link) {
pn_terminus_copy(pn_link_source(link), pn_link_remote_source(link));
pn_terminus_copy(pn_link_target(link), pn_link_remote_target(link));
+ link_ctx_setup( messenger, conn, link );
pn_link_open(link);
if (pn_link_is_receiver(link)) {
pn_listener_t *listener = pn_connector_listener(ctor);
pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(listener);
- pn_link_set_context(link, ctx ? ctx->subscription : NULL);
+ ((pn_link_ctx_t *)pn_link_get_context(link))->subscription = ctx ? ctx->subscription : NULL;
}
link = pn_link_next(link, PN_LOCAL_UNINIT);
}
@@ -547,8 +692,6 @@ void pn_messenger_endpoints(pn_messenger
link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
- pn_messenger_flow(messenger);
-
ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (ssn) {
pn_condition_report("SESSION", pn_session_remote_condition(ssn));
@@ -556,12 +699,16 @@ void pn_messenger_endpoints(pn_messenger
ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
- link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ link = pn_link_head(conn, PN_REMOTE_CLOSED);
while (link) {
- pn_condition_report("LINK", pn_link_remote_condition(link));
- pn_link_close(link);
- // XXX: should free link
- link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ if (PN_LOCAL_ACTIVE | pn_link_state(link)) {
+ pn_condition_report("LINK", pn_link_remote_condition(link));
+ pn_link_close(link);
+ } else {
+ link_ctx_release( messenger, link );
+ pn_link_free(link);
+ }
+ link = pn_link_next(link, PN_REMOTE_CLOSED);
}
if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
@@ -585,6 +732,8 @@ void pn_messenger_endpoints(pn_messenger
} else if (pn_connector_closed(ctor) && !(pn_connection_state(conn) & PN_REMOTE_CLOSED)) {
pn_error_report("CONNECTION", "connection aborted");
}
+
+ pn_messenger_flow(messenger);
}
void pni_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn)
@@ -608,6 +757,8 @@ void pni_messenger_reclaim(pn_messenger_
d = pn_unsettled_next(d);
}
+ link_ctx_release(messenger, link);
+
link = pn_link_next(link, 0);
}
@@ -617,6 +768,7 @@ void pni_messenger_reclaim(pn_messenger_
pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
+ pn_connector_t *connector,
char *scheme,
char *user,
char *pass,
@@ -625,7 +777,7 @@ pn_connection_t *pn_messenger_connection
{
pn_connection_t *connection = pn_connection();
if (!connection) return NULL;
- pn_connection_ctx(connection, scheme, user, pass, host, port);
+ pn_connection_ctx(connection, connector, scheme, user, pass, host, port);
pn_connection_set_container(connection, messenger->name);
pn_connection_set_hostname(connection, host);
@@ -651,6 +803,17 @@ int pn_messenger_tsync(pn_messenger_t *m
int remaining = deadline - now;
if (pred || (timeout >= 0 && remaining < 0)) break;
+ // Update the credit scheduler. If the scheduler detects credit
+ // imbalance on the links, wake up in time to service credit drain
+ pn_messenger_flow(messenger);
+ if (messenger->next_drain) {
+ if (now >= messenger->next_drain)
+ remaining = 0;
+ else {
+ const int delay = messenger->next_drain - now;
+ remaining = (remaining < 0) ? delay : pn_min( remaining, delay );
+ }
+ }
int error = pn_driver_wait(messenger->driver, remaining);
if (error && error != PN_INTR) return error;
@@ -681,7 +844,7 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_sasl_server(sasl);
pn_sasl_done(sasl, PN_SASL_OK);
pn_connection_t *conn =
- pn_messenger_connection(messenger, scheme, NULL, NULL, NULL, NULL);
+ pn_messenger_connection(messenger, c, scheme, NULL, NULL, NULL, NULL);
pn_connector_set_connection(c, conn);
}
@@ -695,7 +858,6 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connector_free(c);
if (conn) {
pni_messenger_reclaim(messenger, conn);
- pn_messenger_flow(messenger);
}
} else {
pn_connector_process(c);
@@ -889,7 +1051,7 @@ pn_connection_t *pn_messenger_resolve(pn
}
pn_connection_t *connection =
- pn_messenger_connection(messenger, scheme, user, pass, host, port);
+ pn_messenger_connection(messenger, connector, scheme, user, pass, host, port);
pn_transport_config(messenger, connector, connection);
pn_connection_open(connection);
pn_connector_set_connection(connector, connection);
@@ -941,16 +1103,18 @@ pn_link_t *pn_messenger_link(pn_messenge
link = sender ? pn_sender(ssn, "sender-xxx") : pn_receiver(ssn, "receiver-xxx");
if ((sender && pn_messenger_get_outgoing_window(messenger)) ||
(!sender && pn_messenger_get_incoming_window(messenger))) {
- // use explicit settlement via dispositions (not pre-settled)
- pn_link_set_snd_settle_mode( link, PN_SND_UNSETTLED );
- pn_link_set_rcv_settle_mode( link, PN_RCV_SECOND );
+ // use explicit settlement via dispositions (not pre-settled)
+ pn_link_set_snd_settle_mode( link, PN_SND_UNSETTLED );
+ pn_link_set_rcv_settle_mode( link, PN_RCV_SECOND );
}
// XXX
pn_terminus_set_address(pn_link_target(link), name);
pn_terminus_set_address(pn_link_source(link), name);
+ link_ctx_setup( messenger, connection, link );
if (!sender) {
- pn_subscription_t *sub = pn_subscription(messenger, NULL);
- pn_link_set_context(link, sub);
+ pn_link_ctx_t *ctx = (pn_link_ctx_t *)pn_link_get_context(link);
+ assert( ctx );
+ ctx->subscription = pn_subscription(messenger, NULL);
}
pn_link_open(link);
return link;
@@ -991,8 +1155,8 @@ pn_subscription_t *pn_messenger_subscrib
} else {
pn_link_t *src = pn_messenger_source(messenger, source);
if (!src) return NULL;
- pn_subscription_t *sub = (pn_subscription_t *) pn_link_get_context(src);
- return sub;
+ pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( src );
+ return ctx ? ctx->subscription : NULL;
}
}
@@ -1048,7 +1212,11 @@ static void outward_munge(pn_messenger_t
int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender)
{
pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
- if (!entry) return 0;
+ if (!entry) {
+ pn_link_drained(sender);
+ return 0;
+ }
+
pn_buffer_t *buf = pni_entry_bytes(entry);
pn_bytes_t bytes = pn_buffer_bytes(buf);
char *encoded = bytes.start;
@@ -1292,7 +1460,17 @@ int pn_messenger_recv(pn_messenger_t *me
if (messenger->blocking && !pn_listener_head(messenger->driver)
&& !pn_connector_head(messenger->driver))
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
- messenger->receiving = n;
+
+ // re-compute credit, and update credit scheduler
+ if (n == -1) {
+ messenger->credit_mode = LINK_CREDIT_AUTO;
+ } else {
+ messenger->credit_mode = LINK_CREDIT_EXPLICIT;
+ if (n > messenger->distributed)
+ messenger->credit = n - messenger->distributed;
+ else // cancel unallocated
+ messenger->credit = 0;
+ }
pn_messenger_flow(messenger);
int err = pn_messenger_sync(messenger, pn_messenger_rcvd);
if (err) return err;
@@ -1309,7 +1487,7 @@ int pn_messenger_recv(pn_messenger_t *me
int pn_messenger_receiving(pn_messenger_t *messenger)
{
assert(messenger);
- return messenger->receiving;
+ return messenger->credit + messenger->distributed;
}
int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
@@ -1326,7 +1504,6 @@ int pn_messenger_get(pn_messenger_t *mes
const char *encoded = bytes.start;
size_t size = bytes.size;
- messenger->distributed--;
messenger->incoming_subscription = (pn_subscription_t *) pni_entry_get_context(entry);
if (msg) {
Modified: qpid/proton/trunk/proton-c/src/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.h?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.h (original)
+++ qpid/proton/trunk/proton-c/src/util.h Tue Oct 15 17:35:09 2013
@@ -97,8 +97,6 @@ pn_timestamp_t pn_timestamp_min(pn_times
LL_HEAD(ROOT, LIST) = (NODE)-> LIST ## _next; \
if ((NODE) == LL_TAIL(ROOT, LIST)) \
LL_TAIL(ROOT, LIST) = (NODE)-> LIST ## _prev; \
- (NODE)-> LIST ## _next = NULL; \
- (NODE)-> LIST ## _prev = NULL; \
}
char *pn_strdup(const char *src);
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Tue Oct 15 17:35:09 2013
@@ -28,6 +28,7 @@ class Test(common.Test):
def setup(self):
self.server_credit = 10
self.server_received = 0
+ self.server_finite_credit = False
self.server = Messenger("server")
self.server.timeout = self.timeout
self.server.start()
@@ -68,6 +69,14 @@ REJECT_ME = "*REJECT-ME*"
class MessengerTest(Test):
def run_server(self):
+ if self.server_finite_credit:
+ self._run_server_finite_credit()
+ else:
+ self._run_server_recv()
+
+ def _run_server_recv(self):
+ """ Use recv() to replenish credit each time the server waits
+ """
msg = Message()
try:
while self.running:
@@ -81,6 +90,23 @@ class MessengerTest(Test):
self.server.stop()
self.running = False
+ def _run_server_finite_credit(self):
+ """ Grant credit once, process until credit runs out
+ """
+ msg = Message()
+ self.server_is_running_event.set()
+ try:
+ self.server.recv(self.server_credit)
+ while self.running:
+ # do not grant additional credit (eg. call recv())
+ self.process_incoming(msg)
+ self.server.work()
+ except Interrupt:
+ pass
+ finally:
+ self.server.stop()
+ self.running = False
+
def process_incoming(self, msg):
while self.server.incoming:
self.server.get(msg)
@@ -568,6 +594,57 @@ class MessengerTest(Test):
self.client.rewrite("*", "$1")
self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host")
+ def testCreditBlockingRebalance(self):
+ """ The server is given a fixed amount of credit, and runs until that
+ credit is exhausted.
+ """
+ if sys.platform.startswith("java"):
+ raise Skipped("Skipping testCreditBlockingRebalance - credit scheduler TBD for Java Messenger")
+
+ self.server_finite_credit = True
+ self.server_credit = 11
+ self.start()
+
+ # put one message out on "Link1" - since there are no other links, it
+ # should get all the credit (10 after sending)
+ msg = Message()
+ msg.address="amqp://0.0.0.0:12345/Link1"
+ msg.subject="Hello World!"
+ body = "First the world, then the galaxy!"
+ msg.body = body
+ self.client.put(msg)
+ self.client.send()
+ self.client.recv(1)
+ assert self.client.incoming == 1
+
+ # Now attempt to exhaust credit using a different link
+ for i in range(10):
+ msg.address="amqp://0.0.0.0:12345/Link2"
+ self.client.put(msg)
+ self.client.send()
+
+ deadline = time() + self.timeout
+ count = 0
+ while count < 11 and time() < deadline:
+ self.client.recv(-1)
+ while self.client.incoming:
+ self.client.get(msg)
+ count += 1
+ assert count == 11, count
+
+ # now attempt to send one more. There isn't enough credit, so it should
+ # not be sent
+ self.client.timeout = 1
+ msg.address="amqp://0.0.0.0:12345/Link2"
+ self.client.put(msg)
+ try:
+ self.client.send()
+ assert False, "expected client to time out in send()"
+ except Timeout:
+ pass
+ assert self.client.outgoing == 1
+
+
class NBMessengerTest(common.Test):
def setup(self):
@@ -652,3 +729,170 @@ class NBMessengerTest(common.Test):
self.client.get(msg2)
assert msg2.address == msg.address
assert msg2.body == msg.body
+
+ def testCreditAutoBackpressure(self):
+ """ Verify that use of automatic credit (pn_messenger_recv(-1)) does not
+ fill the incoming queue indefinitely. If the receiver does not 'get' the
+ message, eventually the sender will block. See PROTON-350 """
+ self.server.recv()
+ msg = Message()
+ msg.address = self.address
+ deadline = time() + self.timeout
+ while time() < deadline:
+ old = self.server.incoming
+ for j in xrange(1001):
+ self.client.put(msg)
+ self.pump()
+ if old == self.server.incoming:
+ break;
+ assert old == self.server.incoming, "Backpressure not active!"
+
+ def testCreditRedistribution(self):
+ """ Verify that a fixed amount of credit will redistribute to new
+ links.
+ """
+ if sys.platform.startswith("java"):
+ raise Skipped("Skipping testCreditRedistribution - credit scheduler TBD for Java Messenger")
+
+ self.server.recv( 5 )
+
+ # first link will get all credit
+ msg1 = Message()
+ msg1.address = self.address + "/msg1"
+ self.client.put(msg1)
+ self.pump()
+ assert self.server.incoming == 1, self.server.incoming
+ assert self.server.receiving == 4, self.server.receiving
+
+ # no credit left over for this link
+ msg2 = Message()
+ msg2.address = self.address + "/msg2"
+ self.client.put(msg2)
+ self.pump()
+ assert self.server.incoming == 1, self.server.incoming
+ assert self.server.receiving == 4, self.server.receiving
+
+ # eventually, credit will rebalance and the new link will send
+ deadline = time() + self.timeout
+ while time() < deadline:
+ sleep(.1)
+ self.pump()
+ if self.server.incoming == 2:
+ break;
+ assert self.server.incoming == 2, self.server.incoming
+ assert self.server.receiving == 3, self.server.receiving
+
+ def testCreditReclaim(self):
+ """ Verify that credit is reclaimed when a link with outstanding credit is
+ torn down.
+ """
+ if sys.platform.startswith("java"):
+ raise Skipped("Skipping testCreditReclaim - credit scheduler TBD for Java Messenger")
+
+ self.server.recv( 9 )
+
+ # first link will get all credit
+ msg1 = Message()
+ msg1.address = self.address + "/msg1"
+ self.client.put(msg1)
+ self.pump()
+ assert self.server.incoming == 1, self.server.incoming
+ assert self.server.receiving == 8, self.server.receiving
+
+ # no credit left over for this link
+ msg2 = Message()
+ msg2.address = self.address + "/msg2"
+ self.client.put(msg2)
+ self.pump()
+ assert self.server.incoming == 1, self.server.incoming
+ assert self.server.receiving == 8, self.server.receiving
+
+ # and none for this new client
+ client2 = Messenger("client2")
+ client2.blocking = False
+ client2.start()
+ msg3 = Message()
+ msg3.address = self.address + "/msg3"
+ client2.put(msg3)
+ while client2.work(0):
+ self.pump()
+ assert self.server.incoming == 1, self.server.incoming
+ assert self.server.receiving == 8, self.server.receiving
+
+ # eventually, credit will rebalance and all links will
+ # send a message
+ deadline = time() + self.timeout
+ while time() < deadline:
+ sleep(.1)
+ self.pump()
+ client2.work(0)
+ if self.server.incoming == 3:
+ break;
+ assert self.server.incoming == 3, self.server.incoming
+ assert self.server.receiving == 6, self.server.receiving
+
+ # now tear down client two, this should cause its outstanding credit to be
+ # made available to the other links
+ client2.stop()
+ self.pump()
+
+ for i in range(4):
+ self.client.put(msg1)
+ self.client.put(msg2)
+
+ # should exhaust all credit
+ deadline = time() + self.timeout
+ while time() < deadline:
+ sleep(.1)
+ self.pump()
+ if self.server.incoming == 9:
+ break;
+ assert self.server.incoming == 9, self.server.incoming
+ assert self.server.receiving == 0, self.server.receiving
+
+
+
+ def testCreditReplenish(self):
+ """ When extra credit is available it should be granted to the first
+ link that can use it.
+ """
+ if sys.platform.startswith("java"):
+ raise Skipped("Skipping testCreditReplenish - credit scheduler TBD for Java Messenger")
+
+ # create three links
+ msg = Message()
+ for i in range(3):
+ msg.address = self.address + "/%d" % i
+ self.client.put(msg)
+
+ self.server.recv( 50 ) # 50/3 = 16 per link + 2 extra
+
+ self.pump()
+ assert self.server.incoming == 3, self.server.incoming
+ assert self.server.receiving == 47, self.server.receiving
+
+ # 47/3 = 15 per link, + 2 extra
+
+ # verify one link can send 15 + the two extra (17)
+ for i in range(17):
+ msg.address = self.address + "/0"
+ self.client.put(msg)
+ self.pump()
+ assert self.server.incoming == 20, self.server.incoming
+ assert self.server.receiving == 30, self.server.receiving
+
+ # now verify that the remaining credit (30) will eventually rebalance
+ # across all links (10 per link)
+ for j in range(10):
+ for i in range(3):
+ msg.address = self.address + "/%d" % i
+ self.client.put(msg)
+
+ deadline = time() + self.timeout
+ while time() < deadline:
+ sleep(.1)
+ self.pump()
+ if self.server.incoming == 50:
+ break
+ assert self.server.incoming == 50, self.server.incoming
+ assert self.server.receiving == 0, self.server.receiving
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org