You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/06/11 11:41:15 UTC

svn commit: r1491719 - in /qpid/proton/trunk: proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/ proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/dispatcher/ proton-c/src/engine/ proton-c/src/messenger/ proton-j/prot...

Author: rhs
Date: Tue Jun 11 09:41:15 2013
New Revision: 1491719

URL: http://svn.apache.org/r1491719
Log:
PROTON-331: fixed logic for session window, removed fixed limit, added tests

Modified:
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    qpid/proton/trunk/tests/python/proton_tests/engine.py
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java Tue Jun 11 09:41:15 2013
@@ -250,4 +250,11 @@ public class JNIDelivery implements Deli
         }
         super.finalize();
     }
+
+    @ProtonCEquivalent("pn_delivery_pending")
+    public int pending()
+    {
+        return (int) Proton.pn_delivery_pending(_impl);
+    }
+
 }

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Jun 11 09:41:15 2013
@@ -1993,6 +1993,22 @@ class Session(Endpoint):
   def _get_remote_cond_impl(self):
     return pn_session_remote_condition(self._ssn)
 
+  def _get_incoming_capacity(self):
+    return pn_session_get_incoming_capacity(self._ssn)
+
+  def _set_incoming_capacity(self, capacity):
+    pn_session_set_incoming_capacity(self._ssn, capacity)
+
+  incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
+
+  @property
+  def outgoing_bytes(self):
+    return pn_session_outgoing_bytes(self._ssn)
+
+  @property
+  def incoming_bytes(self):
+    return pn_session_incoming_bytes(self._ssn)
+
   def open(self):
     pn_session_open(self._ssn)
 
@@ -2264,6 +2280,14 @@ class Delivery(object):
     pn_delivery_update(self._dlv, state)
 
   @property
+  def pending(self):
+    return pn_delivery_pending(self._dlv)
+
+  @property
+  def partial(self):
+    return pn_delivery_partial(self._dlv)
+
+  @property
   def local_state(self):
     return pn_delivery_local_state(self._dlv)
 
@@ -2672,7 +2696,6 @@ class Driver(object):
 __all__ = [
            "API_LANGUAGE",
            "IMPLEMENTATION_LANGUAGE",
-           "PN_SESSION_WINDOW",
            "ACCEPTED",
            "AUTOMATIC",
            "PENDING",

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Jun 11 09:41:15 2013
@@ -111,8 +111,6 @@ typedef int pn_trace_t;
 #define PN_TRACE_FRM (2)
 #define PN_TRACE_DRV (4)
 
-#define PN_SESSION_WINDOW (1024)
-
 // connection
 
 /** Factory to construct a new Connection.
@@ -182,6 +180,12 @@ PN_EXTERN pn_delivery_t *pn_work_next(pn
  */
 PN_EXTERN pn_session_t *pn_session(pn_connection_t *connection);
 
+PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *ssn);
+PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity);
+
+PN_EXTERN size_t pn_session_outgoing_bytes(pn_session_t *ssn);
+PN_EXTERN size_t pn_session_incoming_bytes(pn_session_t *ssn);
+
 /** Factory for creating a transport.
  *
  * A transport to be used by a connection to interface with the

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Tue Jun 11 09:41:15 2013
@@ -266,9 +266,11 @@ int pn_post_transfer_frame(pn_dispatcher
                            const pn_bytes_t *tag,
                            uint32_t message_format,
                            bool settled,
-                           bool more)
+                           bool more,
+                           pn_sequence_t frame_limit)
 {
   bool more_flag = more;
+  int framecount = 0;
 
   // create preformatives, assuming 'more' flag need not change
 
@@ -323,7 +325,7 @@ int pn_post_transfer_frame(pn_dispatcher
       goto encode_performatives;
     }
 
-    pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, disp->output_size);
+    pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, available);
 
     memmove( buf.start + buf.size, disp->output_payload, available);
     disp->output_payload += available;
@@ -342,14 +344,15 @@ int pn_post_transfer_frame(pn_dispatcher
       disp->output = (char *) realloc(disp->output, disp->capacity);
     }
     disp->output_frames_ct += 1;
+    framecount++;
     if (disp->trace & PN_TRACE_RAW) {
       fprintf(stderr, "RAW: \"");
       pn_fprint_data(stderr, disp->output + disp->available, n);
       fprintf(stderr, "\"\n");
     }
     disp->available += n;
-  } while (disp->output_size > 0);
+  } while (disp->output_size > 0 && framecount < frame_limit);
 
   disp->output_payload = NULL;
-  return 0;
+  return framecount;
 }

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Tue Jun 11 09:41:15 2013
@@ -80,5 +80,6 @@ int pn_post_transfer_frame(pn_dispatcher
                            const pn_bytes_t *delivery_tag,
                            uint32_t message_format,
                            bool settled,
-                           bool more);
+                           bool more,
+                           pn_sequence_t frame_limit);
 #endif /* dispatcher.h */

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Jun 11 09:41:15 2013
@@ -62,7 +62,6 @@ typedef struct {
 } pn_delivery_state_t;
 
 typedef struct {
-  size_t capacity;
   pn_sequence_t next;
   pn_hash_t *deliveries;
 } pn_delivery_map_t;
