You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/13 21:04:43 UTC

svn commit: r1445894 - in /qpid/proton/branches/kgiusti-proton-225/proton-c: include/proton/engine.h src/engine/engine-internal.h src/engine/engine.c

Author: kgiusti
Date: Wed Feb 13 20:04:43 2013
New Revision: 1445894

URL: http://svn.apache.org/r1445894
Log:
PROTON-225: modify transport API to support internal I/O buffer

Modified:
    qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h
    qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h
    qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h?rev=1445894&r1=1445893&r2=1445894&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h Wed Feb 13 20:04:43 2013
@@ -283,8 +283,74 @@ void pn_connection_set_context(pn_connec
 
 // transport
 pn_error_t *pn_transport_error(pn_transport_t *transport);
+/* deprecated */
 ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available);
+/* deprecated */
 ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size);
+
+/** Report the amount of free space in a transport's input buffer. If
+ * the engine is in an error state or has reached the end of stream
+ * state, a negative value will be returned indicating the condition.
+ *
+ * @param[in] transport the transport
+ * @return the free space in the transport, or error state if < 0
+ */
+ssize_t pn_transport_capacity(pn_transport_t *transport);
+
+/** Return a pointer to a transport's input buffer. This pointer may
+ * change when calls into the engine are made. The amount of space in
+ * this buffer is reported by ::pn_transport_capacity. Calls to
+ * ::pn_transport_push may change the value of this pointer and the
+ * amount of free space reported by ::pn_transport_capacity.
+ *
+ * @param[in] transport the transport
+ * @return a pointer to the transport's input buffer, NULL if no capacity available.
+ */
+char *pn_transport_buffer(pn_transport_t *transport);
+
+/** Push data from a transport's input buffer into the engine and
+ * return how much data was successfully pushed.  The number of bytes
+ * written to the input buffer (via the pointer supplied by
+ * ::pn_transport_buffer) must be no greater than the value returned
+ * by ::pn_transport_capacity.
+ *
+ * @param[in] transport the transport
+ * @param[in] size the amount of data written to the transport's input buffer
+ * @return the amount of data consumed, or error code if < 0
+ */
+ssize_t pn_transport_push(pn_transport_t *transport, size_t size);
+
+/** Indicate that the input has reached End Of Stream (EOS).  This
+ * tells the transport that no more input will be forthcoming.
+ *
+ * @param[in] transport the transport
+ * @return 0 on success, or error code if < 0
+ */
+int pn_transport_push_eos(pn_transport_t *transport);
+
+/** Return the number of pending output bytes for a transport, or a
+ * negative error code if the engine is in an error state.
+ *
+ * @param[in] transport the transport
+ * @return the number of pending output bytes, or an error code
+ */
+ssize_t pn_transport_pending(pn_transport_t *transport);
+
+/** Return a pointer to a transport's output buffer. Any calls into
+ * the engine may change the value of this pointer and its contents.
+ *
+ * @param[in] transport the transport
+ * @return a pointer to the transport's output buffer, or NULL if no pending output.
+ */
+const char *pn_transport_peek(pn_transport_t *transport);
+
+/** Remove bytes from the head of the output buffer for a transport.
+ *
+ * @param[in] transport the transport
+ * @param[in] size the number of bytes to remove
+ */
+void pn_transport_pop(pn_transport_t *transport, size_t size);
+
 /** Process any pending transport timer events.
  *
  * This method should be called after all pending input has been processed by the

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h?rev=1445894&r1=1445893&r2=1445894&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h Wed Feb 13 20:04:43 2013
@@ -129,6 +129,8 @@ struct pn_transport_t {
   char *remote_hostname;
   pn_data_t *remote_offered_capabilities;
   pn_data_t *remote_desired_capabilities;
+  //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024)
+#define PN_DEFAULT_MAX_FRAME_SIZE (0)  /* for now, allow unlimited size */
   uint32_t   local_max_frame;
   uint32_t   remote_max_frame;
   pn_condition_t remote_condition;
