You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/11/02 16:05:07 UTC

svn commit: r1404992 - in /qpid/proton/trunk/proton-c/src: engine/engine-internal.h engine/engine.c util.h

Author: rhs
Date: Fri Nov  2 15:05:07 2012
New Revision: 1404992

URL: http://svn.apache.org/viewvc?rev=1404992&view=rev
Log:
moved resize macros to util.h; fixed bug in unsettled list; added simple batching of dispositions

Modified:
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/util.h

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1404992&r1=1404991&r2=1404992&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Nov  2 15:05:07 2012
@@ -82,6 +82,12 @@ typedef struct {
   size_t link_capacity;
   pn_link_state_t **handles;
   size_t handle_capacity;
+
+  uint64_t disp_code;
+  bool disp_settled;
+  bool disp_type;
+  pn_sequence_t disp_first;
+  pn_sequence_t disp_last;
 } pn_session_state_t;
 
 #define SCRATCH (1024)
@@ -216,20 +222,6 @@ struct pn_delivery_t {
 
 void pn_link_dump(pn_link_t *link);
 
-#define PN_ENSURE(ARRAY, CAPACITY, COUNT)                      \
-  while ((CAPACITY) < (COUNT)) {                                \
-    (CAPACITY) = (CAPACITY) ? 2 * (CAPACITY) : 16;              \
-    (ARRAY) = realloc((ARRAY), (CAPACITY) * sizeof (*(ARRAY))); \
-  }                                                             \
-
-#define PN_ENSUREZ(ARRAY, CAPACITY, COUNT)                \
-  {                                                        \
-    size_t _old_capacity = (CAPACITY);                     \
-    PN_ENSURE((ARRAY), (CAPACITY), (COUNT));              \
-    memset((ARRAY) + _old_capacity, 0,                     \
-           sizeof(*(ARRAY))*((CAPACITY) - _old_capacity)); \
-  }
-
 void pn_dump(pn_connection_t *conn);
 void pn_transport_sasl_init(pn_transport_t *transport);
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1404992&r1=1404991&r2=1404992&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Nov  2 15:05:07 2012
@@ -1151,12 +1151,20 @@ int pn_link_unsettled(pn_link_t *link)
 
 pn_delivery_t *pn_unsettled_head(pn_link_t *link)
 {
-  return link->unsettled_head;
+  pn_delivery_t *d = link->unsettled_head;
+  while (d && d->local_settled) {
+    d = d->unsettled_next;
+  }
+  return d;
 }
 
 pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery)
 {
-  return delivery->unsettled_next;
+  pn_delivery_t *d = delivery->unsettled_next;
+  while (d && d->local_settled) {
+    d = d->unsettled_next;
+  }
+  return d;
 }
 
 bool pn_is_current(pn_delivery_t *delivery)
@@ -1982,10 +1990,29 @@ int pn_process_flow_receiver(pn_transpor
   return 0;
 }
 
