You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/13 21:04:43 UTC
svn commit: r1445894 - in /qpid/proton/branches/kgiusti-proton-225/proton-c:
include/proton/engine.h src/engine/engine-internal.h src/engine/engine.c
Author: kgiusti
Date: Wed Feb 13 20:04:43 2013
New Revision: 1445894
URL: http://svn.apache.org/r1445894
Log:
PROTON-225: modify transport API to support internal I/O buffer
Modified:
qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h
qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h
qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h?rev=1445894&r1=1445893&r2=1445894&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h Wed Feb 13 20:04:43 2013
@@ -283,8 +283,74 @@ void pn_connection_set_context(pn_connec
// transport
pn_error_t *pn_transport_error(pn_transport_t *transport);
+/* deprecated */
ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available);
+/* deprecated */
ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size);
+
+/** Report the amount of free space in a transport's input buffer. If
+ * the engine is in an error state or has reached the end of stream
+ * state, a negative value will be returned indicating the condition.
+ *
+ * @param[in] transport the transport
+ * @return the free space in the transport, or error state if < 0
+ */
+ssize_t pn_transport_capacity(pn_transport_t *transport);
+
+/** Return a pointer to a transport's input buffer. This pointer may
+ * change when calls into the engine are made. The amount of space in
+ * this buffer is reported by ::pn_transport_capacity. Calls to
+ * ::pn_transport_push may change the value of this pointer and the
+ * amount of free space reported by ::pn_transport_capacity.
+ *
+ * @param[in] transport the transport
+ * @return a pointer to the transport's input buffer, NULL if no capacity available.
+ */
+char *pn_transport_buffer(pn_transport_t *transport);
+
+/** Push data from a transport's input buffer into the engine and
+ * return how much data was successfully pushed. The number of bytes
+ * written to the input buffer (via the pointer supplied by
+ * ::pn_transport_buffer) must be no greater than the value returned
+ * by ::pn_transport_capacity.
+ *
+ * @param[in] transport the transport
+ * @param[in] size the amount of data written to the transport's input buffer
+ * @return the amount of data consumed, or error code if < 0
+ */
+ssize_t pn_transport_push(pn_transport_t *transport, size_t size);
+
+/** Indicate that the input has reached End Of Stream (EOS). This
+ * tells the transport that no more input will be forthcoming.
+ *
+ * @param[in] transport the transport
+ * @return 0 on success, or error code if < 0
+ */
+int pn_transport_push_eos(pn_transport_t *transport);
+
+/** Return the number of pending output bytes for a transport, or a
+ * negative error code if the engine is in an error state.
+ *
+ * @param[in] transport the transport
+ * @return the number of pending output bytes, or an error code
+ */
+ssize_t pn_transport_pending(pn_transport_t *transport);
+
+/** Return a pointer to a transport's output buffer. Any calls into
+ * the engine may change the value of this pointer and its contents.
+ *
+ * @param[in] transport the transport
+ * @return a pointer to the transport's output buffer, or NULL if no pending output.
+ */
+const char *pn_transport_peek(pn_transport_t *transport);
+
+/** Remove bytes from the head of the output buffer for a transport.
+ *
+ * @param[in] transport the transport
+ * @param[in] size the number of bytes to remove
+ */
+void pn_transport_pop(pn_transport_t *transport, size_t size);
+
/** Process any pending transport timer events.
*
* This method should be called after all pending input has been processed by the
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h?rev=1445894&r1=1445893&r2=1445894&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h Wed Feb 13 20:04:43 2013
@@ -129,6 +129,8 @@ struct pn_transport_t {
char *remote_hostname;
pn_data_t *remote_offered_capabilities;
pn_data_t *remote_desired_capabilities;
+ //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024)
+#define PN_DEFAULT_MAX_FRAME_SIZE (0) /* for now, allow unlimited size */
uint32_t local_max_frame;
uint32_t remote_max_frame;
pn_condition_t remote_condition;
@@ -159,6 +161,18 @@ struct pn_transport_t {
/* statistics */
uint64_t bytes_input;
uint64_t bytes_output;
+
+ /* output buffered for send */
+ size_t output_size;
+ size_t output_pending;
+ char *output_buf;
+
+ /* input from peer */
+ size_t input_size;
+ size_t input_pending;
+ char *input_buf;
+ bool eos_pushed; // input stream closed by driver
+
};
struct pn_connection_t {
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c?rev=1445894&r1=1445893&r2=1445894&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c Wed Feb 13 20:04:43 2013
@@ -33,6 +33,8 @@
#include "../ssl/ssl-internal.h"
#include "../platform_fmt.h"
+static ssize_t transport_consume(pn_transport_t *transport);
+
// delivery buffers
void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
@@ -245,6 +247,8 @@ void pn_transport_free(pn_transport_t *t
pn_condition_tini(&transport->remote_condition);
free(transport->sessions);
free(transport->channels);
+ free(transport->input_buf);
+ free(transport->output_buf);
free(transport);
}
@@ -772,9 +776,10 @@ void pn_transport_init(pn_transport_t *t
transport->open_rcvd = false;
transport->close_sent = false;
transport->close_rcvd = false;
+ transport->eos_pushed = false;
transport->remote_container = NULL;
transport->remote_hostname = NULL;
- transport->local_max_frame = 0;
+ transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE;
transport->remote_max_frame = 0;
transport->local_idle_timeout = 0;
transport->dead_remote_deadline = 0;
@@ -795,6 +800,9 @@ void pn_transport_init(pn_transport_t *t
transport->bytes_input = 0;
transport->bytes_output = 0;
+
+ transport->input_pending = 0;
+ transport->output_pending = 0;
}
pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
@@ -830,6 +838,19 @@ pn_transport_t *pn_transport()
{
pn_transport_t *transport = (pn_transport_t *) malloc(sizeof(pn_transport_t));
if (!transport) return NULL;
+ transport->output_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+ transport->output_buf = (char *) malloc(transport->output_size);
+ if (!transport->output_buf) {
+ free(transport);
+ return NULL;
+ }
+ transport->input_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+ transport->input_buf = (char *) malloc(transport->input_size);
+ if (!transport->input_buf) {
+ free(transport->output_buf);
+ free(transport);
+ return NULL;
+ }
transport->connection = NULL;
pn_transport_init(transport);
@@ -847,6 +868,7 @@ int pn_transport_bind(pn_transport_t *tr
PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
if (!pn_error_code(transport->error)) {
transport->disp->halt = false;
+ transport_consume(transport); // blech - testBindAfterOpen
}
}
return 0;
@@ -1825,21 +1847,46 @@ int pn_do_close(pn_dispatcher_t *disp)
return 0;
}
+// deprecated
ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available)
{
if (!transport) return PN_ARG_ERR;
+ if (available == 0) {
+ return pn_transport_push_eos(transport);
+ }
+ const size_t original = available;
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity < 0) return capacity;
+ while (available && capacity) {
+ char *dest = pn_transport_buffer(transport);
+ assert(dest);
+ size_t count = pn_min( (size_t)capacity, available );
+ memmove( dest, bytes, count );
+ available -= count;
+ bytes += count;
+ int rc = pn_transport_push( transport, count );
+ if (rc < 0) return rc;
+ capacity = pn_transport_capacity(transport);
+ if (capacity < 0) return capacity;
+ }
+ return original - available;
+}
+
+// process pending input until none remaining or EOS
+static ssize_t transport_consume(pn_transport_t *transport)
+{
pn_io_layer_t *io_layer = transport->io_layers;
size_t consumed = 0;
- while (true) {
+ while (transport->input_pending || transport->eos_pushed) {
ssize_t n;
- n = io_layer->process_input( io_layer, bytes + consumed, available - consumed);
+ n = io_layer->process_input( io_layer,
+ transport->input_buf + consumed,
+ transport->input_pending );
if (n > 0) {
consumed += n;
- if (consumed >= available) {
- break;
- }
+ transport->input_pending -= n;
} else if (n == 0) {
break;
} else {
@@ -1850,11 +1897,15 @@ ssize_t pn_transport_input(pn_transport_
}
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+ transport->input_pending = 0; // XXX ???
return n;
}
}
- transport->bytes_input += consumed;
+ if (transport->input_pending && consumed) {
+ memmove( transport->input_buf, &transport->input_buf[consumed], transport->input_pending );
+ }
+
return consumed;
}
@@ -2531,23 +2582,43 @@ static ssize_t pn_output_write_amqp(pn_i
return pn_dispatcher_output(transport->disp, bytes, size);
}
-ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
+// generate outbound data, return amount of pending output else error
+static ssize_t transport_produce(pn_transport_t *transport)
{
- if (!transport) return PN_ARG_ERR;
-
pn_io_layer_t *io_layer = transport->io_layers;
- size_t total = 0;
+ ssize_t space = transport->output_size - transport->output_pending;
+
+ if (space == 0) { // can we expand the buffer?
+ int more = 0;
+ if (!transport->remote_max_frame) // no limit, so double it
+ more = transport->output_size;
+ else if (transport->remote_max_frame > transport->output_size)
+ more = transport->remote_max_frame - transport->output_size;
+ if (more) {
+ char *newbuf = (char *)malloc( transport->output_size + more );
+ if (newbuf) {
+ memmove( newbuf, transport->output_buf, transport->output_pending );
+ free( transport->output_buf );
+ transport->output_buf = newbuf;
+ transport->output_size += more;
+ space = more;
+ }
+ }
+ }
- while (size - total > 0) {
+ while (space > 0) {
ssize_t n;
- n = io_layer->process_output( io_layer, bytes + total, size - total);
+ n = io_layer->process_output( io_layer,
+ &transport->output_buf[transport->output_pending],
+ space );
if (n > 0) {
- total += n;
+ space -= n;
+ transport->output_pending += n;
} else if (n == 0) {
break;
} else {
- if (total > 0)
- break; // return what was output
+ if (transport->output_pending)
+ break; // return what is available
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
if (n == PN_EOS)
pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
@@ -2558,11 +2629,23 @@ ssize_t pn_transport_output(pn_transport
return n;
}
}
+ return transport->output_pending;
+}
- transport->bytes_output += total;
- return total;
+// deprecated
+ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+ if (!transport) return PN_ARG_ERR;
+ ssize_t available = pn_transport_pending(transport);
+ if (available > 0) {
+ available = (ssize_t) pn_min( (size_t)available, size );
+ memmove( bytes, pn_transport_peek(transport), available );
+ pn_transport_pop( transport, (size_t) available );
+ }
+ return available;
}
+
void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace)
{
if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
@@ -2577,6 +2660,7 @@ uint32_t pn_transport_get_max_frame(pn_t
void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
{
+ // if size == 0, no advertised limit to input frame size.
if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
size = AMQP_MIN_MAX_FRAME_SIZE;
transport->local_max_frame = size;
@@ -2908,3 +2992,93 @@ pn_timestamp_t pn_io_layer_tick_passthru
return next->process_tick( next, now );
return 0;
}
+
+
+///
+
+// input
+ssize_t pn_transport_capacity(pn_transport_t *transport) /* <0 == done */
+{
+ if (pn_error_code(transport->error)) return pn_error_code(transport->error);
+
+ ssize_t capacity = transport->input_size - transport->input_pending;
+ if (!capacity) {
+ // can we expand the size of the input buffer?
+ int more = 0;
+ if (!transport->local_max_frame) { // no limit (ha!)
+ more = transport->input_size;
+ } else if (transport->local_max_frame > transport->input_size) {
+ more = transport->local_max_frame - transport->input_size;
+ }
+ if (more) {
+ char *newbuf = (char *) malloc( transport->input_size + more );
+ if (newbuf) {
+ memmove( newbuf, transport->input_buf, transport->input_pending );
+ free( transport->input_buf );
+ transport->input_buf = newbuf;
+ transport->input_size += more;
+ capacity = more;
+ }
+ }
+ }
+ return capacity;
+}
+
+
+char *pn_transport_buffer(pn_transport_t *transport)
+{
+ if (transport && transport->input_pending < transport->input_size) {
+ return &transport->input_buf[transport->input_pending];
+ }
+ return NULL;
+}
+
+ssize_t pn_transport_push(pn_transport_t *transport, size_t size)
+{
+ if (!transport) return PN_ARG_ERR;
+ size = pn_min( size, (transport->input_size - transport->input_pending) );
+ transport->input_pending += size;
+ transport->bytes_input += size;
+
+ ssize_t n = transport_consume( transport );
+ if (n < 0) return n;
+ return size;
+}
+
+// input stream has closed
+int pn_transport_push_eos(pn_transport_t *transport)
+{
+ transport->eos_pushed = true;
+ ssize_t x = transport_consume( transport );
+ if (x < 0) return (int) x;
+ return 0;
+ // @todo: what if not all input processed at this point? do we care???
+}
+
+// output
+ssize_t pn_transport_pending(pn_transport_t *transport) /* <0 == done */
+{
+ if (!transport) return PN_ARG_ERR;
+ return transport_produce( transport );
+}
+
+const char *pn_transport_peek(pn_transport_t *transport)
+{
+ if (transport && transport->output_pending) {
+ return transport->output_buf;
+ }
+ return NULL;
+}
+
+void pn_transport_pop(pn_transport_t *transport, size_t size)
+{
+ if (transport && size) {
+ assert( transport->output_pending >= size );
+ transport->output_pending -= size;
+ transport->bytes_output += size;
+ if (transport->output_pending) {
+ memmove( transport->output_buf, &transport->output_buf[size],
+ transport->output_pending );
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org