You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/15 18:22:12 UTC
svn commit: r1446697 - in /qpid/proton/trunk: ./ proton-c/include/proton/
proton-c/src/ proton-c/src/dispatcher/ proton-c/src/engine/
proton-c/src/posix/ proton-c/src/ssl/ proton-c/src/windows/
tests/python/proton_tests/
Author: kgiusti
Date: Fri Feb 15 17:22:11 2013
New Revision: 1446697
URL: http://svn.apache.org/r1446697
Log:
PROTON-222, PROTON-225, PROTON-235: merge from task branch kgiusti-proton-225
Modified:
qpid/proton/trunk/ (props changed)
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/proton-c/src/posix/driver.c
qpid/proton/trunk/proton-c/src/ssl/openssl.c
qpid/proton/trunk/proton-c/src/windows/driver.c
qpid/proton/trunk/tests/python/proton_tests/messenger.py
qpid/proton/trunk/tests/python/proton_tests/sasl.py
Propchange: qpid/proton/trunk/
------------------------------------------------------------------------------
Merged /qpid/proton/branches/kgiusti-proton-225:r1445892-1446694
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Fri Feb 15 17:22:11 2013
@@ -284,8 +284,99 @@ PN_EXTERN void pn_connection_set_context
// transport
PN_EXTERN pn_error_t *pn_transport_error(pn_transport_t *transport);
+/* deprecated */
PN_EXTERN ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available);
+/* deprecated */
PN_EXTERN ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size);
+
+/** Report the amount of free space for input following the
+ * transport's tail pointer. If the engine is in an exceptional state
+ * such as encountering an error condition or reaching the end of
+ * stream state, a negative value will be returned indicating the
+ * condition. If an error is indicated, further details can be obtained
+ * from ::pn_transport_error. Calls to ::pn_transport_push may alter
+ * the reported capacity. See ::pn_transport_push for details.
+ *
+ * @param[in] transport the transport
+ * @return the free space in the transport, PN_EOS or error code if < 0
+ */
+ssize_t pn_transport_capacity(pn_transport_t *transport);
+
+/** Return the transport's tail pointer. The amount of free space
+ * following this pointer is reported by ::pn_transport_capacity.
+ * Calls to ::pn_transport_push may alter the value of this pointer.
+ * See ::pn_transport_push for details.
+ *
+ * @param[in] transport the transport
+ * @return a pointer to the transport's input buffer, NULL if no capacity available.
+ */
+char *pn_transport_tail(pn_transport_t *transport);
+
+/** Push input data following the tail pointer into the transport.
+ * Calling this function will cause the transport to consume ::size
+ * bytes of input occupying the free space following the tail pointer.
+ * Calls to this function may change the value of ::pn_transport_tail,
+ * as well as the amount of free space reported by
+ * ::pn_transport_capacity.
+ *
+ * @param[in] transport the transport
+ * @param[in] size the amount of data written to the transport's input buffer
+ * @return 0 on success, or error code if < 0
+ */
+int pn_transport_push(pn_transport_t *transport, size_t size);
+
+/** Indicate that the input has reached End Of Stream (EOS). This
+ * tells the transport that no more input will be forthcoming.
+ *
+ * @param[in] transport the transport
+ * @return 0 on success, or error code if < 0
+ */
+int pn_transport_close_tail(pn_transport_t *transport);
+
+/** Report the number of pending output bytes following the
+ * transport's head pointer. If the engine is in an exceptional state
+ * such as encountering an error condition or reaching the end of
+ * stream state, a negative value will be returned indicating the
+ * condition. If an error is indicated, further details can be
+ * obtained from ::pn_transport_error. Calls to ::pn_transport_pop may
+ * alter the number of pending bytes. See ::pn_transport_pop for
+ * details.
+ *
+ * @param[in] transport the transport
+ * @return the number of pending output bytes, or an error code
+ */
+ssize_t pn_transport_pending(pn_transport_t *transport);
+
+/** Return the transport's head pointer. This pointer references
+ * queued output data. The ::pn_transport_pending function reports how
+ * many bytes of output data follow this pointer. Calls to
+ * ::pn_transport_pop may alter this pointer and any data it
+ * references. See ::pn_transport_pop for details.
+ *
+ * @param[in] transport the transport
+ * @return a pointer to the transport's output buffer, or NULL if no pending output.
+ */
+const char *pn_transport_head(pn_transport_t *transport);
+
+/** Removes ::size bytes of output from the pending output queue
+ * following the transport's head pointer. Calls to this function may
+ * alter the transport's head pointer as well as the number of pending
+ * bytes reported by ::pn_transport_pending.
+ *
+ * @param[in] transport the transport
+ * @param[in] size the number of bytes to remove
+ */
+void pn_transport_pop(pn_transport_t *transport, size_t size);
+
+/** Indicate that the output has closed. This tells the transport
+ * that no more output will be popped.
+ *
+ * @param[in] transport the transport
+ * @return 0 on success, or error code if < 0
+ */
+int pn_transport_close_head(pn_transport_t *transport);
+
+
/** Process any pending transport timer events.
*
* This method should be called after all pending input has been processed by the
@@ -311,6 +402,7 @@ PN_EXTERN void pn_transport_set_idle_tim
PN_EXTERN pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport);
PN_EXTERN uint64_t pn_transport_get_frames_output(const pn_transport_t *transport);
PN_EXTERN uint64_t pn_transport_get_frames_input(const pn_transport_t *transport);
+PN_EXTERN bool pn_transport_quiesced(pn_transport_t *transport);
PN_EXTERN void pn_transport_free(pn_transport_t *transport);
// session
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Fri Feb 15 17:22:11 2013
@@ -166,63 +166,26 @@ int pn_dispatch_frame(pn_dispatcher_t *d
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t available)
{
- size_t offered = available;
-
- if (offered == disp->fragment) {
- return 0;
- }
-
- size_t leftover = pn_buffer_size(disp->input);
- if (leftover) {
- int e = pn_buffer_append(disp->input, bytes, available);
- if (e) return e;
- pn_bytes_t b = pn_buffer_bytes(disp->input);
- bytes = b.start;
- available = b.size;
- }
-
size_t read = 0;
- bool fragment = false;
- while (!disp->halt) {
+ while (available && !disp->halt) {
pn_frame_t frame;
- size_t n = pn_read_frame(&frame, bytes + read, available - read);
+ size_t n = pn_read_frame(&frame, bytes + read, available);
if (n) {
+ read += n;
+ available -= n;
disp->input_frames_ct += 1;
int e = pn_dispatch_frame(disp, frame);
if (e) return e;
- read += n;
} else {
- if (leftover) {
- if (read > leftover) {
- pn_buffer_clear(disp->input);
- fragment = true;
- } else {
- read = available;
- }
- } else {
- if (!read) {
- int e = pn_buffer_append(disp->input, bytes + read, available - read);
- if (e) return e;
- read = available;
- } else {
- fragment = true;
- }
- }
break;
}
if (!disp->batch) break;
}
- size_t consumed = read - leftover;
- if (consumed && fragment) {
- disp->fragment = offered - consumed;
- } else {
- disp->fragment = 0;
- }
- return consumed;
+ return read;
}
int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...)
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Feb 15 17:22:11 2013
@@ -113,6 +113,8 @@ typedef struct pn_io_layer_t {
ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t);
ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t);
pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t);
+ size_t (*buffered_output)(struct pn_io_layer_t *); // how much output is held
+ size_t (*buffered_input)(struct pn_io_layer_t *); // how much input is held
} pn_io_layer_t;
struct pn_transport_t {
@@ -129,6 +131,8 @@ struct pn_transport_t {
char *remote_hostname;
pn_data_t *remote_offered_capabilities;
pn_data_t *remote_desired_capabilities;
+ //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024)
+#define PN_DEFAULT_MAX_FRAME_SIZE (0) /* for now, allow unlimited size */
uint32_t local_max_frame;
uint32_t remote_max_frame;
pn_condition_t remote_condition;
@@ -159,6 +163,18 @@ struct pn_transport_t {
/* statistics */
uint64_t bytes_input;
uint64_t bytes_output;
+
+ /* output buffered for send */
+ size_t output_size;
+ size_t output_pending;
+ char *output_buf;
+
+ /* input from peer */
+ size_t input_size;
+ size_t input_pending;
+ char *input_buf;
+ bool tail_closed; // input stream closed by driver
+
};
struct pn_connection_t {
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Feb 15 17:22:11 2013
@@ -33,6 +33,8 @@
#include "../ssl/ssl-internal.h"
#include "../platform_fmt.h"
+static ssize_t transport_consume(pn_transport_t *transport);
+
// delivery buffers
void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
@@ -245,6 +247,8 @@ void pn_transport_free(pn_transport_t *t
pn_condition_tini(&transport->remote_condition);
free(transport->sessions);
free(transport->channels);
+ free(transport->input_buf);
+ free(transport->output_buf);
free(transport);
}
@@ -748,6 +752,8 @@ void pn_transport_init(pn_transport_t *t
io_layer->process_input = pn_io_layer_input_passthru;
io_layer->process_output = pn_io_layer_output_passthru;
io_layer->process_tick = pn_io_layer_tick_passthru;
+ io_layer->buffered_output = NULL;
+ io_layer->buffered_input = NULL;
++io_layer;
}
@@ -756,6 +762,8 @@ void pn_transport_init(pn_transport_t *t
amqp->process_input = pn_input_read_amqp_header;
amqp->process_output = pn_output_write_amqp_header;
amqp->process_tick = pn_io_layer_tick_passthru;
+ amqp->buffered_output = NULL;
+ amqp->buffered_input = NULL;
amqp->next = NULL;
pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
@@ -772,9 +780,10 @@ void pn_transport_init(pn_transport_t *t
transport->open_rcvd = false;
transport->close_sent = false;
transport->close_rcvd = false;
+ transport->tail_closed = false;
transport->remote_container = NULL;
transport->remote_hostname = NULL;
- transport->local_max_frame = 0;
+ transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE;
transport->remote_max_frame = 0;
transport->local_idle_timeout = 0;
transport->dead_remote_deadline = 0;
@@ -795,6 +804,9 @@ void pn_transport_init(pn_transport_t *t
transport->bytes_input = 0;
transport->bytes_output = 0;
+
+ transport->input_pending = 0;
+ transport->output_pending = 0;
}
pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
@@ -830,6 +842,19 @@ pn_transport_t *pn_transport()
{
pn_transport_t *transport = (pn_transport_t *) malloc(sizeof(pn_transport_t));
if (!transport) return NULL;
+ transport->output_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+ transport->output_buf = (char *) malloc(transport->output_size);
+ if (!transport->output_buf) {
+ free(transport);
+ return NULL;
+ }
+ transport->input_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+ transport->input_buf = (char *) malloc(transport->input_size);
+ if (!transport->input_buf) {
+ free(transport->output_buf);
+ free(transport);
+ return NULL;
+ }
transport->connection = NULL;
pn_transport_init(transport);
@@ -847,6 +872,7 @@ int pn_transport_bind(pn_transport_t *tr
PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
if (!pn_error_code(transport->error)) {
transport->disp->halt = false;
+ transport_consume(transport); // blech - testBindAfterOpen
}
}
return 0;
@@ -1825,21 +1851,46 @@ int pn_do_close(pn_dispatcher_t *disp)
return 0;
}
+// deprecated
ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available)
{
if (!transport) return PN_ARG_ERR;
+ if (available == 0) {
+ return pn_transport_close_tail(transport);
+ }
+ const size_t original = available;
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity < 0) return capacity;
+ while (available && capacity) {
+ char *dest = pn_transport_tail(transport);
+ assert(dest);
+ size_t count = pn_min( (size_t)capacity, available );
+ memmove( dest, bytes, count );
+ available -= count;
+ bytes += count;
+ int rc = pn_transport_push( transport, count );
+ if (rc < 0) return rc;
+ capacity = pn_transport_capacity(transport);
+ if (capacity < 0) return capacity;
+ }
+
+ return original - available;
+}
+// process pending input until none remaining or EOS
+static ssize_t transport_consume(pn_transport_t *transport)
+{
pn_io_layer_t *io_layer = transport->io_layers;
size_t consumed = 0;
- while (true) {
+ while (transport->input_pending || transport->tail_closed) {
ssize_t n;
- n = io_layer->process_input( io_layer, bytes + consumed, available - consumed);
+ n = io_layer->process_input( io_layer,
+ transport->input_buf + consumed,
+ transport->input_pending );
if (n > 0) {
consumed += n;
- if (consumed >= available) {
- break;
- }
+ transport->input_pending -= n;
} else if (n == 0) {
break;
} else {
@@ -1850,11 +1901,15 @@ ssize_t pn_transport_input(pn_transport_
}
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+ transport->input_pending = 0; // XXX ???
return n;
}
}
- transport->bytes_input += consumed;
+ if (transport->input_pending && consumed) {
+ memmove( transport->input_buf, &transport->input_buf[consumed], transport->input_pending );
+ }
+
return consumed;
}
@@ -2531,23 +2586,43 @@ static ssize_t pn_output_write_amqp(pn_i
return pn_dispatcher_output(transport->disp, bytes, size);
}
-ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
+// generate outbound data, return amount of pending output else error
+static ssize_t transport_produce(pn_transport_t *transport)
{
- if (!transport) return PN_ARG_ERR;
-
pn_io_layer_t *io_layer = transport->io_layers;
- size_t total = 0;
+ ssize_t space = transport->output_size - transport->output_pending;
- while (size - total > 0) {
+ if (space == 0) { // can we expand the buffer?
+ int more = 0;
+ if (!transport->remote_max_frame) // no limit, so double it
+ more = transport->output_size;
+ else if (transport->remote_max_frame > transport->output_size)
+ more = transport->remote_max_frame - transport->output_size;
+ if (more) {
+ char *newbuf = (char *)malloc( transport->output_size + more );
+ if (newbuf) {
+ memmove( newbuf, transport->output_buf, transport->output_pending );
+ free( transport->output_buf );
+ transport->output_buf = newbuf;
+ transport->output_size += more;
+ space = more;
+ }
+ }
+ }
+
+ while (space > 0) {
ssize_t n;
- n = io_layer->process_output( io_layer, bytes + total, size - total);
+ n = io_layer->process_output( io_layer,
+ &transport->output_buf[transport->output_pending],
+ space );
if (n > 0) {
- total += n;
+ space -= n;
+ transport->output_pending += n;
} else if (n == 0) {
break;
} else {
- if (total > 0)
- break; // return what was output
+ if (transport->output_pending)
+ break; // return what is available
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
if (n == PN_EOS)
pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
@@ -2558,11 +2633,23 @@ ssize_t pn_transport_output(pn_transport
return n;
}
}
+ return transport->output_pending;
+}
- transport->bytes_output += total;
- return total;
+// deprecated
+ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+ if (!transport) return PN_ARG_ERR;
+ ssize_t available = pn_transport_pending(transport);
+ if (available > 0) {
+ available = (ssize_t) pn_min( (size_t)available, size );
+ memmove( bytes, pn_transport_head(transport), available );
+ pn_transport_pop( transport, (size_t) available );
+ }
+ return available;
}
+
void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace)
{
if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
@@ -2577,6 +2664,7 @@ uint32_t pn_transport_get_max_frame(pn_t
void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
{
+ // if size == 0, no advertised limit to input frame size.
if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
size = AMQP_MIN_MAX_FRAME_SIZE;
transport->local_max_frame = size;
@@ -2908,3 +2996,116 @@ pn_timestamp_t pn_io_layer_tick_passthru
return next->process_tick( next, now );
return 0;
}
+
+
+///
+
+// input
+ssize_t pn_transport_capacity(pn_transport_t *transport) /* <0 == done */
+{
+ if (pn_error_code(transport->error)) return pn_error_code(transport->error);
+
+ ssize_t capacity = transport->input_size - transport->input_pending;
+ if (!capacity) {
+ // can we expand the size of the input buffer?
+ int more = 0;
+ if (!transport->local_max_frame) { // no limit (ha!)
+ more = transport->input_size;
+ } else if (transport->local_max_frame > transport->input_size) {
+ more = transport->local_max_frame - transport->input_size;
+ }
+ if (more) {
+ char *newbuf = (char *) malloc( transport->input_size + more );
+ if (newbuf) {
+ memmove( newbuf, transport->input_buf, transport->input_pending );
+ free( transport->input_buf );
+ transport->input_buf = newbuf;
+ transport->input_size += more;
+ capacity = more;
+ }
+ }
+ }
+ return capacity;
+}
+
+
+char *pn_transport_tail(pn_transport_t *transport)
+{
+ if (transport && transport->input_pending < transport->input_size) {
+ return &transport->input_buf[transport->input_pending];
+ }
+ return NULL;
+}
+
+int pn_transport_push(pn_transport_t *transport, size_t size)
+{
+ if (!transport) return PN_ARG_ERR;
+ size = pn_min( size, (transport->input_size - transport->input_pending) );
+ transport->input_pending += size;
+ transport->bytes_input += size;
+
+ ssize_t n = transport_consume( transport );
+ if (n < 0) return n;
+ return size;
+}
+
+// input stream has closed
+int pn_transport_close_tail(pn_transport_t *transport)
+{
+ transport->tail_closed = true;
+ ssize_t x = transport_consume( transport );
+ if (x < 0) return (int) x;
+ return 0;
+ // XXX: what if not all input processed at this point? do we care???
+}
+
+// output
+ssize_t pn_transport_pending(pn_transport_t *transport) /* <0 == done */
+{
+ if (!transport) return PN_ARG_ERR;
+ return transport_produce( transport );
+}
+
+const char *pn_transport_head(pn_transport_t *transport)
+{
+ if (transport && transport->output_pending) {
+ return transport->output_buf;
+ }
+ return NULL;
+}
+
+void pn_transport_pop(pn_transport_t *transport, size_t size)
+{
+ if (transport && size) {
+ assert( transport->output_pending >= size );
+ transport->output_pending -= size;
+ transport->bytes_output += size;
+ if (transport->output_pending) {
+ memmove( transport->output_buf, &transport->output_buf[size],
+ transport->output_pending );
+ }
+ }
+}
+
+int pn_transport_close_head(pn_transport_t *transport)
+{
+ return 0;
+}
+
+
+// true if the transport will not generate further output
+bool pn_transport_quiesced(pn_transport_t *transport)
+{
+ if (!transport) return true;
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending < 0) return true; // output done
+ else if (pending > 0) return false;
+ // no pending at transport, but check if data is buffered in I/O layers
+ pn_io_layer_t *io_layer = transport->io_layers;
+ while (io_layer != &transport->io_layers[PN_IO_LAYER_CT]) {
+ if (io_layer->buffered_output && io_layer->buffered_output( io_layer ))
+ return false;
+ ++io_layer;
+ }
+ return true;
+}
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Fri Feb 15 17:22:11 2013
@@ -1046,10 +1046,19 @@ int pn_messenger_settle(pn_messenger_t *
return pn_queue_update(queue, pn_tracker_sequence(tracker), (pn_status_t) 0, flags, true, true);
}
+// true if all pending output has been sent to peer
bool pn_messenger_sent(pn_messenger_t *messenger)
{
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
+
+ // check if transport is done generating output
+ pn_transport_t *transport = pn_connector_transport(ctor);
+ if (transport) {
+ if (!pn_transport_quiesced(transport))
+ return false;
+ }
+
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
Modified: qpid/proton/trunk/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/driver.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/posix/driver.c Fri Feb 15 17:22:11 2013
@@ -103,7 +103,6 @@ struct pn_listener_t {
void *context;
};
-#define IO_BUF_SIZE (64*1024)
#define PN_NAME_MAX (256)
struct pn_connector_t {
@@ -120,13 +119,7 @@ struct pn_connector_t {
pn_trace_t trace;
bool closed;
pn_timestamp_t wakeup;
- void (*read)(pn_connector_t *);
- void (*write) (pn_connector_t *);
- size_t input_size;
- char input[IO_BUF_SIZE];
bool input_eos;
- size_t output_size;
- char output[IO_BUF_SIZE];
pn_connection_t *connection;
pn_transport_t *transport;
pn_sasl_t *sasl;
@@ -378,9 +371,6 @@ pn_connector_t *pn_connector(pn_driver_t
return c;
}
-static void pn_connector_read(pn_connector_t *ctor);
-static void pn_connector_write(pn_connector_t *ctor);
-
pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
{
if (!driver) return NULL;
@@ -400,11 +390,7 @@ pn_connector_t *pn_connector_fd(pn_drive
c->trace = driver->trace;
c->closed = false;
c->wakeup = 0;
- c->read = pn_connector_read;
- c->write = pn_connector_write;
- c->input_size = 0;
c->input_eos = false;
- c->output_size = 0;
c->connection = NULL;
c->transport = pn_transport();
c->sasl = pn_sasl(c->transport);
@@ -503,74 +489,6 @@ void pn_connector_free(pn_connector_t *c
free(ctor);
}
-static void pn_connector_read(pn_connector_t *ctor)
-{
- ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
- if (n < 0) {
- if (errno != EAGAIN) {
- if (n < 0) perror("read");
- ctor->status &= ~PN_SEL_RD;
- ctor->input_eos = true;
- }
- } else if (n == 0) {
- ctor->status &= ~PN_SEL_RD;
- ctor->input_eos = true;
- } else {
- ctor->input_size += n;
- }
-}
-
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
- ctor->input_size -= n;
- memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
-static void pn_connector_process_input(pn_connector_t *ctor)
-{
- pn_transport_t *transport = ctor->transport;
- if (!ctor->input_done) {
- if (ctor->input_size > 0 || ctor->input_eos) {
- ssize_t n = pn_transport_input(transport, ctor->input, ctor->input_size);
- if (n >= 0) {
- pn_connector_consume(ctor, n);
- } else {
- pn_connector_consume(ctor, ctor->input_size);
- ctor->input_done = true;
- }
- }
- }
-}
-
-static char *pn_connector_output(pn_connector_t *ctor)
-{
- return ctor->output + ctor->output_size;
-}
-
-static size_t pn_connector_available(pn_connector_t *ctor)
-{
- return IO_BUF_SIZE - ctor->output_size;
-}
-
-static void pn_connector_process_output(pn_connector_t *ctor)
-{
- pn_transport_t *transport = ctor->transport;
- if (!ctor->output_done) {
- ssize_t n = pn_transport_output(transport, pn_connector_output(ctor),
- pn_connector_available(ctor));
- if (n >= 0) {
- ctor->output_size += n;
- } else {
- ctor->output_done = true;
- }
- }
-
- if (ctor->output_size) {
- ctor->status |= PN_SEL_WR;
- }
-}
-
-
void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit)
{
switch (crit) {
@@ -606,28 +524,6 @@ bool pn_connector_activated(pn_connector
return result;
}
-
-static void pn_connector_write(pn_connector_t *ctor)
-{
- if (ctor->output_size > 0) {
- ssize_t n = pn_send(ctor->fd, ctor->output, ctor->output_size);
- if (n < 0) {
- // XXX
- if (errno != EAGAIN) {
- perror("send");
- ctor->output_size = 0;
- ctor->output_done = true;
- }
- } else {
- ctor->output_size -= n;
- memmove(ctor->output, ctor->output + n, ctor->output_size);
- }
- }
-
- if (!ctor->output_size)
- ctor->status &= ~PN_SEL_WR;
-}
-
static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
{
if (!ctor->transport) return 0;
@@ -639,21 +535,79 @@ void pn_connector_process(pn_connector_t
if (c) {
if (c->closed) return;
- if (c->pending_read) {
- c->read(c);
- c->pending_read = false;
+ pn_transport_t *transport = c->transport;
+
+ ///
+ /// Socket read
+ ///
+ if (!c->input_done) {
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity > 0) {
+ c->status |= PN_SEL_RD;
+ if (c->pending_read) {
+ c->pending_read = false;
+ ssize_t n = recv(c->fd, pn_transport_tail(transport), capacity, 0);
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ perror("read");
+ c->status &= ~PN_SEL_RD;
+ c->input_eos = true;
+ }
+ } else {
+ if (n == 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_eos = true;
+ }
+ if (pn_transport_push(transport, (size_t) n) < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
+ }
+ }
+ } else if (capacity < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
}
- pn_connector_process_input(c);
+ ///
+ /// Event wakeup
+ ///
c->wakeup = pn_connector_tick(c, pn_i_now());
- pn_connector_process_output(c);
- if (c->pending_write) {
- c->write(c);
- c->pending_write = false;
- pn_connector_process_output(c); // XXX: review this - there's a better way to determine if the WR flag should be re-set
+ ///
+ /// Socket write
+ ///
+ if (!c->output_done) {
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending > 0) {
+ c->status |= PN_SEL_WR;
+ if (c->pending_write) {
+ c->pending_write = false;
+ ssize_t n = pn_send(c->fd, pn_transport_head(transport), pending);
+ if (n < 0) {
+ // XXX
+ if (errno != EAGAIN) {
+ perror("send");
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
+ } else if (n) {
+ pn_transport_pop(transport, (size_t) n);
+ pending -= n;
+ if (pending == 0)
+ c->status &= ~PN_SEL_WR;
+ }
+ }
+ } else if (pending < 0) {
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
}
- if (c->output_size == 0 && c->input_done && c->output_done) {
+
+ // Closed?
+
+ if (c->input_done && c->output_done) {
if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
fprintf(stderr, "Closed %s\n", c->name);
}
@@ -879,8 +833,7 @@ pn_connector_t *pn_driver_connector(pn_d
pn_connector_t *c = d->connector_next;
d->connector_next = c->connector_next;
- if (c->closed || c->pending_read || c->pending_write || c->pending_tick ||
- c->input_size || c->input_eos) {
+ if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->input_eos) {
return c;
}
}
Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Fri Feb 15 17:22:11 2013
@@ -91,9 +91,11 @@ struct pn_ssl_t {
// buffers for holding I/O from "applications" above SSL
#define APP_BUF_SIZE (4*1024)
- char outbuf[APP_BUF_SIZE];
+ char *outbuf;
+ size_t out_size;
size_t out_count;
- char inbuf[APP_BUF_SIZE];
+ char *inbuf;
+ size_t in_size;
size_t in_count;
pn_trace_t trace;
@@ -125,7 +127,8 @@ static int init_ssl_socket( pn_ssl_t * )
static void release_ssl_socket( pn_ssl_t * );
static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * );
static void ssl_session_free( pn_ssl_session_t *);
-
+static size_t buffered_output( pn_io_layer_t *io_layer );
+static size_t buffered_input( pn_io_layer_t *io_layer );
// @todo: used to avoid littering the code with calls to printf...
static void _log_error(const char *fmt, ...)
@@ -703,7 +706,8 @@ void pn_ssl_free( pn_ssl_t *ssl)
if (ssl->domain) pn_ssl_domain_free(ssl->domain);
if (ssl->session_id) free((void *)ssl->session_id);
if (ssl->peer_hostname) free((void *)ssl->peer_hostname);
-
+ if (ssl->inbuf) free((void *)ssl->inbuf);
+ if (ssl->outbuf) free((void *)ssl->outbuf);
free(ssl);
}
@@ -714,6 +718,21 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
pn_ssl_t *ssl = (pn_ssl_t *) calloc(1, sizeof(pn_ssl_t));
if (!ssl) return NULL;
+ ssl->out_size = APP_BUF_SIZE;
+ uint32_t max_frame = pn_transport_get_max_frame(transport);
+ ssl->in_size = max_frame ? max_frame : APP_BUF_SIZE;
+ ssl->outbuf = (char *)malloc(ssl->out_size);
+ if (!ssl->outbuf) {
+ free(ssl);
+ return NULL;
+ }
+ ssl->inbuf = (char *)malloc(ssl->in_size);
+ if (!ssl->inbuf) {
+ free(ssl->outbuf);
+ free(ssl);
+ return NULL;
+ }
+
ssl->transport = transport;
transport->ssl = ssl;
@@ -722,6 +741,8 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
ssl->io_layer->process_input = pn_io_layer_input_passthru;
ssl->io_layer->process_output = pn_io_layer_output_passthru;
ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
+ ssl->io_layer->buffered_output = buffered_output;
+ ssl->io_layer->buffered_input = buffered_input;
ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
@@ -814,8 +835,8 @@ static ssize_t process_input_ssl( pn_io_
// Read all available data from the SSL socket
- if (!ssl->ssl_closed && ssl->in_count < APP_BUF_SIZE) {
- int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], APP_BUF_SIZE - ssl->in_count );
+ if (!ssl->ssl_closed && ssl->in_count < ssl->in_size) {
+ int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], ssl->in_size - ssl->in_count );
if (read > 0) {
_log( ssl, "Read %d bytes from SSL socket for app\n", read );
_log_clear_data( ssl, &ssl->inbuf[ssl->in_count], read );
@@ -860,15 +881,39 @@ static ssize_t process_input_ssl( pn_io_
data += consumed;
work_pending = true;
_log( ssl, "Application consumed %d bytes from peer\n", (int) consumed );
+ } else if (consumed < 0) {
+ _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
+ (int) consumed, (int)ssl->in_count);
+ ssl->in_count = 0; // discard any pending input
+ ssl->app_input_closed = consumed;
+ if (ssl->app_output_closed && ssl->out_count == 0) {
+ // both sides of app closed, and no more app output pending:
+ start_ssl_shutdown(ssl);
+ }
} else {
- if (consumed < 0) {
- _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
- (int) consumed, (int)ssl->in_count);
- ssl->in_count = 0; // discard any pending input
- ssl->app_input_closed = consumed;
- if (ssl->app_output_closed && ssl->out_count == 0) {
- // both sides of app closed, and no more app output pending:
- start_ssl_shutdown(ssl);
+ // app did not consume any bytes, must be waiting for a full frame
+ if (ssl->in_count == ssl->in_size) {
+ // but the buffer is full, not enough room for a full frame.
+ // can we grow the buffer?
+ uint32_t max_frame = pn_transport_get_max_frame(ssl->transport);
+ if (!max_frame) max_frame = ssl->in_size * 2; // no limit
+ if (ssl->in_size < max_frame) {
+ // buffer is smaller than the frame limit (or there is no limit) - grow it to max_frame bytes.
+ char *newbuf = (char *)malloc( max_frame );
+ if (newbuf) {
+ ssl->in_size = max_frame; // recorded capacity must equal the malloc() size above, otherwise later reads overrun newbuf
+ memmove( newbuf, ssl->inbuf, ssl->in_count );
+ free( ssl->inbuf );
+ ssl->inbuf = newbuf;
+ }
+ work_pending = true; // can we get more input?
+ } else {
+ // can't gather any more input, but app needs more?
+ // This is a bug - since SSL can buffer up to max-frame,
+ // the application _must_ have enough data to process. If
+ // this is an oversized frame, the app _must_ handle it
+ // by returning an error code to SSL.
+ _log_error("Error: application unable to consume input.\n");
}
}
}
@@ -913,9 +958,9 @@ static ssize_t process_output_ssl( pn_io
work_pending = false;
// first, get any pending application output, if possible
- if (!ssl->app_output_closed && ssl->out_count < APP_BUF_SIZE) {
+ if (!ssl->app_output_closed && ssl->out_count < ssl->out_size) {
pn_io_layer_t *io_next = ssl->io_layer->next;
- ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], APP_BUF_SIZE - ssl->out_count);
+ ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count);
if (app_bytes > 0) {
ssl->out_count += app_bytes;
work_pending = true;
@@ -1235,3 +1280,31 @@ static ssize_t process_output_done(pn_io
{
return PN_EOS;
}
+
+// return # output bytes sitting in this layer: ssl->out_count plus whatever is pending on the network BIO (0 if the layer has no SSL context)
+static size_t buffered_output(pn_io_layer_t *io_layer)
+{
+ size_t count = 0;
+ pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ if (ssl) {
+ count += ssl->out_count; // bytes accumulated in ssl->outbuf from the next layer's process_output
+ if (ssl->bio_net_io) { // pick up any bytes waiting for network io
+ count += BIO_ctrl_pending(ssl->bio_net_io);
+ }
+ }
+ return count;
+}
+
+// return # input bytes sitting in this layer: ssl->in_count plus whatever is pending on the SSL BIO (0 if the layer has no SSL context)
+static size_t buffered_input( pn_io_layer_t *io_layer )
+{
+ size_t count = 0;
+ pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ if (ssl) {
+ count += ssl->in_count; // bytes already read from the SSL BIO into ssl->inbuf
+ if (ssl->bio_ssl) { // pick up any bytes waiting to be read
+ count += BIO_ctrl_pending(ssl->bio_ssl);
+ }
+ }
+ return count;
+}
Modified: qpid/proton/trunk/proton-c/src/windows/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/windows/driver.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/windows/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/windows/driver.c Fri Feb 15 17:22:11 2013
@@ -145,7 +145,6 @@ struct pn_listener_t {
void *context;
};
-#define IO_BUF_SIZE (64*1024)
#define PN_NAME_MAX (256)
struct pn_connector_t {
@@ -162,13 +161,7 @@ struct pn_connector_t {
pn_trace_t trace;
bool closed;
pn_timestamp_t wakeup;
- void (*read)(pn_connector_t *);
- void (*write) (pn_connector_t *);
- size_t input_size;
- char input[IO_BUF_SIZE];
bool input_eos;
- size_t output_size;
- char output[IO_BUF_SIZE];
pn_connection_t *connection;
pn_transport_t *transport;
pn_sasl_t *sasl;
@@ -442,11 +435,7 @@ pn_connector_t *pn_connector_fd(pn_drive
c->trace = driver->trace;
c->closed = false;
c->wakeup = 0;
- c->read = pn_connector_read;
- c->write = pn_connector_write;
- c->input_size = 0;
c->input_eos = false;
- c->output_size = 0;
c->connection = NULL;
c->transport = pn_transport();
c->sasl = pn_sasl(c->transport);
@@ -545,74 +534,6 @@ void pn_connector_free(pn_connector_t *c
free(ctor);
}
-static void pn_connector_read(pn_connector_t *ctor)
-{
- ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
- if (n < 0) {
- if (errno != EAGAIN) {
- if (n < 0) perror("read");
- ctor->status &= ~PN_SEL_RD;
- ctor->input_eos = true;
- }
- } else if (n == 0) {
- ctor->status &= ~PN_SEL_RD;
- ctor->input_eos = true;
- } else {
- ctor->input_size += n;
- }
-}
-
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
- ctor->input_size -= n;
- memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
-static void pn_connector_process_input(pn_connector_t *ctor)
-{
- pn_transport_t *transport = ctor->transport;
- if (!ctor->input_done) {
- if (ctor->input_size > 0 || ctor->input_eos) {
- ssize_t n = pn_transport_input(transport, ctor->input, ctor->input_size);
- if (n >= 0) {
- pn_connector_consume(ctor, n);
- } else {
- pn_connector_consume(ctor, ctor->input_size);
- ctor->input_done = true;
- }
- }
- }
-}
-
-static char *pn_connector_output(pn_connector_t *ctor)
-{
- return ctor->output + ctor->output_size;
-}
-
-static size_t pn_connector_available(pn_connector_t *ctor)
-{
- return IO_BUF_SIZE - ctor->output_size;
-}
-
-static void pn_connector_process_output(pn_connector_t *ctor)
-{
- pn_transport_t *transport = ctor->transport;
- if (!ctor->output_done) {
- ssize_t n = pn_transport_output(transport, pn_connector_output(ctor),
- pn_connector_available(ctor));
- if (n >= 0) {
- ctor->output_size += n;
- } else {
- ctor->output_done = true;
- }
- }
-
- if (ctor->output_size) {
- ctor->status |= PN_SEL_WR;
- }
-}
-
-
void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit)
{
switch (crit) {
@@ -648,28 +569,6 @@ bool pn_connector_activated(pn_connector
return result;
}
-
-static void pn_connector_write(pn_connector_t *ctor)
-{
- if (ctor->output_size > 0) {
- ssize_t n = pn_send(ctor->fd, ctor->output, ctor->output_size);
- if (n < 0) {
- // XXX
- if (errno != EAGAIN) {
- perror("send");
- ctor->output_size = 0;
- ctor->output_done = true;
- }
- } else {
- ctor->output_size -= n;
- memmove(ctor->output, ctor->output + n, ctor->output_size);
- }
- }
-
- if (!ctor->output_size)
- ctor->status &= ~PN_SEL_WR;
-}
-
static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
{
if (!ctor->transport) return 0;
@@ -681,21 +580,80 @@ void pn_connector_process(pn_connector_t
if (c) {
if (c->closed) return;
- if (c->pending_read) {
- c->read(c);
- c->pending_read = false;
+ pn_transport_t *transport = c->transport;
+
+ ///
+ /// Socket read
+ ///
+ if (!c->input_done) {
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity > 0) {
+ c->status |= PN_SEL_RD;
+ if (c->pending_read) {
+ c->pending_read = false;
+ ssize_t n = recv(c->fd, pn_transport_tail(transport),
+ capacity, 0);
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ perror("read");
+ c->status &= ~PN_SEL_RD;
+ c->input_eos = true;
+ }
+ } else {
+ if (n == 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_eos = true;
+ }
+ if (pn_transport_push(transport, (size_t) n) < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
+ }
+ }
+ } else if (capacity < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
}
- pn_connector_process_input(c);
+ ///
+ /// Event wakeup
+ ///
c->wakeup = pn_connector_tick(c, pn_i_now());
- pn_connector_process_output(c);
- if (c->pending_write) {
- c->write(c);
- c->pending_write = false;
- pn_connector_process_output(c); // XXX: review this - there's a better way to determine if the WR flag should be re-set
+ ///
+ /// Socket write
+ ///
+ if (!c->output_done) {
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending > 0) {
+ c->status |= PN_SEL_WR;
+ if (c->pending_write) {
+ c->pending_write = false;
+ ssize_t n = pn_send(c->fd, pn_transport_head(transport), pending);
+ if (n < 0) {
+ // XXX
+ if (errno != EAGAIN) {
+ perror("send");
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
+ } else if (n) {
+ pn_transport_pop(transport, (size_t) n);
+ pending -= n;
+ if (pending == 0)
+ c->status &= ~PN_SEL_WR;
+ }
+ }
+ } else if (pending < 0) {
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
}
- if (c->output_size == 0 && c->input_done && c->output_done) {
+
+ // Closed?
+
+ if (c->input_done && c->output_done) {
if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
fprintf(stderr, "Closed %s\n", c->name);
}
@@ -960,8 +918,7 @@ pn_connector_t *pn_driver_connector(pn_d
pn_connector_t *c = d->connector_next;
d->connector_next = c->connector_next;
- if (c->closed || c->pending_read || c->pending_write || c->pending_tick ||
- c->input_size || c->input_eos) {
+ if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->input_eos) {
return c;
}
}
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Fri Feb 15 17:22:11 2013
@@ -20,10 +20,12 @@
import os, common
from proton import *
from threading import Thread
+from time import sleep, time
class Test(common.Test):
def setup(self):
+ self.server_received = 0;
self.server = Messenger("server")
self.server.timeout=10000
self.server.start()
@@ -69,6 +71,7 @@ class MessengerTest(Test):
def process_incoming(self, msg):
while self.server.incoming:
self.server.get(msg)
+ self.server_received += 1
if msg.body == REJECT_ME:
self.server.reject()
else:
@@ -250,3 +253,19 @@ class MessengerTest(Test):
remaining -= 1
for t in trackers:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
+
+ def test_proton222(self):
+ self.start()
+ msg = Message()
+ msg.address="amqp://0.0.0.0:12345"
+ msg.subject="Hello World!"
+ msg.load("First the world, then the galaxy!")
+ assert self.server_received == 0
+ self.client.put(msg)
+ self.client.send()
+ # ensure the server got the message without requiring client to stop first
+ deadline = time() + 10
+ while self.server_received == 0:
+ assert time() < deadline, "Server did not receive message!"
+ sleep(.1)
+ assert self.server_received == 1
Modified: qpid/proton/trunk/tests/python/proton_tests/sasl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/sasl.py?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/sasl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/sasl.py Fri Feb 15 17:22:11 2013
@@ -106,3 +106,27 @@ class SaslTest(Test):
out1 = self.t1.output(1024)
assert len(out1) > 0
+
+ def testFracturedSASL(self):
+ """ PROTON-235
+ """
+ self.s1.mechanisms("ANONYMOUS")
+ self.s1.client()
+ assert self.s1.outcome is None
+
+ # self.t1.trace(Transport.TRACE_FRM)
+
+ out = self.t1.output(1024)
+ self.t1.input("AMQP\x03\x01\x00\x00")
+ out = self.t1.output(1024)
+ self.t1.input("\x00\x00\x00")
+ out = self.t1.output(1024)
+ self.t1.input("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM")
+ out = self.t1.output(1024)
+ self.t1.input("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
+ out = self.t1.output(1024)
+ while out:
+ out = self.t1.output(1024)
+
+ assert self.s1.outcome == SASL.OK
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org