You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/10/15 19:35:10 UTC

svn commit: r1532452 - in /qpid/proton/trunk: proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/ proton-c/src/engine/ proton-c/src/messenger/ tests/python/proton_tests/

Author: rhs
Date: Tue Oct 15 17:35:09 2013
New Revision: 1532452

URL: http://svn.apache.org/r1532452
Log:
PROTON-200: modified messenger's credit distribution algorithm to cope with credit scarce scenarios

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/util.h
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Oct 15 17:35:09 2013
@@ -470,6 +470,10 @@ send. Defaults to zero.
       self._check(err)
       return True
 
+  @property
+  def receiving(self):
+    return pn_messenger_receiving(self._mng)
+
   def interrupt(self):
     self._check(pn_messenger_interrupt(self._mng))
 

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Oct 15 17:35:09 2013
@@ -475,6 +475,7 @@ PN_EXTERN pn_delivery_t *pn_link_current
 PN_EXTERN bool pn_link_advance(pn_link_t *link);
 PN_EXTERN int pn_link_credit(pn_link_t *link);
 PN_EXTERN int pn_link_queued(pn_link_t *link);
+PN_EXTERN int pn_link_remote_credit(pn_link_t *link);
 PN_EXTERN int pn_link_available(pn_link_t *link);
 PN_EXTERN pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link);
 PN_EXTERN pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link);