+int pn_flush_disp(pn_transport_t *transport, pn_session_state_t *ssn_state)
+{
+  uint64_t code = ssn_state->disp_code;
+  uint64_t settled = ssn_state->disp_settled;
+  if (code || settled) {
+    int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[oIIo?DL[]]", DISPOSITION,
+                            ssn_state->disp_type, ssn_state->disp_first, ssn_state->disp_last,
+                            settled, (bool)code, code);
+    if (err) return err;
+    ssn_state->disp_type = 0;
+    ssn_state->disp_code = 0;
+    ssn_state->disp_settled = 0;
+    ssn_state->disp_first = 0;
+    ssn_state->disp_last = 0;
+  }
+  return 0;
+}
+
 int pn_post_disp(pn_transport_t *transport, pn_delivery_t *delivery)
 {
   pn_link_t *link = delivery->link;
   pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
+  pn_modified(transport->connection, &link->session->endpoint);
   // XXX: check for null state
   pn_delivery_state_t *state = delivery->transport_context;
   uint64_t code;
@@ -1996,16 +2023,34 @@ int pn_post_disp(pn_transport_t *transpo
   case PN_RELEASED:
     code = RELEASED;
     break;
+  case PN_REJECTED:
+    code = REJECTED;
+    break;
     //TODO: rejected and modified (both take extra data which may need to be passed through somehow) e.g. change from enum to discriminated union?
   default:
     code = 0;
   }
 
-  if (code || delivery->local_settled)
-    return pn_post_frame(transport->disp, ssn_state->local_channel, "DL[oIIo?DL[]]", DISPOSITION,
-                         link->endpoint.type == RECEIVER, state->id, state->id,
-                         delivery->local_settled,
-                         (bool)code, code);
+  if (code == ssn_state->disp_code && delivery->local_settled == ssn_state->disp_settled &&
+      ssn_state->disp_type == (link->endpoint.type == RECEIVER)) {
+    if (state->id == ssn_state->disp_first - 1) {
+      ssn_state->disp_first = state->id;
+      return 0;
+    } else if (state->id == ssn_state->disp_last + 1) {
+      ssn_state->disp_last = state->id;
+      return 0;
+    }
+  }
+
+  int err = pn_flush_disp(transport, ssn_state);
+  if (err) return err;
+
+  ssn_state->disp_type = (link->endpoint.type == RECEIVER);
+  ssn_state->disp_code = code;
+  ssn_state->disp_settled = delivery->local_settled;
+  ssn_state->disp_first = state->id;
+  ssn_state->disp_last = state->id;
+
   return 0;
 }
 
@@ -2112,6 +2157,21 @@ int pn_process_tpwork(pn_transport_t *tr
   return 0;
 }
 
+int pn_process_flush_disp(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+  if (endpoint->type == SESSION) {
+    pn_session_t *session = (pn_session_t *) endpoint;
+    pn_session_state_t *state = pn_session_get_state(transport, session);
+    if ((int16_t) state->local_channel >= 0 && !transport->close_sent)
+    {
+      int err = pn_flush_disp(transport, state);
+      if (err) return err;
+    }
+  }
+
+  return 0;
+}
+
 int pn_process_flow_sender(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (endpoint->type == SENDER && endpoint->state & PN_LOCAL_ACTIVE)
@@ -2251,6 +2311,8 @@ int pn_process(pn_transport_t *transport
   if ((err = pn_phase(transport, pn_process_tpwork))) return err;
   if ((err = pn_phase(transport, pn_process_tpwork))) return err;
 
+  if ((err = pn_phase(transport, pn_process_flush_disp))) return err;
+
   if ((err = pn_phase(transport, pn_process_flow_sender))) return err;
   if ((err = pn_phase(transport, pn_process_link_teardown))) return err;
   if ((err = pn_phase(transport, pn_process_ssn_teardown))) return err;

Modified: qpid/proton/trunk/proton-c/src/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.h?rev=1404992&r1=1404991&r2=1404992&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.h (original)
+++ qpid/proton/trunk/proton-c/src/util.h Fri Nov  2 15:05:07 2012
@@ -95,4 +95,18 @@ char *pn_strndup(const char *src, size_t
 #define pn_min(X,Y) ((X) > (Y) ? (Y) : (X))
 #define pn_max(X,Y) ((X) < (Y) ? (Y) : (X))
 
+#define PN_ENSURE(ARRAY, CAPACITY, COUNT)                      \
+  while ((CAPACITY) < (COUNT)) {                                \
+    (CAPACITY) = (CAPACITY) ? 2 * (CAPACITY) : 16;              \
+    (ARRAY) = realloc((ARRAY), (CAPACITY) * sizeof (*(ARRAY))); \
+  }                                                             \
+
+#define PN_ENSUREZ(ARRAY, CAPACITY, COUNT)                \
+  {                                                        \
+    size_t _old_capacity = (CAPACITY);                     \
+    PN_ENSURE((ARRAY), (CAPACITY), (COUNT));               \
+    memset((ARRAY) + _old_capacity, 0,                     \
+           sizeof(*(ARRAY))*((CAPACITY) - _old_capacity)); \
+  }
+
 #endif /* util.h */



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org