@@ -159,6 +161,18 @@ struct pn_transport_t {
   /* statistics */
   uint64_t bytes_input;
   uint64_t bytes_output;
+
+  /* output buffered for send */
+  size_t output_size;
+  size_t output_pending;
+  char *output_buf;
+
+  /* input from peer */
+  size_t input_size;
+  size_t input_pending;
+  char *input_buf;
+  bool eos_pushed;      // input stream closed by driver
+
 };
 
 struct pn_connection_t {

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c?rev=1445894&r1=1445893&r2=1445894&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c Wed Feb 13 20:04:43 2013
@@ -33,6 +33,8 @@
 #include "../ssl/ssl-internal.h"
 #include "../platform_fmt.h"
 
+static ssize_t transport_consume(pn_transport_t *transport);
+
 // delivery buffers
 
 void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
@@ -245,6 +247,8 @@ void pn_transport_free(pn_transport_t *t
   pn_condition_tini(&transport->remote_condition);
   free(transport->sessions);
   free(transport->channels);
+  free(transport->input_buf);
+  free(transport->output_buf);
   free(transport);
 }
 
@@ -772,9 +776,10 @@ void pn_transport_init(pn_transport_t *t
   transport->open_rcvd = false;
   transport->close_sent = false;
   transport->close_rcvd = false;
+  transport->eos_pushed = false;
   transport->remote_container = NULL;
   transport->remote_hostname = NULL;
-  transport->local_max_frame = 0;
+  transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE;
   transport->remote_max_frame = 0;
   transport->local_idle_timeout = 0;
   transport->dead_remote_deadline = 0;
@@ -795,6 +800,9 @@ void pn_transport_init(pn_transport_t *t
 
   transport->bytes_input = 0;
   transport->bytes_output = 0;
+
+  transport->input_pending = 0;
+  transport->output_pending = 0;
 }
 
 pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
@@ -830,6 +838,19 @@ pn_transport_t *pn_transport()
 {
   pn_transport_t *transport = (pn_transport_t *) malloc(sizeof(pn_transport_t));
   if (!transport) return NULL;
+  transport->output_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+  transport->output_buf = (char *) malloc(transport->output_size);
+  if (!transport->output_buf) {
+    free(transport);
+    return NULL;
+  }
+  transport->input_size =  PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+  transport->input_buf = (char *) malloc(transport->input_size);
+  if (!transport->input_buf) {
+    free(transport->output_buf);
+    free(transport);
+    return NULL;
+  }
 
   transport->connection = NULL;
   pn_transport_init(transport);
@@ -847,6 +868,7 @@ int pn_transport_bind(pn_transport_t *tr
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
     if (!pn_error_code(transport->error)) {
       transport->disp->halt = false;
+      transport_consume(transport);        // blech - testBindAfterOpen
     }
   }
   return 0;
@@ -1825,21 +1847,46 @@ int pn_do_close(pn_dispatcher_t *disp)
   return 0;
 }
 
+// deprecated
 ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available)
 {
   if (!transport) return PN_ARG_ERR;
+  if (available == 0) {
+    return pn_transport_push_eos(transport);
+  }
+  const size_t original = available;
+  ssize_t capacity = pn_transport_capacity(transport);
+  if (capacity < 0) return capacity;
+  while (available && capacity) {
+    char *dest = pn_transport_buffer(transport);
+    assert(dest);
+    size_t count = pn_min( (size_t)capacity, available );
+    memmove( dest, bytes, count );
+    available -= count;
+    bytes += count;
+    int rc = pn_transport_push( transport, count );
+    if (rc < 0) return rc;
+    capacity = pn_transport_capacity(transport);
+    if (capacity < 0) return capacity;
+  }
 
+  return original - available;
+}
+
+// process pending input until none remaining or EOS
+static ssize_t transport_consume(pn_transport_t *transport)
+{
   pn_io_layer_t *io_layer = transport->io_layers;
   size_t consumed = 0;
 
-  while (true) {
+  while (transport->input_pending || transport->eos_pushed) {
     ssize_t n;
-    n = io_layer->process_input( io_layer, bytes + consumed, available - consumed);
+    n = io_layer->process_input( io_layer,
+                                 transport->input_buf + consumed,
+                                 transport->input_pending );
     if (n > 0) {
       consumed += n;
-      if (consumed >= available) {
-        break;
-      }
+      transport->input_pending -= n;
     } else if (n == 0) {
       break;
     } else {
@@ -1850,11 +1897,15 @@ ssize_t pn_transport_input(pn_transport_
       }
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
         pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+      transport->input_pending = 0;  // XXX ???
       return n;
     }
   }
 
-  transport->bytes_input += consumed;
+  if (transport->input_pending && consumed) {
+    memmove( transport->input_buf,  &transport->input_buf[consumed], transport->input_pending );
+  }
+
   return consumed;
 }
 
@@ -2531,23 +2582,43 @@ static ssize_t pn_output_write_amqp(pn_i
   return pn_dispatcher_output(transport->disp, bytes, size);
 }
 
-ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
+// generate outbound data, return amount of pending output else error
+static ssize_t transport_produce(pn_transport_t *transport)
 {
-  if (!transport) return PN_ARG_ERR;
-
   pn_io_layer_t *io_layer = transport->io_layers;
-  size_t total = 0;
+  ssize_t space = transport->output_size - transport->output_pending;
+
+  if (space == 0) {     // can we expand the buffer?
+    int more = 0;
+    if (!transport->remote_max_frame)   // no limit, so double it
+      more = transport->output_size;
+    else if (transport->remote_max_frame > transport->output_size)
+      more = transport->remote_max_frame - transport->output_size;
+    if (more) {
+      char *newbuf = (char *)malloc( transport->output_size + more );
+      if (newbuf) {
+        memmove( newbuf, transport->output_buf, transport->output_pending );
+        free( transport->output_buf );
+        transport->output_buf = newbuf;
+        transport->output_size += more;
+        space = more;
+      }
+    }
+  }
 
-  while (size - total > 0) {
+  while (space > 0) {
     ssize_t n;
-    n = io_layer->process_output( io_layer, bytes + total, size - total);
+    n = io_layer->process_output( io_layer,
+                                  &transport->output_buf[transport->output_pending],
+                                  space );
     if (n > 0) {
-      total += n;
+      space -= n;
+      transport->output_pending += n;
     } else if (n == 0) {
       break;
     } else {
-      if (total > 0)
-        break;   // return what was output
+      if (transport->output_pending)
+        break;   // return what is available
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
         if (n == PN_EOS)
           pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
@@ -2558,11 +2629,23 @@ ssize_t pn_transport_output(pn_transport
       return n;
     }
   }
+  return transport->output_pending;
+}
 
-  transport->bytes_output += total;
-  return total;
+// deprecated
+ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+  if (!transport) return PN_ARG_ERR;
+  ssize_t available = pn_transport_pending(transport);
+  if (available > 0) {
+    available = (ssize_t) pn_min( (size_t)available, size );
+    memmove( bytes, pn_transport_peek(transport), available );
+    pn_transport_pop( transport, (size_t) available );
+  }
+  return available;
 }
 
+
 void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace)
 {
   if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
@@ -2577,6 +2660,7 @@ uint32_t pn_transport_get_max_frame(pn_t
 
 void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
 {
+  // if size == 0, no advertised limit to input frame size.
   if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
     size = AMQP_MIN_MAX_FRAME_SIZE;
   transport->local_max_frame = size;
@@ -2908,3 +2992,93 @@ pn_timestamp_t pn_io_layer_tick_passthru
     return next->process_tick( next, now );
   return 0;
 }
+
+
+///
+
+// input
+ssize_t pn_transport_capacity(pn_transport_t *transport)  /* <0 == done */
+{
+  if (pn_error_code(transport->error)) return pn_error_code(transport->error);
+
+  ssize_t capacity = transport->input_size - transport->input_pending;
+  if (!capacity) {
+    // can we expand the size of the input buffer?
+    int more = 0;
+    if (!transport->local_max_frame) {  // no limit (ha!)
+      more = transport->input_size;
+    } else if (transport->local_max_frame > transport->input_size) {
+      more = transport->local_max_frame - transport->input_size;
+    }
+    if (more) {
+      char *newbuf = (char *) malloc( transport->input_size + more );
+      if (newbuf) {
+        memmove( newbuf, transport->input_buf, transport->input_pending );
+        free( transport->input_buf );
+        transport->input_buf = newbuf;
+        transport->input_size += more;
+        capacity = more;
+      }
+    }
+  }
+  return capacity;
+}
+
+
+char *pn_transport_buffer(pn_transport_t *transport)
+{
+  if (transport && transport->input_pending < transport->input_size) {
+    return &transport->input_buf[transport->input_pending];
+  }
+  return NULL;
+}
+
+ssize_t pn_transport_push(pn_transport_t *transport, size_t size)
+{
+  if (!transport) return PN_ARG_ERR;
+  size = pn_min( size, (transport->input_size - transport->input_pending) );
+  transport->input_pending += size;
+  transport->bytes_input += size;
+
+  ssize_t n = transport_consume( transport );
+  if (n < 0) return n;
+  return size;
+}
+
+// input stream has closed
+int pn_transport_push_eos(pn_transport_t *transport)
+{
+  transport->eos_pushed = true;
+  ssize_t x = transport_consume( transport );
+  if (x < 0) return (int) x;
+  return 0;
+  // @todo: what if not all input processed at this point?  do we care???
+}
+
+// output
+ssize_t pn_transport_pending(pn_transport_t *transport)      /* <0 == done */
+{
+  if (!transport) return PN_ARG_ERR;
+  return transport_produce( transport );
+}
+
+const char *pn_transport_peek(pn_transport_t *transport)
+{
+  if (transport && transport->output_pending) {
+    return transport->output_buf;
+  }
+  return NULL;
+}
+
+void pn_transport_pop(pn_transport_t *transport, size_t size)
+{
+  if (transport && size) {
+    assert( transport->output_pending >= size );
+    transport->output_pending -= size;
+    transport->bytes_output += size;
+    if (transport->output_pending) {
+      memmove( transport->output_buf,  &transport->output_buf[size],
+               transport->output_pending );
+    }
+  }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org