@@ -492,6 +493,7 @@ PN_EXTERN void pn_link_close(pn_link_t *
 PN_EXTERN void pn_link_free(pn_link_t *sender);
 PN_EXTERN void *pn_link_get_context(pn_link_t *link);
 PN_EXTERN void pn_link_set_context(pn_link_t *link, void *context);
+PN_EXTERN bool pn_link_get_drain(pn_link_t *link);
 
 // sender
 PN_EXTERN void pn_link_offered(pn_link_t *sender, int credit);
@@ -502,6 +504,7 @@ PN_EXTERN int pn_link_drained(pn_link_t 
 // receiver
 PN_EXTERN void pn_link_flow(pn_link_t *receiver, int credit);
 PN_EXTERN void pn_link_drain(pn_link_t *receiver, int credit);
+PN_EXTERN void pn_link_set_drain(pn_link_t *receiver, bool drain);
 PN_EXTERN ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n);
 PN_EXTERN bool pn_link_draining(pn_link_t *receiver);
 

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Tue Oct 15 17:35:09 2013
@@ -395,7 +395,7 @@ PN_EXTERN int pn_messenger_send(pn_messe
  * messenger is in blocking mode, this call will block until at least
  * one message is available in the incoming queue.
  *
- * Each call to pn_messenger_recv replaces the previos receive
+ * Each call to pn_messenger_recv replaces the previous receive
  * operation, so pn_messenger_recv(messenger, 0) will cancel any
  * outstanding receive.
  *
@@ -412,8 +412,11 @@ PN_EXTERN int pn_messenger_send(pn_messe
  */
 PN_EXTERN int pn_messenger_recv(pn_messenger_t *messenger, int limit);
 
-/** Returns the number of messages that was requested by
- * the most recent call to pn_messenger_recv.
+/** Returns the capacity of the incoming message queue of
+ * messenger. Note this count does not include those messages already
+ * available on the incoming queue (@see
+ * pn_messenger_incoming()). Rather it returns the number of incoming
+ * queue entries available for receiving messages
  *
  * @param[in] messenger the messenger
  */

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Oct 15 17:35:09 2013
@@ -240,6 +240,7 @@ struct pn_link_t {
   pn_sequence_t available;
   pn_sequence_t credit;
   pn_sequence_t queued;
+  bool drain_flag_mode; // receiver only
   bool drain;
   int drained; // number of drained credits
   void *context;

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Oct 15 17:35:09 2013
@@ -680,6 +680,7 @@ pn_link_t *pn_link_new(int type, pn_sess
   link->credit = 0;
   link->queued = 0;
   link->drain = false;
+  link->drain_flag_mode = true;
   link->drained = 0;
   link->context = 0;
   link->snd_settle_mode = PN_SND_MIXED;
@@ -1179,6 +1180,18 @@ int pn_link_queued(pn_link_t *link)
   return link ? link->queued : 0;
 }
 
+int pn_link_remote_credit(pn_link_t *link)
+{
+  assert(link);
+  return link->credit - link->queued;
+}
+
+bool pn_link_get_drain(pn_link_t *link)
+{
+  assert(link);
+  return link->drain;
+}
+
 pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link)
 {
   return link ? (pn_snd_settle_mode_t)link->snd_settle_mode
@@ -1297,10 +1310,13 @@ ssize_t pn_link_recv(pn_link_t *receiver
 
 void pn_link_flow(pn_link_t *receiver, int credit)
 {
-  if (receiver && pn_link_is_receiver(receiver)) {
-    receiver->credit += credit;
-    receiver->drain = false;
-    pn_modified(receiver->session->connection, &receiver->endpoint);
+  assert(receiver);
+  assert(pn_link_is_receiver(receiver));
+  receiver->credit += credit;
+  pn_modified(receiver->session->connection, &receiver->endpoint);
+  if (!receiver->drain_flag_mode) {
+    pn_link_set_drain(receiver, false);
+    receiver->drain_flag_mode = false;
   }
 }
 
@@ -1308,8 +1324,18 @@ void pn_link_drain(pn_link_t *receiver, 
 {
   assert(receiver);
   assert(pn_link_is_receiver(receiver));
+  pn_link_set_drain(receiver, true);
   pn_link_flow(receiver, credit);
-  receiver->drain = true;
+  receiver->drain_flag_mode = false;
+}
+
+void pn_link_set_drain(pn_link_t *receiver, bool drain)
+{
+  assert(receiver);
+  assert(pn_link_is_receiver(receiver));
+  receiver->drain = drain;
+  pn_modified(receiver->session->connection, &receiver->endpoint);
+  receiver->drain_flag_mode = true;
 }
 
 bool pn_link_draining(pn_link_t *receiver)

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Oct 15 17:35:09 2013
@@ -35,6 +35,8 @@
 #include "store.h"
 #include "transform.h"
 
+typedef struct pn_link_ctx_t pn_link_ctx_t;
+
 typedef struct {
   pn_string_t *text;
   bool passive;
@@ -46,6 +48,13 @@ typedef struct {
   char *name;
 } pn_address_t;
 
+// algorithm for granting credit to receivers
+typedef  enum {
+  // pn_messenger_recv( X ), where:
+  LINK_CREDIT_EXPLICIT,  // X > 0
+  LINK_CREDIT_AUTO   // X == -1
+} pn_link_credit_mode_t;
+
 struct pn_messenger_t {
   char *name;
   char *certificate;
@@ -56,10 +65,15 @@ struct pn_messenger_t {
   bool blocking;
   pn_driver_t *driver;
   int send_threshold;
-  int receiving;
-  int credit_batch;
-  int credit;
-  int distributed;
+  pn_link_credit_mode_t credit_mode;
+  int credit_batch;  // when LINK_CREDIT_AUTO
+  int credit;        // available
+  int distributed;   // credit
+  int receivers;     // # receiver links
+  int draining;      // # links in drain state
+  pn_list_t *credited;
+  pn_list_t *blocked;
+  pn_timestamp_t next_drain;
   uint64_t next_tag;
   pni_store_t *outgoing;
   pni_store_t *incoming;
@@ -125,9 +139,11 @@ typedef struct {
   char *pass;
   char *host;
   char *port;
+  pn_connector_t *connector;
 } pn_connection_ctx_t;
 
 static pn_connection_ctx_t *pn_connection_ctx(pn_connection_t *conn,
+                                              pn_connector_t *connector,
                                               const char *scheme,
                                               const char *user,
                                               const char *pass,
@@ -142,6 +158,7 @@ static pn_connection_ctx_t *pn_connectio
   ctx->pass = pn_strdup(pass);
   ctx->host = pn_strdup(host);
   ctx->port = pn_strdup(port);
+  ctx->connector = connector;
   pn_connection_set_context(conn, ctx);
   return ctx;
 }
@@ -174,6 +191,53 @@ static char *build_name(const char *name
   }
 }
 
+struct pn_link_ctx_t {
+  pn_subscription_t *subscription;
+};
+
+// compute the maximum amount of credit each receiving link is
+// entitled to.  The actual credit given to the link depends on what
+// amount of credit is actually available.
+static int per_link_credit( pn_messenger_t *messenger )
+{
+  if (messenger->receivers == 0) return 0;
+  int total = messenger->credit + messenger->distributed;
+  return pn_max(total/messenger->receivers, 1);
+}
+
+static void link_ctx_setup( pn_messenger_t *messenger,
+                            pn_connection_t *connection,
+                            pn_link_t *link )
+{
+  if (pn_link_is_receiver(link)) {
+    messenger->receivers++;
+    pn_link_ctx_t *ctx = (pn_link_ctx_t *) calloc(1, sizeof(pn_link_ctx_t));
+    assert( ctx );
+    assert( !pn_link_get_context(link) );
+    pn_link_set_context( link, ctx );
+    pn_list_add(messenger->blocked, link);
+  }
+}
+
+static void link_ctx_release( pn_messenger_t *messenger, pn_link_t *link )
+{
+  if (pn_link_is_receiver(link)) {
+    assert( messenger->receivers > 0 );
+    messenger->receivers--;
+    pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( link );
+    assert( ctx );
+    if (pn_link_get_drain(link)) {
+      pn_link_set_drain(link, false);
+      assert( messenger->draining > 0 );
+      messenger->draining--;
+    }
+    pn_list_remove(messenger->credited, link);
+    pn_list_remove(messenger->blocked, link);
+    pn_link_set_context( link, NULL );
+    free( ctx );
+  }
+}
+
 pn_messenger_t *pn_messenger(const char *name)
 {
   pn_messenger_t *m = (pn_messenger_t *) malloc(sizeof(pn_messenger_t));
@@ -187,10 +251,15 @@ pn_messenger_t *pn_messenger(const char 
     m->timeout = -1;
     m->blocking = true;
     m->driver = pn_driver();
-    m->receiving = 0;
+    m->credit_mode = LINK_CREDIT_EXPLICIT;
     m->credit_batch = 1024;
     m->credit = 0;
     m->distributed = 0;
+    m->receivers = 0;
+    m->draining = 0;
+    m->credited = pn_list(0, 0);
+    m->blocked = pn_list(0, 0);
+    m->next_drain = 0;
     m->next_tag = 0;
     m->outgoing = pni_store();
     m->incoming = pni_store();
@@ -328,6 +397,8 @@ void pn_messenger_free(pn_messenger_t *m
     free(messenger->subscriptions);
     pn_free(messenger->rewrites);
     pn_free(messenger->routes);
+    pn_free(messenger->credited);
+    pn_free(messenger->blocked);
     free(messenger);
   }
 }
@@ -347,55 +418,99 @@ pn_error_t *pn_messenger_error(pn_messen
   return messenger->error;
 }
 
-void pn_messenger_flow(pn_messenger_t *messenger)
-{
-  int link_ct = 0;
-  pn_connector_t *ctor = pn_connector_head(messenger->driver);
-  while (ctor) {
-    pn_connection_t *conn = pn_connector_connection(ctor);
-
-    pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
-    while (link) {
-      if (pn_link_is_receiver(link)) link_ct++;
-      link = pn_link_next(link, PN_LOCAL_ACTIVE);
+// Run the credit scheduler, grant flow as needed.  Return True if
+// credit allocation for any link has changed.
+bool pn_messenger_flow(pn_messenger_t *messenger)
+{
+  bool updated = false;
+  if (messenger->receivers == 0) return updated;
+
+  if (messenger->credit_mode == LINK_CREDIT_AUTO) {
+    // replenish, but limit the max total messages buffered
+    const int max = messenger->receivers * messenger->credit_batch;
+    const int used = messenger->distributed + pn_messenger_incoming(messenger);
+    if (max > used)
+      messenger->credit = max - used;
+  }
+
+  // account for any credit left over after draining links has completed
+  if (messenger->draining > 0) {
+    for (size_t i = 0; i < pn_list_size(messenger->credited); i++) {
+      pn_link_t *link = (pn_link_t *) pn_list_get(messenger->credited, i);
+      if (pn_link_get_drain(link)) {
+        if (!pn_link_draining(link)) {
+          // drain completed!
+          int drained = pn_link_drained(link);
+          //          printf("%s: drained %i from %p\n", messenger->name, drained, (void *) ctx->link);
+          messenger->distributed -= drained;
+          messenger->credit += drained;
+          pn_link_set_drain(link, false);
+          messenger->draining--;
+          pn_list_remove(messenger->credited, link);
+          pn_list_add(messenger->blocked, link);
+        }
+      }
     }
-    ctor = pn_connector_next(ctor);
   }
 
-  if (link_ct == 0) return;
-
-  if (messenger->receiving == -1) {
-    messenger->credit = link_ct * messenger->credit_batch - pn_messenger_incoming(messenger);
-  } else {
-    int total = messenger->credit + messenger->distributed;
-    if (messenger->receiving > total)
-      messenger->credit += (messenger->receiving - total);
+  const int batch = per_link_credit(messenger);
+  while (messenger->credit > 0 && pn_list_size(messenger->blocked)) {
+    pn_link_t *link = (pn_link_t *) pn_list_get(messenger->blocked, 0);
+    pn_list_del(messenger->blocked, 0, 1);
+
+    const int more = pn_min( messenger->credit, batch );
+    messenger->distributed += more;
+    messenger->credit -= more;
+    //    printf("%s: flowing %i to %p\n", messenger->name, more, (void *) ctx->link);
+    pn_link_flow(link, more);
+    pn_list_add(messenger->credited, link);
+    pn_connection_t *conn = pn_session_connection(pn_link_session(link));
+    pn_connection_ctx_t *cctx;
+    cctx = (pn_connection_ctx_t *)pn_connection_get_context(conn);
+    // flow changed, must process it
+    pn_connector_process( cctx->connector );
+    updated = true;
   }
 
-  int batch = (messenger->credit < link_ct) ? 1
-    : (messenger->credit/link_ct);
-
-  ctor = pn_connector_head(messenger->driver);
-  while (ctor) {
-    pn_connection_t *conn = pn_connector_connection(ctor);
-    pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
-    while (link) {
-      if (pn_link_is_receiver(link)) {
+  if (!pn_list_size(messenger->blocked)) {
+    messenger->next_drain = 0;
+  } else {
+    // not enough credit for all links
+    if (!messenger->draining) {
+      //      printf("%s: let's drain\n", messenger->name);
+      if (messenger->next_drain == 0) {
+        messenger->next_drain = pn_i_now() + 250;
+        //        printf("%s: initializing next_drain\n", messenger->name);
+      } else if (messenger->next_drain <= pn_i_now()) {
+        // initiate drain, free up at most enough to satisfy blocked
+        messenger->next_drain = 0;
+        int needed = pn_list_size(messenger->blocked) * batch;
+        for (size_t i = 0; i < pn_list_size(messenger->credited); i++) {
+          pn_link_t *link = (pn_link_t *) pn_list_get(messenger->credited, i);
+          if (!pn_link_get_drain(link)) {
+            //            printf("%s: initiating drain from %p\n", messenger->name, (void *) ctx->link);
+            pn_link_set_drain(link, true);
+            needed -= pn_link_remote_credit(link);
+            messenger->draining++;
+            pn_connection_t *conn =
+              pn_session_connection(pn_link_session(link));
+            pn_connection_ctx_t *cctx;
+            cctx = (pn_connection_ctx_t *)pn_connection_get_context(conn);
+            // drain requested on link, must process it
+            pn_connector_process( cctx->connector );
+            updated = true;
+          }
 
-        int have = pn_link_credit(link);
-        if (have < batch) {
-          int need = batch - have;
-          int amount = (messenger->credit < need) ? messenger->credit : need;
-          pn_link_flow(link, amount);
-          messenger->distributed += amount;
-          messenger->credit -= amount;
-          if (messenger->credit == 0) return;
+          if (needed <= 0) {
+            break;
+          }
         }
+      } else {
+        //        printf("%s: delaying\n", messenger->name);
       }
-      link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
-    ctor = pn_connector_next(ctor);
   }
+  return updated;
 }
 
 static void pn_transport_config(pn_messenger_t *messenger,
@@ -465,8 +580,8 @@ int pni_pump_in(pn_messenger_t *messenge
   pn_buffer_t *buf = pni_entry_bytes(entry);
   pni_entry_set_delivery(entry, d);
 
-  pn_subscription_t *sub = (pn_subscription_t *) pn_link_get_context(receiver);
-  pni_entry_set_context(entry, sub);
+  pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( receiver );
+  pni_entry_set_context(entry, ctx ? ctx->subscription : NULL);
 
   size_t pending = pn_delivery_pending(d);
   int err = pn_buffer_ensure(buf, pending + 1);
@@ -480,6 +595,35 @@ int pni_pump_in(pn_messenger_t *messenge
   }
   n = pn_link_recv(receiver, encoded + pending, 1);
   pn_link_advance(receiver);
+
+  // account for the used credit
+  assert( ctx );
+  assert( messenger->distributed );
+  messenger->distributed--;
+
+  pn_link_t *link = receiver;
+  // replenish if low (< 20% maximum batch) and credit available
+  if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 && messenger->credit > 0) {
+    const int max = per_link_credit(messenger);
+    const int lo_thresh = (int)(max * 0.2 + 0.5);
+    if (pn_link_remote_credit(link) < lo_thresh) {
+      const int more = pn_min(messenger->credit, max - pn_link_remote_credit(link));
+      messenger->credit -= more;
+      messenger->distributed += more;
+      pn_link_flow(link, more);
+    }
+  }
+  // check if blocked
+  if (pn_list_index(messenger->blocked, link) < 0 && pn_link_remote_credit(link) == 0) {
+    pn_list_remove(messenger->credited, link);
+    if (pn_link_get_drain(link)) {
+      pn_link_set_drain(link, false);
+      assert( messenger->draining > 0 );
+      messenger->draining--;
+    }
+    pn_list_add(messenger->blocked, link);
+  }
+
   if (n != PN_EOS) {
     return pn_error_format(messenger->error, n, "PN_EOS expected");
   }
@@ -530,11 +674,12 @@ void pn_messenger_endpoints(pn_messenger
   while (link) {
     pn_terminus_copy(pn_link_source(link), pn_link_remote_source(link));
     pn_terminus_copy(pn_link_target(link), pn_link_remote_target(link));
+    link_ctx_setup( messenger, conn, link );
     pn_link_open(link);
     if (pn_link_is_receiver(link)) {
       pn_listener_t *listener = pn_connector_listener(ctor);
       pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(listener);
-      pn_link_set_context(link, ctx ? ctx->subscription : NULL);
+      ((pn_link_ctx_t *)pn_link_get_context(link))->subscription = ctx ? ctx->subscription : NULL;
     }
     link = pn_link_next(link, PN_LOCAL_UNINIT);
   }
@@ -547,8 +692,6 @@ void pn_messenger_endpoints(pn_messenger
     link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
   }
 
-  pn_messenger_flow(messenger);
-
   ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
   while (ssn) {
     pn_condition_report("SESSION", pn_session_remote_condition(ssn));
@@ -556,12 +699,16 @@ void pn_messenger_endpoints(pn_messenger
     ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
   }
 
-  link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+  link = pn_link_head(conn, PN_REMOTE_CLOSED);
   while (link) {
-    pn_condition_report("LINK", pn_link_remote_condition(link));
-    pn_link_close(link);
-    // XXX: should free link
-    link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+    if (PN_LOCAL_ACTIVE | pn_link_state(link)) {
+      pn_condition_report("LINK", pn_link_remote_condition(link));
+      pn_link_close(link);
+    } else {
+      link_ctx_release( messenger, link );
+      pn_link_free(link);
+    }
+    link = pn_link_next(link, PN_REMOTE_CLOSED);
   }
 
   if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
@@ -585,6 +732,8 @@ void pn_messenger_endpoints(pn_messenger
   } else if (pn_connector_closed(ctor) && !(pn_connection_state(conn) & PN_REMOTE_CLOSED)) {
     pn_error_report("CONNECTION", "connection aborted");
   }
+
+  pn_messenger_flow(messenger);
 }
 
 void pni_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn)
@@ -608,6 +757,8 @@ void pni_messenger_reclaim(pn_messenger_
       d = pn_unsettled_next(d);
     }
 
+    link_ctx_release(messenger, link);
+
     link = pn_link_next(link, 0);
   }
 
@@ -617,6 +768,7 @@ void pni_messenger_reclaim(pn_messenger_
 
 
 pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
+                                         pn_connector_t *connector,
                                          char *scheme,
                                          char *user,
                                          char *pass,
@@ -625,7 +777,7 @@ pn_connection_t *pn_messenger_connection
 {
   pn_connection_t *connection = pn_connection();
   if (!connection) return NULL;
-  pn_connection_ctx(connection, scheme, user, pass, host, port);
+  pn_connection_ctx(connection, connector, scheme, user, pass, host, port);
 
   pn_connection_set_container(connection, messenger->name);
   pn_connection_set_hostname(connection, host);
@@ -651,6 +803,17 @@ int pn_messenger_tsync(pn_messenger_t *m
     int remaining = deadline - now;
     if (pred || (timeout >= 0 && remaining < 0)) break;
 
+    // Update the credit scheduler. If the scheduler detects credit
+    // imbalance on the links, wake up in time to service credit drain
+    pn_messenger_flow(messenger);
+    if (messenger->next_drain) {
+      if (now >= messenger->next_drain)
+        remaining = 0;
+      else {
+        const int delay = messenger->next_drain - now;
+        remaining = (remaining < 0) ? delay : pn_min( remaining, delay );
+      }
+    }
     int error = pn_driver_wait(messenger->driver, remaining);
     if (error && error != PN_INTR) return error;
 
@@ -681,7 +844,7 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_sasl_server(sasl);
       pn_sasl_done(sasl, PN_SASL_OK);
       pn_connection_t *conn =
-        pn_messenger_connection(messenger, scheme, NULL, NULL, NULL, NULL);
+        pn_messenger_connection(messenger, c, scheme, NULL, NULL, NULL, NULL);
       pn_connector_set_connection(c, conn);
     }
 
@@ -695,7 +858,6 @@ int pn_messenger_tsync(pn_messenger_t *m
         pn_connector_free(c);
         if (conn) {
           pni_messenger_reclaim(messenger, conn);
-          pn_messenger_flow(messenger);
         }
       } else {
         pn_connector_process(c);
@@ -889,7 +1051,7 @@ pn_connection_t *pn_messenger_resolve(pn
   }
 
   pn_connection_t *connection =
-    pn_messenger_connection(messenger, scheme, user, pass, host, port);
+    pn_messenger_connection(messenger, connector, scheme, user, pass, host, port);
   pn_transport_config(messenger, connector, connection);
   pn_connection_open(connection);
   pn_connector_set_connection(connector, connection);
@@ -941,16 +1103,18 @@ pn_link_t *pn_messenger_link(pn_messenge
   link = sender ? pn_sender(ssn, "sender-xxx") : pn_receiver(ssn, "receiver-xxx");
   if ((sender && pn_messenger_get_outgoing_window(messenger)) ||
       (!sender && pn_messenger_get_incoming_window(messenger))) {
-      // use explicit settlement via dispositions (not pre-settled)
-      pn_link_set_snd_settle_mode( link, PN_SND_UNSETTLED );
-      pn_link_set_rcv_settle_mode( link, PN_RCV_SECOND );
+    // use explicit settlement via dispositions (not pre-settled)
+    pn_link_set_snd_settle_mode( link, PN_SND_UNSETTLED );
+    pn_link_set_rcv_settle_mode( link, PN_RCV_SECOND );
   }
   // XXX
   pn_terminus_set_address(pn_link_target(link), name);
   pn_terminus_set_address(pn_link_source(link), name);
+  link_ctx_setup( messenger, connection, link );
   if (!sender) {
-    pn_subscription_t *sub = pn_subscription(messenger, NULL);
-    pn_link_set_context(link, sub);
+    pn_link_ctx_t *ctx = (pn_link_ctx_t *)pn_link_get_context(link);
+    assert( ctx );
+    ctx->subscription = pn_subscription(messenger, NULL);
   }
   pn_link_open(link);
   return link;
@@ -991,8 +1155,8 @@ pn_subscription_t *pn_messenger_subscrib
   } else {
     pn_link_t *src = pn_messenger_source(messenger, source);
     if (!src) return NULL;
-    pn_subscription_t *sub = (pn_subscription_t *) pn_link_get_context(src);
-    return sub;
+    pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( src );
+    return ctx ? ctx->subscription : NULL;
   }
 }
 
@@ -1048,7 +1212,11 @@ static void outward_munge(pn_messenger_t
 int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender)
 {
   pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
-  if (!entry) return 0;
+  if (!entry) {
+    pn_link_drained(sender);
+    return 0;
+  }
+
   pn_buffer_t *buf = pni_entry_bytes(entry);
   pn_bytes_t bytes = pn_buffer_bytes(buf);
   char *encoded = bytes.start;
@@ -1292,7 +1460,17 @@ int pn_messenger_recv(pn_messenger_t *me
   if (messenger->blocking && !pn_listener_head(messenger->driver)
       && !pn_connector_head(messenger->driver))
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
-  messenger->receiving = n;
+
+  // re-compute credit, and update credit scheduler
+  if (n == -1) {
+    messenger->credit_mode = LINK_CREDIT_AUTO;
+  } else {
+    messenger->credit_mode = LINK_CREDIT_EXPLICIT;
+    if (n > messenger->distributed)
+      messenger->credit = n - messenger->distributed;
+    else  // cancel unallocated
+      messenger->credit = 0;
+  }
   pn_messenger_flow(messenger);
   int err = pn_messenger_sync(messenger, pn_messenger_rcvd);
   if (err) return err;
@@ -1309,7 +1487,7 @@ int pn_messenger_recv(pn_messenger_t *me
 int pn_messenger_receiving(pn_messenger_t *messenger)
 {
   assert(messenger);
-  return messenger->receiving;
+  return messenger->credit + messenger->distributed;
 }
 
 int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
@@ -1326,7 +1504,6 @@ int pn_messenger_get(pn_messenger_t *mes
   const char *encoded = bytes.start;
   size_t size = bytes.size;
 
-  messenger->distributed--;
   messenger->incoming_subscription = (pn_subscription_t *) pni_entry_get_context(entry);
 
   if (msg) {

Modified: qpid/proton/trunk/proton-c/src/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.h?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.h (original)
+++ qpid/proton/trunk/proton-c/src/util.h Tue Oct 15 17:35:09 2013
@@ -97,8 +97,6 @@ pn_timestamp_t pn_timestamp_min(pn_times
       LL_HEAD(ROOT, LIST) = (NODE)-> LIST ## _next;                    \
     if ((NODE) == LL_TAIL(ROOT, LIST))                                 \
       LL_TAIL(ROOT, LIST) = (NODE)-> LIST ## _prev;                    \
-    (NODE)-> LIST ## _next = NULL;                                     \
-    (NODE)-> LIST ## _prev = NULL;                                     \
   }
 
 char *pn_strdup(const char *src);

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1532452&r1=1532451&r2=1532452&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Tue Oct 15 17:35:09 2013
@@ -28,6 +28,7 @@ class Test(common.Test):
   def setup(self):
     self.server_credit = 10
     self.server_received = 0
+    self.server_finite_credit = False
     self.server = Messenger("server")
     self.server.timeout = self.timeout
     self.server.start()
@@ -68,6 +69,14 @@ REJECT_ME = "*REJECT-ME*"
 class MessengerTest(Test):
 
   def run_server(self):
+    if self.server_finite_credit:
+      self._run_server_finite_credit()
+    else:
+      self._run_server_recv()
+
+  def _run_server_recv(self):
+    """ Use recv() to replenish credit each time the server waits
+    """
     msg = Message()
     try:
       while self.running:
@@ -81,6 +90,23 @@ class MessengerTest(Test):
       self.server.stop()
       self.running = False
 
+  def _run_server_finite_credit(self):
+    """ Grant credit once, process until credit runs out
+    """
+    msg = Message()
+    self.server_is_running_event.set()
+    try:
+      self.server.recv(self.server_credit)
+      while self.running:
+        # do not grant additional credit (eg. call recv())
+        self.process_incoming(msg)
+        self.server.work()
+    except Interrupt:
+      pass
+    finally:
+      self.server.stop()
+      self.running = False
+
   def process_incoming(self, msg):
     while self.server.incoming:
       self.server.get(msg)
@@ -568,6 +594,57 @@ class MessengerTest(Test):
     self.client.rewrite("*", "$1")
     self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host")
 
+  def testCreditBlockingRebalance(self):
+    """ The server is given a fixed amount of credit, and runs until that
+    credit is exhausted.
+    """
+    if sys.platform.startswith("java"):
+        raise Skipped("Skipping testCreditBlockingRebalance - credit scheduler TBD for Java Messenger")
+
+    self.server_finite_credit = True
+    self.server_credit = 11
+    self.start()
+
+    # put one message out on "Link1" - since there are no other links, it
+    # should get all the credit (10 after sending)
+    msg = Message()
+    msg.address="amqp://0.0.0.0:12345/Link1"
+    msg.subject="Hello World!"
+    body = "First the world, then the galaxy!"
+    msg.body = body
+    self.client.put(msg)
+    self.client.send()
+    self.client.recv(1)
+    assert self.client.incoming == 1
+
+    # Now attempt to exhaust credit using a different link
+    for i in range(10):
+      msg.address="amqp://0.0.0.0:12345/Link2"
+      self.client.put(msg)
+    self.client.send()
+
+    deadline = time() + self.timeout
+    count = 0
+    while count < 11 and time() < deadline:
+        self.client.recv(-1)
+        while self.client.incoming:
+            self.client.get(msg)
+            count += 1
+    assert count == 11, count
+
+    # now attempt to send one more.  There isn't enough credit, so it should
+    # not be sent
+    self.client.timeout = 1
+    msg.address="amqp://0.0.0.0:12345/Link2"
+    self.client.put(msg)
+    try:
+      self.client.send()
+      assert False, "expected client to time out in send()"
+    except Timeout:
+      pass
+    assert self.client.outgoing == 1
+
+
 class NBMessengerTest(common.Test):
 
   def setup(self):
@@ -652,3 +729,170 @@ class NBMessengerTest(common.Test):
     self.client.get(msg2)
     assert msg2.address == msg.address
     assert msg2.body == msg.body
+
+  def testCreditAutoBackpressure(self):
+    """ Verify that use of automatic credit (pn_messenger_recv(-1)) does not
+    fill the incoming queue indefinitely.  If the receiver does not 'get' the
+    message, eventually the sender will block.  See PROTON-350 """
+    self.server.recv()
+    msg = Message()
+    msg.address = self.address
+    deadline = time() + self.timeout
+    while time() < deadline:
+        old = self.server.incoming
+        for j in xrange(1001):
+            self.client.put(msg)
+        self.pump()
+        if old == self.server.incoming:
+            break;
+    assert old == self.server.incoming, "Backpressure not active!"
+
+  def testCreditRedistribution(self):
+    """ Verify that a fixed amount of credit will redistribute to new
+    links.
+    """
+    if sys.platform.startswith("java"):
+        raise Skipped("Skipping testCreditRedistribution - credit scheduler TBD for Java Messenger")
+
+    self.server.recv( 5 )
+
+    # first link will get all credit
+    msg1 = Message()
+    msg1.address = self.address + "/msg1"
+    self.client.put(msg1)
+    self.pump()
+    assert self.server.incoming == 1, self.server.incoming
+    assert self.server.receiving == 4, self.server.receiving
+
+    # no credit left over for this link
+    msg2 = Message()
+    msg2.address = self.address + "/msg2"
+    self.client.put(msg2)
+    self.pump()
+    assert self.server.incoming == 1, self.server.incoming
+    assert self.server.receiving == 4, self.server.receiving
+
+    # eventually, credit will rebalance and the new link will send
+    deadline = time() + self.timeout
+    while time() < deadline:
+        sleep(.1)
+        self.pump()
+        if self.server.incoming == 2:
+            break;
+    assert self.server.incoming == 2, self.server.incoming
+    assert self.server.receiving == 3, self.server.receiving
+
+  def testCreditReclaim(self):
+    """ Verify that credit is reclaimed when a link with outstanding credit is
+    torn down.
+    """
+    if sys.platform.startswith("java"):
+        raise Skipped("Skipping testCreditReclaim - credit scheduler TBD for Java Messenger")
+
+    self.server.recv( 9 )
+
+    # first link will get all credit
+    msg1 = Message()
+    msg1.address = self.address + "/msg1"
+    self.client.put(msg1)
+    self.pump()
+    assert self.server.incoming == 1, self.server.incoming
+    assert self.server.receiving == 8, self.server.receiving
+
+    # no credit left over for this link
+    msg2 = Message()
+    msg2.address = self.address + "/msg2"
+    self.client.put(msg2)
+    self.pump()
+    assert self.server.incoming == 1, self.server.incoming
+    assert self.server.receiving == 8, self.server.receiving
+
+    # and none for this new client
+    client2 = Messenger("client2")
+    client2.blocking = False
+    client2.start()
+    msg3 = Message()
+    msg3.address = self.address + "/msg3"
+    client2.put(msg3)
+    while client2.work(0):
+        self.pump()
+    assert self.server.incoming == 1, self.server.incoming
+    assert self.server.receiving == 8, self.server.receiving
+
+    # eventually, credit will rebalance and all links will
+    # send a message
+    deadline = time() + self.timeout
+    while time() < deadline:
+        sleep(.1)
+        self.pump()
+        client2.work(0)
+        if self.server.incoming == 3:
+            break;
+    assert self.server.incoming == 3, self.server.incoming
+    assert self.server.receiving == 6, self.server.receiving
+
+    # now tear down client two, this should cause its outstanding credit to be
+    # made available to the other links
+    client2.stop()
+    self.pump()
+
+    for i in range(4):
+        self.client.put(msg1)
+        self.client.put(msg2)
+
+    # should exhaust all credit
+    deadline = time() + self.timeout
+    while time() < deadline:
+        sleep(.1)
+        self.pump()
+        if self.server.incoming == 9:
+            break;
+    assert self.server.incoming == 9, self.server.incoming
+    assert self.server.receiving == 0, self.server.receiving
+
+
+
+  def testCreditReplenish(self):
+    """ When extra credit is available it should be granted to the first
+    link that can use it.
+    """
+    if sys.platform.startswith("java"):
+        raise Skipped("Skipping testCreditReplenish - credit scheduler TBD for Java Messenger")
+
+    # create three links
+    msg = Message()
+    for i in range(3):
+        msg.address = self.address + "/%d" % i
+        self.client.put(msg)
+
+    self.server.recv( 50 )  # 50/3 = 16 per link + 2 extra
+
+    self.pump()
+    assert self.server.incoming == 3, self.server.incoming
+    assert self.server.receiving == 47, self.server.receiving
+
+    # 47/3 = 15 per link, + 2 extra
+
+    # verify one link can send 15 + the two extra (17)
+    for i in range(17):
+        msg.address = self.address + "/0"
+        self.client.put(msg)
+    self.pump()
+    assert self.server.incoming == 20, self.server.incoming
+    assert self.server.receiving == 30, self.server.receiving
+
+    # now verify that the remaining credit (30) will eventually rebalance
+    # across all links (10 per link)
+    for j in range(10):
+        for i in range(3):
+            msg.address = self.address + "/%d" % i
+            self.client.put(msg)
+
+    deadline = time() + self.timeout
+    while time() < deadline:
+        sleep(.1)
+        self.pump()
+        if self.server.incoming == 50:
+            break
+    assert self.server.incoming == 50, self.server.incoming
+    assert self.server.receiving == 0, self.server.receiving



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org