@@ -84,6 +83,7 @@ typedef struct {
   pn_delivery_map_t outgoing;
   pn_sequence_t incoming_transfer_count;
   pn_sequence_t incoming_window;
+  pn_sequence_t remote_incoming_window;
   pn_sequence_t outgoing_transfer_count;
   pn_sequence_t outgoing_window;
   pn_hash_t *local_handles;
@@ -194,6 +194,11 @@ struct pn_session_t {
   pn_connection_t *connection;
   pn_list_t *links;
   void *context;
+  size_t incoming_capacity;
+  pn_sequence_t incoming_bytes;
+  pn_sequence_t outgoing_bytes;
+  pn_sequence_t incoming_deliveries;
+  pn_sequence_t outgoing_deliveries;
   pn_session_state_t state;
 };
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Jun 11 09:41:15 2013
@@ -37,10 +37,9 @@ static ssize_t transport_consume(pn_tran
 
 // delivery buffers
 
-void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next, size_t capacity)
+void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next)
 {
-  db->deliveries = pn_hash(capacity, 0.75, PN_REFCOUNT);
-  db->capacity = capacity;
+  db->deliveries = pn_hash(1024, 0.75, PN_REFCOUNT);
   db->next = next;
 }
 
@@ -49,11 +48,6 @@ void pn_delivery_map_free(pn_delivery_ma
   pn_free(db->deliveries);
 }
 
-size_t pn_delivery_map_available(pn_delivery_map_t *db)
-{
-  return db->capacity - pn_hash_size(db->deliveries);
-}
-
 pn_delivery_t *pn_delivery_map_get(pn_delivery_map_t *db, pn_sequence_t id)
 {
   return (pn_delivery_t *) pn_hash_get(db->deliveries, id);
@@ -68,8 +62,6 @@ static void pn_delivery_state_init(pn_de
 
 pn_delivery_state_t *pn_delivery_map_push(pn_delivery_map_t *db, pn_delivery_t *delivery)
 {
-  if (!pn_delivery_map_available(db))
-    return NULL;
   pn_delivery_state_t *ds = &delivery->state;
   pn_delivery_state_init(ds, delivery, db->next++);
   pn_hash_put(db->deliveries, ds->id, delivery);
@@ -628,13 +620,18 @@ pn_session_t *pn_session(pn_connection_t
   pn_decref(ssn);
   ssn->links = pn_list(0, PN_REFCOUNT);
   ssn->context = 0;
+  ssn->incoming_capacity = 1024*1024;
+  ssn->incoming_bytes = 0;
+  ssn->outgoing_bytes = 0;
+  ssn->incoming_deliveries = 0;
+  ssn->outgoing_deliveries = 0;
 
   // begin transport state
   memset(&ssn->state, 0, sizeof(ssn->state));
   ssn->state.local_channel = (uint16_t)-1;
   ssn->state.remote_channel = (uint16_t)-1;
-  pn_delivery_map_init(&ssn->state.incoming, 0, PN_SESSION_WINDOW);
-  pn_delivery_map_init(&ssn->state.outgoing, 0, PN_SESSION_WINDOW);
+  pn_delivery_map_init(&ssn->state.incoming, 0);
+  pn_delivery_map_init(&ssn->state.outgoing, 0);
   ssn->state.local_handles = pn_hash(0, 0.75, PN_REFCOUNT);
   ssn->state.remote_handles = pn_hash(0, 0.75, PN_REFCOUNT);
   // end transport state
@@ -642,6 +639,55 @@ pn_session_t *pn_session(pn_connection_t
   return ssn;
 }
 
+size_t pn_session_get_incoming_capacity(pn_session_t *ssn)
+{
+  assert(ssn);
+  return ssn->incoming_capacity;
+}
+
+void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
+{
+  assert(ssn);
+  // XXX: should this trigger a flow?
+  ssn->incoming_capacity = capacity;
+}
+
+size_t pn_session_outgoing_bytes(pn_session_t *ssn)
+{
+  assert(ssn);
+  return ssn->outgoing_bytes;
+}
+
+size_t pn_session_incoming_bytes(pn_session_t *ssn)
+{
+  assert(ssn);
+  return ssn->incoming_bytes;
+}
+
+size_t pn_session_outgoing_window(pn_session_t *ssn)
+{
+  uint32_t size = ssn->connection->transport->remote_max_frame;
+  if (!size) {
+    return ssn->outgoing_deliveries;
+  } else {
+    pn_sequence_t frames = ssn->outgoing_bytes/size;
+    if (ssn->outgoing_bytes % size) {
+      frames++;
+    }
+    return pn_max(frames, ssn->outgoing_deliveries);
+  }
+}
+
+size_t pn_session_incoming_window(pn_session_t *ssn)
+{
+  uint32_t size = ssn->connection->transport->local_max_frame;
+  if (!size) {
+    return 2147483647; // biggest legal value
+  } else {
+    return (ssn->incoming_capacity - ssn->incoming_bytes)/size;
+  }
+}
+
 pn_state_t pn_session_state(pn_session_t *session)
 {
   return session->endpoint.state;
@@ -1228,6 +1274,7 @@ void pn_advance_sender(pn_link_t *link)
   link->current->done = true;
   link->queued++;
   link->credit--;
+  link->session->outgoing_deliveries++;
   pn_add_tpwork(link->current);
   link->current = link->current->unsettled_next;
 }
@@ -1236,6 +1283,16 @@ void pn_advance_receiver(pn_link_t *link
 {
   link->credit--;
   link->queued--;
+  link->session->incoming_deliveries--;
+
+  pn_delivery_t *current = link->current;
+  link->session->incoming_bytes -= pn_buffer_size(current->bytes);
+  pn_buffer_clear(current->bytes);
+
+  if (!link->session->state.incoming_window) {
+    pn_add_tpwork(current);
+  }
+
   link->current = link->current->unsettled_next;
 }
 
@@ -1553,6 +1610,11 @@ int pn_do_transfer(pn_dispatcher_t *disp
                          &more);
   if (err) return err;
   pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+
+  if (!ssn->state.incoming_window) {
+    return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
+  }
+
   pn_link_t *link = pn_handle_state(ssn, handle);
   pn_delivery_t *delivery;
   if (link->unsettled_tail && !link->unsettled_tail->done) {
@@ -1560,13 +1622,10 @@ int pn_do_transfer(pn_dispatcher_t *disp
   } else {
     pn_delivery_map_t *incoming = &ssn->state.incoming;
 
-    if (!pn_delivery_map_available(incoming)) {
-      return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
-    }
-
     if (!ssn->state.incoming_init) {
       incoming->next = id;
       ssn->state.incoming_init = true;
+      ssn->incoming_deliveries++;
     }
 
     delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
@@ -1586,11 +1645,13 @@ int pn_do_transfer(pn_dispatcher_t *disp
   }
 
   pn_buffer_append(delivery->bytes, disp->payload, disp->size);
+  ssn->incoming_bytes += disp->size;
   delivery->done = !more;
 
   ssn->state.incoming_transfer_count++;
   ssn->state.incoming_window--;
 
+  // XXX: need better policy for when to refresh window
   if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >= 0) {
     pn_post_flow(transport, ssn, link);
   }
@@ -1613,9 +1674,9 @@ int pn_do_flow(pn_dispatcher_t *disp)
   pn_session_t *ssn = pn_channel_state(transport, disp->channel);
 
   if (inext_init) {
-    ssn->state.outgoing_window = inext + iwin - ssn->state.outgoing_transfer_count;
+    ssn->state.remote_incoming_window = inext + iwin - ssn->state.outgoing_transfer_count;
   } else {
-    ssn->state.outgoing_window = iwin;
+    ssn->state.remote_incoming_window = iwin;
   }
 
   if (handle_init) {
@@ -1983,11 +2044,13 @@ int pn_process_ssn_setup(pn_transport_t 
     if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
     {
       uint16_t channel = allocate_alias(transport->local_channels);
+      state->incoming_window = pn_session_incoming_window(ssn);
+      state->outgoing_window = pn_session_outgoing_window(ssn);
       pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
                     ((int16_t) state->remote_channel >= 0), state->remote_channel,
                     state->outgoing_transfer_count,
-                    pn_delivery_map_available(&state->incoming),
-                    pn_delivery_map_available(&state->outgoing));
+                    state->incoming_window,
+                    state->outgoing_window);
       state->local_channel = channel;
       pn_hash_put(transport->local_channels, channel, ssn);
     }
@@ -2058,14 +2121,15 @@ int pn_process_link_setup(pn_transport_t
 
 int pn_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link)
 {
-  ssn->state.incoming_window = pn_delivery_map_available(&ssn->state.incoming);
+  ssn->state.incoming_window = pn_session_incoming_window(ssn);
+  ssn->state.outgoing_window = pn_session_outgoing_window(ssn);
   bool linkq = (bool) link;
   pn_link_state_t *state = &link->state;
   return pn_post_frame(transport->disp, ssn->state.local_channel, "DL[?IIII?I?I?In?o]", FLOW,
                        (int16_t) ssn->state.remote_channel >= 0, ssn->state.incoming_transfer_count,
                        ssn->state.incoming_window,
-                       ssn->state.outgoing.next,
-                       pn_delivery_map_available(&ssn->state.outgoing),
+                       ssn->state.outgoing_transfer_count,
+                       ssn->state.outgoing_window,
                        linkq, linkq ? state->local_handle : 0,
                        linkq, linkq ? state->delivery_count : 0,
                        linkq, linkq ? state->link_credit : 0,
@@ -2171,33 +2235,40 @@ int pn_process_tpwork_sender(pn_transpor
   pn_link_state_t *link_state = &link->state;
   if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
     pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL;
-    if (!(*allocation_blocked) && !state && pn_delivery_map_available(&ssn_state->outgoing)) {
+    if (!(*allocation_blocked) && !state) {
       state = pn_delivery_map_push(&ssn_state->outgoing, delivery);
     } else {
+      // XXX: I'm pretty sure this can never actually happen, however
+      // we may need some logic to block allocation if a delivery is
+      // blocked by link credit
       *allocation_blocked = true;
     }
 
     if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
-        ssn_state->outgoing_window > 0 && link_state->link_credit > 0) {
+        ssn_state->remote_incoming_window > 0 && link_state->link_credit > 0) {
       pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
       pn_set_payload(transport->disp, bytes.start, bytes.size);
-      pn_buffer_clear(delivery->bytes);
+      link->session->outgoing_bytes -= bytes.size;
       pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
-      int err = pn_post_transfer_frame(transport->disp,
-                                       ssn_state->local_channel,
-                                       link_state->local_handle,
-                                       state->id, &tag,
-                                       0, // message-format
-                                       delivery->local_settled,
-                                       !delivery->done);
-      if (err) return err;
-      ssn_state->outgoing_transfer_count++;
-      ssn_state->outgoing_window--;
-      if (delivery->done) {
+      int count = pn_post_transfer_frame(transport->disp,
+                                         ssn_state->local_channel,
+                                         link_state->local_handle,
+                                         state->id, &tag,
+                                         0, // message-format
+                                         delivery->local_settled,
+                                         !delivery->done,
+                                         ssn_state->remote_incoming_window);
+      if (count < 0) return count;
+      ssn_state->outgoing_transfer_count += count;
+      ssn_state->remote_incoming_window -= count;
+
+      pn_buffer_trim(delivery->bytes, bytes.size - transport->disp->output_size, 0);
+      if (!pn_buffer_size(delivery->bytes) && delivery->done) {
         state->sent = true;
         link_state->delivery_count++;
         link_state->link_credit--;
         link->queued--;
+        link->session->outgoing_deliveries--;
       }
     }
   }
@@ -2228,13 +2299,13 @@ int pn_process_tpwork_receiver(pn_transp
   }
 
   if (delivery->local_settled) {
-    size_t available = pn_delivery_map_available(&ssn->state.incoming);
     pn_full_settle(&ssn->state.incoming, delivery);
-    if (!ssn->state.incoming_window &&
-        pn_delivery_map_available(&ssn->state.incoming) > available) {
-      int err = pn_post_flow(transport, ssn, NULL);
-      if (err) return err;
-    }
+  }
+
+  // XXX: need to centralize this policy and improve it
+  if (!ssn->state.incoming_window) {
+    int err = pn_post_flow(transport, ssn, link);
+    if (err) return err;
   }
 
   return 0;
@@ -2621,6 +2692,7 @@ ssize_t pn_link_send(pn_link_t *sender, 
   pn_delivery_t *current = pn_link_current(sender);
   if (!current) return PN_EOS;
   pn_buffer_append(current->bytes, bytes, n);
+  sender->session->outgoing_bytes += n;
   pn_add_tpwork(current);
   return n;
 }
@@ -2643,6 +2715,10 @@ ssize_t pn_link_recv(pn_link_t *receiver
     size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
     pn_buffer_trim(delivery->bytes, size, 0);
     if (size) {
+      receiver->session->incoming_bytes -= size;
+      if (!receiver->session->state.incoming_window) {
+        pn_add_tpwork(delivery);
+      }
       return size;
     } else {
       return delivery->done ? PN_EOS : 0;

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Jun 11 09:41:15 2013
@@ -446,7 +446,9 @@ int pni_pump_in(pn_messenger_t *messenge
   char *encoded = pn_buffer_bytes(buf).start;
   ssize_t n = pn_link_recv(receiver, encoded, pending);
   if (n != (ssize_t) pending) {
-    return pn_error_format(messenger->error, n, "didn't receive pending bytes: %" PN_ZI, n);
+    return pn_error_format(messenger->error, n,
+                           "didn't receive pending bytes: %" PN_ZI " %" PN_ZI,
+                           n, pending);
   }
   n = pn_link_recv(receiver, encoded + pending, 1);
   pn_link_advance(receiver);
@@ -478,7 +480,10 @@ void pn_messenger_endpoints(pn_messenger
     }
     pn_delivery_clear(d);
     if (pn_delivery_readable(d)) {
-      pni_pump_in(messenger, pn_terminus_get_address(pn_link_source(link)), link);
+      int err = pni_pump_in(messenger, pn_terminus_get_address(pn_link_source(link)), link);
+      if (err) {
+        fprintf(stderr, "%s\n", pn_messenger_error(messenger));
+      }
     }
     d = pn_work_next(d);
   }
@@ -947,12 +952,6 @@ int pn_messenger_get_outgoing_window(pn_
 
 int pn_messenger_set_outgoing_window(pn_messenger_t *messenger, int window)
 {
-  if (window >= PN_SESSION_WINDOW) {
-    return pn_error_format(messenger->error, PN_ARG_ERR,
-                           "specified window (%i) exceeds max (%i)",
-                           window, PN_SESSION_WINDOW);
-  }
-
   pni_store_set_window(messenger->outgoing, window);
   return 0;
 }
@@ -964,12 +963,6 @@ int pn_messenger_get_incoming_window(pn_
 
 int pn_messenger_set_incoming_window(pn_messenger_t *messenger, int window)
 {
-  if (window >= PN_SESSION_WINDOW) {
-    return pn_error_format(messenger->error, PN_ARG_ERR,
-                           "specified window (%i) exceeds max (%i)",
-                           window, PN_SESSION_WINDOW);
-  }
-
   pni_store_set_window(messenger->incoming, window);
   return 0;
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Tue Jun 11 09:41:15 2013
@@ -99,4 +99,7 @@ public interface Delivery
     public boolean isUpdated();
 
     public boolean isPartial();
+
+    public int pending();
+
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java Tue Jun 11 09:41:15 2013
@@ -67,7 +67,7 @@ public interface Transport extends Endpo
 
     /** the lower bound for the agreed maximum frame size (in bytes). */
     public int MIN_MAX_FRAME_SIZE = 512;
-    public int SESSION_WINDOW = 1024;
+    public int SESSION_WINDOW = 16*1024;
     public int END_OF_STREAM = -1;
 
     public void bind(Connection connection);

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Tue Jun 11 09:41:15 2013
@@ -56,8 +56,6 @@ class Constant(object):
 class Skipped(Exception):
   skipped = True
 
-PN_SESSION_WINDOW = JTransport.SESSION_WINDOW
-
 PENDING = "PENDING"
 ACCEPTED = "ACCEPTED"
 REJECTED = "REJECTED"
@@ -508,6 +506,10 @@ class Delivery(object):
   def work_next(self):
     return wrap_delivery(self.impl.getWorkNext())
 
+  @property
+  def pending(self):
+    return self.impl.pending()
+
 class Transport(object):
 
   TRACE_OFF = 0
@@ -1461,7 +1463,6 @@ __all__ = [
            "MANUAL",
            "PENDING",
            "REJECTED",
-           "PN_SESSION_WINDOW",
            "char",
            "Condition",
            "Connection",

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Tue Jun 11 09:41:15 2013
@@ -486,4 +486,9 @@ public class DeliveryImpl implements Del
         return builder.toString();
     }
 
+    public int pending()
+    {
+        return _dataSize;
+    }
+
 }

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Tue Jun 11 09:41:15 2013
@@ -899,7 +899,7 @@ class IdleTimeoutTest(Test):
 class CreditTest(Test):
 
   def setup(self):
-    self.snd, self.rcv = self.link("test-link")
+    self.snd, self.rcv = self.link("test-link", max_frame=(16*1024, 16*1024))
     self.c1 = self.snd.session.connection
     self.c2 = self.rcv.session.connection
     self.snd.open()
@@ -909,7 +909,7 @@ class CreditTest(Test):
   def teardown(self):
     self.cleanup()
 
-  def testCreditSender(self):
+  def testCreditSender(self, count=1024):
     credit = self.snd.credit
     assert credit == 0, credit
     self.rcv.flow(10)
@@ -917,10 +917,10 @@ class CreditTest(Test):
     credit = self.snd.credit
     assert credit == 10, credit
 
-    self.rcv.flow(PN_SESSION_WINDOW)
+    self.rcv.flow(count)
     self.pump()
     credit = self.snd.credit
-    assert credit == 10 + PN_SESSION_WINDOW, credit
+    assert credit == 10 + count, credit
 
   def testCreditReceiver(self):
     self.rcv.flow(10)
@@ -939,70 +939,6 @@ class CreditTest(Test):
     assert self.rcv.credit == 9, self.rcv.credit
     assert self.rcv.queued == 0, self.rcv.queued
 
-  def settle(self):
-    result = []
-    d = self.c1.work_head
-    while d:
-      if d.updated:
-        result.append(d.tag)
-        d.settle()
-      d = d.work_next
-    return result
-
-  def testBuffering(self):
-    self.rcv.flow(PN_SESSION_WINDOW + 10)
-    self.pump()
-
-    assert self.rcv.queued == 0, self.rcv.queued
-
-    idx = 0
-    while self.snd.credit:
-      d = self.snd.delivery("tag%s" % idx)
-      assert d
-      assert self.snd.advance()
-      self.pump()
-      idx += 1
-
-    assert idx == PN_SESSION_WINDOW + 10, idx
-
-    assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
-
-    extra = self.snd.delivery("extra")
-    assert extra
-    assert self.snd.advance()
-    self.pump()
-
-    assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
-
-    for i in range(10):
-      d = self.rcv.current
-      assert d.tag == "tag%s" % i, d.tag
-      assert self.rcv.advance()
-      d.settle()
-      self.pump()
-      assert self.rcv.queued == PN_SESSION_WINDOW - (i+1), self.rcv.queued
-
-    tags = self.settle()
-    assert tags == ["tag%s" % i for i in range(10)], tags
-    self.pump()
-
-    assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
-
-    for i in range(PN_SESSION_WINDOW):
-      d = self.rcv.current
-      assert d, i
-      assert d.tag == "tag%s" % (i+10), d.tag
-      assert self.rcv.advance()
-      d.settle()
-      self.pump()
-
-    assert self.rcv.queued == 0, self.rcv.queued
-
-    tags = self.settle()
-    assert tags == ["tag%s" % (i+10) for i in range(PN_SESSION_WINDOW)]
-
-    assert self.rcv.queued == 0, self.rcv.queued
-
   def _testBufferingOnClose(self, a, b):
     for i in range(10):
       d = self.snd.delivery("tag-%s" % i)
@@ -1053,27 +989,6 @@ class CreditTest(Test):
   def testBufferingOnCloseConnectionConnection(self):
     self._testBufferingOnClose("connection", "connection")
 
-  def testCreditWithBuffering(self):
-    self.rcv.flow(PN_SESSION_WINDOW + 10)
-    self.pump()
-    assert self.snd.credit == PN_SESSION_WINDOW + 10, self.snd.credit
-    assert self.rcv.queued == 0, self.rcv.queued
-
-    idx = 0
-    while self.snd.credit:
-      d = self.snd.delivery("tag%s" % idx)
-      assert d
-      assert self.snd.advance()
-      self.pump()
-      idx += 1
-
-    assert idx == PN_SESSION_WINDOW + 10, idx
-    assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
-
-    self.rcv.flow(1)
-    self.pump()
-    assert self.snd.credit == 1, self.snd.credit
-
   def testFullDrain(self):
     assert self.rcv.credit == 0
     assert self.snd.credit == 0
@@ -1197,6 +1112,116 @@ class CreditTest(Test):
     assert self.rcv.credit == 0
     assert self.rcv.queued == 0
 
+class SessionCreditTest(Test):
+
+  def teardown(self):
+    self.cleanup()
+
+  def testBuffering(self, count=32, size=1024, capacity=16*1024, max_frame=1024):
+    snd, rcv = self.link("test-link", max_frame=(max_frame, max_frame))
+    rcv.session.incoming_capacity = capacity
+    snd.open()
+    rcv.open()
+    rcv.flow(count)
+    self.pump()
+
+    assert count > 0
+
+    total_bytes = count * size
+
+    assert snd.session.outgoing_bytes == 0, snd.session.outgoing_bytes
+    assert rcv.session.incoming_bytes == 0, rcv.session.incoming_bytes
+    assert snd.queued == 0, snd.queued
+    assert rcv.queued == 0, rcv.queued
+
+    idx = 0
+    while snd.credit:
+      d = snd.delivery("tag%s" % idx)
+      assert d
+      n = snd.send("x"*size)
+      assert n == size, (n, size)
+      assert snd.advance()
+      self.pump()
+      idx += 1
+
+    assert idx == count, (idx, count)
+
+    assert snd.session.outgoing_bytes < total_bytes, (snd.session.outgoing_bytes, total_bytes)
+    assert rcv.session.incoming_bytes < capacity, (rcv.session.incoming_bytes, capacity)
+    if snd.session.outgoing_bytes > 0:
+      available = rcv.session.incoming_capacity - rcv.session.incoming_bytes
+      assert available < max_frame, available
+
+    for i in range(count):
+      d = rcv.current
+      pending = d.pending
+      before = rcv.session.incoming_bytes
+      assert rcv.advance()
+      after = rcv.session.incoming_bytes
+      assert before - after == pending
+      snd_before = snd.session.incoming_bytes
+      self.pump()
+      snd_after = snd.session.incoming_bytes
+
+      assert rcv.session.incoming_bytes < capacity
+      if snd_before > 0:
+        assert capacity - after <= max_frame
+        assert snd_before > snd_after
+      if snd_after > 0:
+        available = rcv.session.incoming_capacity - rcv.session.incoming_bytes
+        assert available < max_frame, available
+
+  def testBufferingSize16(self):
+    self.testBuffering(size=16)
+
+  def testBufferingSize256(self):
+    self.testBuffering(size=256)
+
+  def testBufferingSize512(self):
+    self.testBuffering(size=512)
+
+  def testBufferingSize2048(self):
+    self.testBuffering(size=2048)
+
+  def testBufferingSize1025(self):
+    self.testBuffering(size=1025)
+
+  def testBufferingSize1023(self):
+    self.testBuffering(size=1023)
+
+  def testBufferingSize989(self):
+    self.testBuffering(size=989)
+
+  def testBufferingSize1059(self):
+    self.testBuffering(size=1059)
+
+  def testCreditWithBuffering(self):
+    snd, rcv = self.link("test-link", max_frame=(1024, 1024))
+    rcv.session.incoming_capacity = 64*1024
+    snd.open()
+    rcv.open()
+    rcv.flow(128)
+    self.pump()
+
+    assert snd.credit == 128, snd.credit
+    assert rcv.queued == 0, rcv.queued
+
+    idx = 0
+    while snd.credit:
+      d = snd.delivery("tag%s" % idx)
+      snd.send("x"*1024)
+      assert d
+      assert snd.advance()
+      self.pump()
+      idx += 1
+
+    assert idx == 128, idx
+    assert rcv.queued < 128, rcv.queued
+
+    rcv.flow(1)
+    self.pump()
+    assert snd.credit == 1, snd.credit
+
 class SettlementTest(Test):
 
   def setup(self):
@@ -1266,6 +1291,53 @@ class SettlementTest(Test):
     assert self.snd.unsettled == 1, self.snd.unsettled
     assert self.rcv.unsettled == 0, self.rcv.unsettled
 
+  def testMultipleUnsettled(self, count=1024, size=1024):
+    self.rcv.flow(count)
+    self.pump()
+
+    assert self.snd.unsettled == 0, self.snd.unsettled
+    assert self.rcv.unsettled == 0, self.rcv.unsettled
+
+    unsettled = []
+
+    for i in range(count):
+      sd = self.snd.delivery("tag%s" % i)
+      assert sd
+      n = self.snd.send("x"*size)
+      assert n == size, n
+      assert self.snd.advance()
+      self.pump()
+
+      rd = self.rcv.current
+      assert rd, "did not receive delivery %s" % i
+      n = rd.pending
+      b = self.rcv.recv(n)
+      assert len(b) == n, (b, n)
+      rd.update(Delivery.ACCEPTED)
+      assert self.rcv.advance()
+      self.pump()
+      unsettled.append(rd)
+
+    assert self.rcv.unsettled == count
+
+    for rd in unsettled:
+      rd.settle()
+
+  def testMultipleUnsettled2K1K(self):
+    self.testMultipleUnsettled(2048, 1024)
+
+  def testMultipleUnsettled4K1K(self):
+    self.testMultipleUnsettled(4096, 1024)
+
+  def testMultipleUnsettled1K2K(self):
+    self.testMultipleUnsettled(1024, 2048)
+
+  def testMultipleUnsettled2K2K(self):
+    self.testMultipleUnsettled(2048, 2048)
+
+  def testMultipleUnsettled4K2K(self):
+    self.testMultipleUnsettled(4096, 2048)
+
 class PipelineTest(Test):
 
   def setup(self):

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Tue Jun 11 09:41:15 2013
@@ -285,43 +285,45 @@ class MessengerTest(Test):
     for t in trackers:
       assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
 
-  def testIncomingQueueBiggerThanWindow(self):
+  def testIncomingQueueBiggerThanWindow(self, size=10):
     if IMPLEMENTATION_LANGUAGE == "Java":
       # Currently fails with proton-j. See https://issues.apache.org/jira/browse/PROTON-315
       raise Skipped
 
-    self.server.outgoing_window = 10
-    self.client.incoming_window = 10
+    self.server.outgoing_window = size
+    self.client.incoming_window = size
     self.start()
 
     msg = Message()
     msg.address = "amqp://0.0.0.0:12345"
     msg.subject = "Hello World!"
 
-    for i in range(20):
+    for i in range(2*size):
       self.client.put(msg)
 
-    while self.client.incoming < 20:
-      self.client.recv(20 - self.client.incoming)
-
     trackers = []
-    while self.client.incoming:
-      t = self.client.get(msg)
-      assert self.client.status(t) is PENDING, (t, self.client.status(t))
-      trackers.append(t)
+    while len(trackers) < 2*size:
+      self.client.recv(2*size - len(trackers))
+      while self.client.incoming:
+        t = self.client.get(msg)
+        assert self.client.status(t) is PENDING, (t, self.client.status(t))
+        trackers.append(t)
 
-    for t in trackers[:10]:
+    for t in trackers[:size]:
       assert self.client.status(t) is None, (t, self.client.status(t))
-    for t in trackers[10:]:
+    for t in trackers[size:]:
       assert self.client.status(t) is PENDING, (t, self.client.status(t))
 
     self.client.accept()
 
-    for t in trackers[:10]:
+    for t in trackers[:size]:
       assert self.client.status(t) is None, (t, self.client.status(t))
-    for t in trackers[10:]:
+    for t in trackers[size:]:
       assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
 
+  def testIncomingQueueBiggerThanSessionWindow(self):
+    self.testIncomingQueueBiggerThanWindow(2048)
+
   def test_proton222(self):
     self.start()
     msg = Message()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org