You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/15 18:22:12 UTC

svn commit: r1446697 - in /qpid/proton/trunk: ./ proton-c/include/proton/ proton-c/src/ proton-c/src/dispatcher/ proton-c/src/engine/ proton-c/src/posix/ proton-c/src/ssl/ proton-c/src/windows/ tests/python/proton_tests/

Author: kgiusti
Date: Fri Feb 15 17:22:11 2013
New Revision: 1446697

URL: http://svn.apache.org/r1446697
Log:
PROTON-222, PROTON-225, PROTON-235: merge from task branch kgiusti-proton-225

Modified:
    qpid/proton/trunk/   (props changed)
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/proton-c/src/posix/driver.c
    qpid/proton/trunk/proton-c/src/ssl/openssl.c
    qpid/proton/trunk/proton-c/src/windows/driver.c
    qpid/proton/trunk/tests/python/proton_tests/messenger.py
    qpid/proton/trunk/tests/python/proton_tests/sasl.py

Propchange: qpid/proton/trunk/
------------------------------------------------------------------------------
  Merged /qpid/proton/branches/kgiusti-proton-225:r1445892-1446694

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Fri Feb 15 17:22:11 2013
@@ -284,8 +284,99 @@ PN_EXTERN void pn_connection_set_context
 
 // transport
 PN_EXTERN pn_error_t *pn_transport_error(pn_transport_t *transport);
+/* deprecated */
 PN_EXTERN ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available);
+/* deprecated */
 PN_EXTERN ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size);
+
+/** Report the amount of free space for input following the
+ * transport's tail pointer. If the engine is in an exceptional state
+ * such as encountering an error condition or reaching the end of
+ * stream state, a negative value will be returned indicating the
+ * condition. If an error is indicated, further details can be obtained
+ * from ::pn_transport_error. Calls to ::pn_transport_push may alter
+ * the value of this pointer. See ::pn_transport_push for details.
+ *
+ * @param[in] transport the transport
+ * @return the free space in the transport, PN_EOS or error code if < 0
+ */
+ssize_t pn_transport_capacity(pn_transport_t *transport);
+
+/** Return the transport's tail pointer. The amount of free space
+ * following this pointer is reported by ::pn_transport_capacity.
+ * Calls to ::pn_transport_push may alter the value of this pointer.
+ * See ::pn_transport_push for details.
+ *
+ * @param[in] transport the transport
+ * @return a pointer to the transport's input buffer, NULL if no capacity available.
+ */
+char *pn_transport_tail(pn_transport_t *transport);
+
+/** Push input data following the tail pointer into the transport.
+ * Calling this function will cause the transport to consume ::size
+ * bytes of input occupying the free space following the tail pointer.
+ * Calls to this function may change the value of ::pn_transport_tail,
+ * as well as the amount of free space reported by
+ * ::pn_transport_capacity.
+ *
+ * @param[in] transport the transport
+ * @param[in] size the amount of data written to the transport's input buffer
+ * @return 0 on success, or error code if < 0
+ */
+int pn_transport_push(pn_transport_t *transport, size_t size);
+
+/** Indicate that the input has reached End Of Stream (EOS).  This
+ * tells the transport that no more input will be forthcoming.
+ *
+ * @param[in] transport the transport
+ * @return 0 on success, or error code if < 0
+ */
+int pn_transport_close_tail(pn_transport_t *transport);
+
+/** Report the number of pending output bytes following the
+ * transport's head pointer. If the engine is in an exceptional state
+ * such as encountering an error condition or reaching the end of
+ * stream state, a negative value will be returned indicating the
+ * condition. If an error is indicated, further details can be
+ * obtained from ::pn_transport_error. Calls to ::pn_transport_pop may
+ * alter the value of this pointer. See ::pn_transport_pop for
+ * details.
+ *
+ * @param[in] transport the transport
+ * @return the number of pending output bytes, or an error code
+ */
+ssize_t pn_transport_pending(pn_transport_t *transport);
+
+/** Return the transport's head pointer. This pointer references
+ * queued output data. The ::pn_transport_pending function reports how
+ * many bytes of output data follow this pointer. Calls to
+ * ::pn_transport_pop may alter this pointer and any data it
+ * references. See ::pn_transport_pop for details.
+ *
+ * @param[in] transport the transport
+ * @return a pointer to the transport's output buffer, or NULL if no pending output.
+ */
+const char *pn_transport_head(pn_transport_t *transport);
+
+/** Removes ::size bytes of output from the pending output queue
+ * following the transport's head pointer. Calls to this function may
+ * alter the transport's head pointer as well as the number of pending
+ * bytes reported by ::pn_transport_pending.
+ *
+ * @param[in] transport the transport
+ * @param[in] size the number of bytes to remove
+ */
+void pn_transport_pop(pn_transport_t *transport, size_t size);
+
+/** Indicate that the output has closed.  This tells the transport
+ * that no more output will be popped.
+ *
+ * @param[in] transport the transport
+ * @return 0 on success, or error code if < 0
+ */
+int pn_transport_close_head(pn_transport_t *transport);
+
+
 /** Process any pending transport timer events.
  *
  * This method should be called after all pending input has been processed by the
@@ -311,6 +402,7 @@ PN_EXTERN void pn_transport_set_idle_tim
 PN_EXTERN pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport);
 PN_EXTERN uint64_t pn_transport_get_frames_output(const pn_transport_t *transport);
 PN_EXTERN uint64_t pn_transport_get_frames_input(const pn_transport_t *transport);
+PN_EXTERN bool pn_transport_quiesced(pn_transport_t *transport);
 PN_EXTERN void pn_transport_free(pn_transport_t *transport);
 
 // session

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Fri Feb 15 17:22:11 2013
@@ -166,63 +166,26 @@ int pn_dispatch_frame(pn_dispatcher_t *d
 
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t available)
 {
-  size_t offered = available;
-
-  if (offered == disp->fragment) {
-    return 0;
-  }
-
-  size_t leftover = pn_buffer_size(disp->input);
-  if (leftover) {
-    int e = pn_buffer_append(disp->input, bytes, available);
-    if (e) return e;
-    pn_bytes_t b = pn_buffer_bytes(disp->input);
-    bytes = b.start;
-    available = b.size;
-  }
-
   size_t read = 0;
-  bool fragment = false;
 
-  while (!disp->halt) {
+  while (available && !disp->halt) {
     pn_frame_t frame;
 
-    size_t n = pn_read_frame(&frame, bytes + read, available - read);
+    size_t n = pn_read_frame(&frame, bytes + read, available);
     if (n) {
+      read += n;
+      available -= n;
       disp->input_frames_ct += 1;
       int e = pn_dispatch_frame(disp, frame);
       if (e) return e;
-      read += n;
     } else {
-      if (leftover) {
-        if (read > leftover) {
-          pn_buffer_clear(disp->input);
-          fragment = true;
-        } else {
-          read = available;
-        }
-      } else {
-        if (!read) {
-          int e = pn_buffer_append(disp->input, bytes + read, available - read);
-          if (e) return e;
-          read = available;
-        } else {
-          fragment = true;
-        }
-      }
       break;
     }
 
     if (!disp->batch) break;
   }
 
-  size_t consumed = read - leftover;
-  if (consumed && fragment) {
-    disp->fragment = offered - consumed;
-  } else {
-    disp->fragment = 0;
-  }
-  return consumed;
+  return read;
 }
 
 int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...)

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Feb 15 17:22:11 2013
@@ -113,6 +113,8 @@ typedef struct pn_io_layer_t {
   ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t);
   ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t);
   pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t);
+  size_t (*buffered_output)(struct pn_io_layer_t *);  // how much output is held
+  size_t (*buffered_input)(struct pn_io_layer_t *);   // how much input is held
 } pn_io_layer_t;
 
 struct pn_transport_t {
@@ -129,6 +131,8 @@ struct pn_transport_t {
   char *remote_hostname;
   pn_data_t *remote_offered_capabilities;
   pn_data_t *remote_desired_capabilities;
+  //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024)
+#define PN_DEFAULT_MAX_FRAME_SIZE (0)  /* for now, allow unlimited size */
   uint32_t   local_max_frame;
   uint32_t   remote_max_frame;
   pn_condition_t remote_condition;
@@ -159,6 +163,18 @@ struct pn_transport_t {
   /* statistics */
   uint64_t bytes_input;
   uint64_t bytes_output;
+
+  /* output buffered for send */
+  size_t output_size;
+  size_t output_pending;
+  char *output_buf;
+
+  /* input from peer */
+  size_t input_size;
+  size_t input_pending;
+  char *input_buf;
+  bool tail_closed;      // input stream closed by driver
+
 };
 
 struct pn_connection_t {

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Feb 15 17:22:11 2013
@@ -33,6 +33,8 @@
 #include "../ssl/ssl-internal.h"
 #include "../platform_fmt.h"
 
+static ssize_t transport_consume(pn_transport_t *transport);
+
 // delivery buffers
 
 void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
@@ -245,6 +247,8 @@ void pn_transport_free(pn_transport_t *t
   pn_condition_tini(&transport->remote_condition);
   free(transport->sessions);
   free(transport->channels);
+  free(transport->input_buf);
+  free(transport->output_buf);
   free(transport);
 }
 
@@ -748,6 +752,8 @@ void pn_transport_init(pn_transport_t *t
     io_layer->process_input = pn_io_layer_input_passthru;
     io_layer->process_output = pn_io_layer_output_passthru;
     io_layer->process_tick = pn_io_layer_tick_passthru;
+    io_layer->buffered_output = NULL;
+    io_layer->buffered_input = NULL;
     ++io_layer;
   }
 
@@ -756,6 +762,8 @@ void pn_transport_init(pn_transport_t *t
   amqp->process_input = pn_input_read_amqp_header;
   amqp->process_output = pn_output_write_amqp_header;
   amqp->process_tick = pn_io_layer_tick_passthru;
+  amqp->buffered_output = NULL;
+  amqp->buffered_input = NULL;
   amqp->next = NULL;
 
   pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
@@ -772,9 +780,10 @@ void pn_transport_init(pn_transport_t *t
   transport->open_rcvd = false;
   transport->close_sent = false;
   transport->close_rcvd = false;
+  transport->tail_closed = false;
   transport->remote_container = NULL;
   transport->remote_hostname = NULL;
-  transport->local_max_frame = 0;
+  transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE;
   transport->remote_max_frame = 0;
   transport->local_idle_timeout = 0;
   transport->dead_remote_deadline = 0;
@@ -795,6 +804,9 @@ void pn_transport_init(pn_transport_t *t
 
   transport->bytes_input = 0;
   transport->bytes_output = 0;
+
+  transport->input_pending = 0;
+  transport->output_pending = 0;
 }
 
 pn_session_state_t *pn_session_get_state(pn_transport_t *transport, pn_session_t *ssn)
@@ -830,6 +842,19 @@ pn_transport_t *pn_transport()
 {
   pn_transport_t *transport = (pn_transport_t *) malloc(sizeof(pn_transport_t));
   if (!transport) return NULL;
+  transport->output_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+  transport->output_buf = (char *) malloc(transport->output_size);
+  if (!transport->output_buf) {
+    free(transport);
+    return NULL;
+  }
+  transport->input_size =  PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
+  transport->input_buf = (char *) malloc(transport->input_size);
+  if (!transport->input_buf) {
+    free(transport->output_buf);
+    free(transport);
+    return NULL;
+  }
 
   transport->connection = NULL;
   pn_transport_init(transport);
@@ -847,6 +872,7 @@ int pn_transport_bind(pn_transport_t *tr
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
     if (!pn_error_code(transport->error)) {
       transport->disp->halt = false;
+      transport_consume(transport);        // blech - testBindAfterOpen
     }
   }
   return 0;
@@ -1825,21 +1851,46 @@ int pn_do_close(pn_dispatcher_t *disp)
   return 0;
 }
 
+// deprecated
 ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available)
 {
   if (!transport) return PN_ARG_ERR;
+  if (available == 0) {
+    return pn_transport_close_tail(transport);
+  }
+  const size_t original = available;
+  ssize_t capacity = pn_transport_capacity(transport);
+  if (capacity < 0) return capacity;
+  while (available && capacity) {
+    char *dest = pn_transport_tail(transport);
+    assert(dest);
+    size_t count = pn_min( (size_t)capacity, available );
+    memmove( dest, bytes, count );
+    available -= count;
+    bytes += count;
+    int rc = pn_transport_push( transport, count );
+    if (rc < 0) return rc;
+    capacity = pn_transport_capacity(transport);
+    if (capacity < 0) return capacity;
+  }
+
+  return original - available;
+}
 
+// process pending input until none remaining or EOS
+static ssize_t transport_consume(pn_transport_t *transport)
+{
   pn_io_layer_t *io_layer = transport->io_layers;
   size_t consumed = 0;
 
-  while (true) {
+  while (transport->input_pending || transport->tail_closed) {
     ssize_t n;
-    n = io_layer->process_input( io_layer, bytes + consumed, available - consumed);
+    n = io_layer->process_input( io_layer,
+                                 transport->input_buf + consumed,
+                                 transport->input_pending );
     if (n > 0) {
       consumed += n;
-      if (consumed >= available) {
-        break;
-      }
+      transport->input_pending -= n;
     } else if (n == 0) {
       break;
     } else {
@@ -1850,11 +1901,15 @@ ssize_t pn_transport_input(pn_transport_
       }
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
         pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+      transport->input_pending = 0;  // XXX ???
       return n;
     }
   }
 
-  transport->bytes_input += consumed;
+  if (transport->input_pending && consumed) {
+    memmove( transport->input_buf,  &transport->input_buf[consumed], transport->input_pending );
+  }
+
   return consumed;
 }
 
@@ -2531,23 +2586,43 @@ static ssize_t pn_output_write_amqp(pn_i
   return pn_dispatcher_output(transport->disp, bytes, size);
 }
 
-ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
+// generate outbound data, return amount of pending output else error
+static ssize_t transport_produce(pn_transport_t *transport)
 {
-  if (!transport) return PN_ARG_ERR;
-
   pn_io_layer_t *io_layer = transport->io_layers;
-  size_t total = 0;
+  ssize_t space = transport->output_size - transport->output_pending;
 
-  while (size - total > 0) {
+  if (space == 0) {     // can we expand the buffer?
+    int more = 0;
+    if (!transport->remote_max_frame)   // no limit, so double it
+      more = transport->output_size;
+    else if (transport->remote_max_frame > transport->output_size)
+      more = transport->remote_max_frame - transport->output_size;
+    if (more) {
+      char *newbuf = (char *)malloc( transport->output_size + more );
+      if (newbuf) {
+        memmove( newbuf, transport->output_buf, transport->output_pending );
+        free( transport->output_buf );
+        transport->output_buf = newbuf;
+        transport->output_size += more;
+        space = more;
+      }
+    }
+  }
+
+  while (space > 0) {
     ssize_t n;
-    n = io_layer->process_output( io_layer, bytes + total, size - total);
+    n = io_layer->process_output( io_layer,
+                                  &transport->output_buf[transport->output_pending],
+                                  space );
     if (n > 0) {
-      total += n;
+      space -= n;
+      transport->output_pending += n;
     } else if (n == 0) {
       break;
     } else {
-      if (total > 0)
-        break;   // return what was output
+      if (transport->output_pending)
+        break;   // return what is available
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
         if (n == PN_EOS)
           pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
@@ -2558,11 +2633,23 @@ ssize_t pn_transport_output(pn_transport
       return n;
     }
   }
+  return transport->output_pending;
+}
 
-  transport->bytes_output += total;
-  return total;
+// deprecated
+ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+  if (!transport) return PN_ARG_ERR;
+  ssize_t available = pn_transport_pending(transport);
+  if (available > 0) {
+    available = (ssize_t) pn_min( (size_t)available, size );
+    memmove( bytes, pn_transport_head(transport), available );
+    pn_transport_pop( transport, (size_t) available );
+  }
+  return available;
 }
 
+
 void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace)
 {
   if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
@@ -2577,6 +2664,7 @@ uint32_t pn_transport_get_max_frame(pn_t
 
 void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
 {
+  // if size == 0, no advertised limit to input frame size.
   if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
     size = AMQP_MIN_MAX_FRAME_SIZE;
   transport->local_max_frame = size;
@@ -2908,3 +2996,116 @@ pn_timestamp_t pn_io_layer_tick_passthru
     return next->process_tick( next, now );
   return 0;
 }
+
+
+///
+
+// input
+ssize_t pn_transport_capacity(pn_transport_t *transport)  /* <0 == done */
+{
+  if (pn_error_code(transport->error)) return pn_error_code(transport->error);
+
+  ssize_t capacity = transport->input_size - transport->input_pending;
+  if (!capacity) {
+    // can we expand the size of the input buffer?
+    int more = 0;
+    if (!transport->local_max_frame) {  // no limit (ha!)
+      more = transport->input_size;
+    } else if (transport->local_max_frame > transport->input_size) {
+      more = transport->local_max_frame - transport->input_size;
+    }
+    if (more) {
+      char *newbuf = (char *) malloc( transport->input_size + more );
+      if (newbuf) {
+        memmove( newbuf, transport->input_buf, transport->input_pending );
+        free( transport->input_buf );
+        transport->input_buf = newbuf;
+        transport->input_size += more;
+        capacity = more;
+      }
+    }
+  }
+  return capacity;
+}
+
+
+char *pn_transport_tail(pn_transport_t *transport)
+{
+  if (transport && transport->input_pending < transport->input_size) {
+    return &transport->input_buf[transport->input_pending];
+  }
+  return NULL;
+}
+
+int pn_transport_push(pn_transport_t *transport, size_t size)
+{
+  if (!transport) return PN_ARG_ERR;
+  size = pn_min( size, (transport->input_size - transport->input_pending) );
+  transport->input_pending += size;
+  transport->bytes_input += size;
+
+  ssize_t n = transport_consume( transport );
+  if (n < 0) return n;
+  return size;
+}
+
+// input stream has closed
+int pn_transport_close_tail(pn_transport_t *transport)
+{
+  transport->tail_closed = true;
+  ssize_t x = transport_consume( transport );
+  if (x < 0) return (int) x;
+  return 0;
+  // XXX: what if not all input processed at this point?  do we care???
+}
+
+// output
+ssize_t pn_transport_pending(pn_transport_t *transport)      /* <0 == done */
+{
+  if (!transport) return PN_ARG_ERR;
+  return transport_produce( transport );
+}
+
+const char *pn_transport_head(pn_transport_t *transport)
+{
+  if (transport && transport->output_pending) {
+    return transport->output_buf;
+  }
+  return NULL;
+}
+
+void pn_transport_pop(pn_transport_t *transport, size_t size)
+{
+  if (transport && size) {
+    assert( transport->output_pending >= size );
+    transport->output_pending -= size;
+    transport->bytes_output += size;
+    if (transport->output_pending) {
+      memmove( transport->output_buf,  &transport->output_buf[size],
+               transport->output_pending );
+    }
+  }
+}
+
+int pn_transport_close_head(pn_transport_t *transport)
+{
+  return 0;
+}
+
+
+// true if the transport will not generate further output
+bool pn_transport_quiesced(pn_transport_t *transport)
+{
+  if (!transport) return true;
+  ssize_t pending = pn_transport_pending(transport);
+  if (pending < 0) return true; // output done
+  else if (pending > 0) return false;
+  // no pending at transport, but check if data is buffered in I/O layers
+  pn_io_layer_t *io_layer = transport->io_layers;
+  while (io_layer != &transport->io_layers[PN_IO_LAYER_CT]) {
+    if (io_layer->buffered_output && io_layer->buffered_output( io_layer ))
+      return false;
+    ++io_layer;
+  }
+  return true;
+}

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Fri Feb 15 17:22:11 2013
@@ -1046,10 +1046,19 @@ int pn_messenger_settle(pn_messenger_t *
   return pn_queue_update(queue, pn_tracker_sequence(tracker), (pn_status_t) 0, flags, true, true);
 }
 
+// true if all pending output has been sent to peer
 bool pn_messenger_sent(pn_messenger_t *messenger)
 {
   pn_connector_t *ctor = pn_connector_head(messenger->driver);
   while (ctor) {
+
+    // check if transport is done generating output
+    pn_transport_t *transport = pn_connector_transport(ctor);
+    if (transport) {
+      if (!pn_transport_quiesced(transport))
+        return false;
+    }
+
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);

Modified: qpid/proton/trunk/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/driver.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/posix/driver.c Fri Feb 15 17:22:11 2013
@@ -103,7 +103,6 @@ struct pn_listener_t {
   void *context;
 };
 
-#define IO_BUF_SIZE (64*1024)
 #define PN_NAME_MAX (256)
 
 struct pn_connector_t {
@@ -120,13 +119,7 @@ struct pn_connector_t {
   pn_trace_t trace;
   bool closed;
   pn_timestamp_t wakeup;
-  void (*read)(pn_connector_t *);
-  void (*write) (pn_connector_t *);
-  size_t input_size;
-  char input[IO_BUF_SIZE];
   bool input_eos;
-  size_t output_size;
-  char output[IO_BUF_SIZE];
   pn_connection_t *connection;
   pn_transport_t *transport;
   pn_sasl_t *sasl;
@@ -378,9 +371,6 @@ pn_connector_t *pn_connector(pn_driver_t
   return c;
 }
 
-static void pn_connector_read(pn_connector_t *ctor);
-static void pn_connector_write(pn_connector_t *ctor);
-
 pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
 {
   if (!driver) return NULL;
@@ -400,11 +390,7 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->trace = driver->trace;
   c->closed = false;
   c->wakeup = 0;
-  c->read = pn_connector_read;
-  c->write = pn_connector_write;
-  c->input_size = 0;
   c->input_eos = false;
-  c->output_size = 0;
   c->connection = NULL;
   c->transport = pn_transport();
   c->sasl = pn_sasl(c->transport);
@@ -503,74 +489,6 @@ void pn_connector_free(pn_connector_t *c
   free(ctor);
 }
 
-static void pn_connector_read(pn_connector_t *ctor)
-{
-  ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
-  if (n < 0) {
-      if (errno != EAGAIN) {
-          if (n < 0) perror("read");
-          ctor->status &= ~PN_SEL_RD;
-          ctor->input_eos = true;
-      }
-  } else if (n == 0) {
-    ctor->status &= ~PN_SEL_RD;
-    ctor->input_eos = true;
-  } else {
-    ctor->input_size += n;
-  }
-}
-
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
-  ctor->input_size -= n;
-  memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
-static void pn_connector_process_input(pn_connector_t *ctor)
-{
-  pn_transport_t *transport = ctor->transport;
-  if (!ctor->input_done) {
-    if (ctor->input_size > 0 || ctor->input_eos) {
-      ssize_t n = pn_transport_input(transport, ctor->input, ctor->input_size);
-      if (n >= 0) {
-        pn_connector_consume(ctor, n);
-      } else {
-        pn_connector_consume(ctor, ctor->input_size);
-        ctor->input_done = true;
-      }
-    }
-  }
-}
-
-static char *pn_connector_output(pn_connector_t *ctor)
-{
-  return ctor->output + ctor->output_size;
-}
-
-static size_t pn_connector_available(pn_connector_t *ctor)
-{
-  return IO_BUF_SIZE - ctor->output_size;
-}
-
-static void pn_connector_process_output(pn_connector_t *ctor)
-{
-  pn_transport_t *transport = ctor->transport;
-  if (!ctor->output_done) {
-    ssize_t n = pn_transport_output(transport, pn_connector_output(ctor),
-                                    pn_connector_available(ctor));
-    if (n >= 0) {
-      ctor->output_size += n;
-    } else {
-      ctor->output_done = true;
-    }
-  }
-
-  if (ctor->output_size) {
-    ctor->status |= PN_SEL_WR;
-  }
-}
-
-
 void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit)
 {
     switch (crit) {
@@ -606,28 +524,6 @@ bool pn_connector_activated(pn_connector
     return result;
 }
 
-
-static void pn_connector_write(pn_connector_t *ctor)
-{
-  if (ctor->output_size > 0) {
-    ssize_t n = pn_send(ctor->fd, ctor->output, ctor->output_size);
-    if (n < 0) {
-      // XXX
-        if (errno != EAGAIN) {
-            perror("send");
-            ctor->output_size = 0;
-            ctor->output_done = true;
-        }
-    } else {
-      ctor->output_size -= n;
-      memmove(ctor->output, ctor->output + n, ctor->output_size);
-    }
-  }
-
-  if (!ctor->output_size)
-    ctor->status &= ~PN_SEL_WR;
-}
-
 static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   if (!ctor->transport) return 0;
@@ -639,21 +535,79 @@ void pn_connector_process(pn_connector_t
   if (c) {
     if (c->closed) return;
 
-    if (c->pending_read) {
-      c->read(c);
-      c->pending_read = false;
+    pn_transport_t *transport = c->transport;
+
+    ///
+    /// Socket read
+    ///
+    if (!c->input_done) {
+      ssize_t capacity = pn_transport_capacity(transport);
+      if (capacity > 0) {
+        c->status |= PN_SEL_RD;
+        if (c->pending_read) {
+          c->pending_read = false;
+          ssize_t n =  recv(c->fd, pn_transport_tail(transport), capacity, 0);
+          if (n < 0) {
+            if (errno != EAGAIN) {
+              perror("read");
+              c->status &= ~PN_SEL_RD;
+              c->input_eos = true;
+            }
+          } else {
+            if (n == 0) {
+              c->status &= ~PN_SEL_RD;
+              c->input_eos = true;
+            }
+            if (pn_transport_push(transport, (size_t) n) < 0) {
+              c->status &= ~PN_SEL_RD;
+              c->input_done = true;
+            }
+          }
+        }
+      } else if (capacity < 0) {
+        c->status &= ~PN_SEL_RD;
+        c->input_done = true;
+      }
     }
-    pn_connector_process_input(c);
 
+    ///
+    /// Event wakeup
+    ///
     c->wakeup = pn_connector_tick(c, pn_i_now());
 
-    pn_connector_process_output(c);
-    if (c->pending_write) {
-      c->write(c);
-      c->pending_write = false;
-      pn_connector_process_output(c);  // XXX: review this - there's a better way to determine if the WR flag should be re-set
+    ///
+    /// Socket write
+    ///
+    if (!c->output_done) {
+      ssize_t pending = pn_transport_pending(transport);
+      if (pending > 0) {
+        c->status |= PN_SEL_WR;
+        if (c->pending_write) {
+          c->pending_write = false;
+          ssize_t n = pn_send(c->fd, pn_transport_head(transport), pending);
+          if (n < 0) {
+            // XXX
+            if (errno != EAGAIN) {
+              perror("send");
+              c->output_done = true;
+              c->status &= ~PN_SEL_WR;
+            }
+          } else if (n) {
+            pn_transport_pop(transport, (size_t) n);
+            pending -= n;
+            if (pending == 0)
+              c->status &= ~PN_SEL_WR;
+          }
+        }
+      } else if (pending < 0) {
+        c->output_done = true;
+        c->status &= ~PN_SEL_WR;
+      }
     }
-    if (c->output_size == 0 && c->input_done && c->output_done) {
+
+    // Closed?
+
+    if (c->input_done && c->output_done) {
       if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
         fprintf(stderr, "Closed %s\n", c->name);
       }
@@ -879,8 +833,7 @@ pn_connector_t *pn_driver_connector(pn_d
     pn_connector_t *c = d->connector_next;
     d->connector_next = c->connector_next;
 
-    if (c->closed || c->pending_read || c->pending_write || c->pending_tick ||
-        c->input_size || c->input_eos) {
+    if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->input_eos) {
       return c;
     }
   }

Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Fri Feb 15 17:22:11 2013
@@ -91,9 +91,11 @@ struct pn_ssl_t {
 
   // buffers for holding I/O from "applications" above SSL
 #define APP_BUF_SIZE    (4*1024)
-  char outbuf[APP_BUF_SIZE];
+  char *outbuf;
+  size_t out_size;
   size_t out_count;
-  char inbuf[APP_BUF_SIZE];
+  char *inbuf;
+  size_t in_size;
   size_t in_count;
 
   pn_trace_t trace;
@@ -125,7 +127,8 @@ static int init_ssl_socket( pn_ssl_t * )
 static void release_ssl_socket( pn_ssl_t * );
 static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * );
 static void ssl_session_free( pn_ssl_session_t *);
-
+static size_t buffered_output( pn_io_layer_t *io_layer );
+static size_t buffered_input( pn_io_layer_t *io_layer );
 
 // @todo: used to avoid littering the code with calls to printf...
 static void _log_error(const char *fmt, ...)
@@ -703,7 +706,8 @@ void pn_ssl_free( pn_ssl_t *ssl)
   if (ssl->domain) pn_ssl_domain_free(ssl->domain);
   if (ssl->session_id) free((void *)ssl->session_id);
   if (ssl->peer_hostname) free((void *)ssl->peer_hostname);
-
+  if (ssl->inbuf) free((void *)ssl->inbuf);
+  if (ssl->outbuf) free((void *)ssl->outbuf);
   free(ssl);
 }
 
@@ -714,6 +718,21 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
 
   pn_ssl_t *ssl = (pn_ssl_t *) calloc(1, sizeof(pn_ssl_t));
   if (!ssl) return NULL;
+  ssl->out_size = APP_BUF_SIZE;
+  uint32_t max_frame = pn_transport_get_max_frame(transport);
+  ssl->in_size =  max_frame ? max_frame : APP_BUF_SIZE;
+  ssl->outbuf = (char *)malloc(ssl->out_size);
+  if (!ssl->outbuf) {
+    free(ssl);
+    return NULL;
+  }
+  ssl->inbuf =  (char *)malloc(ssl->in_size);
+  if (!ssl->inbuf) {
+    free(ssl->outbuf);
+    free(ssl);
+    return NULL;
+  }
+
   ssl->transport = transport;
   transport->ssl = ssl;
 
@@ -722,6 +741,8 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
   ssl->io_layer->process_input = pn_io_layer_input_passthru;
   ssl->io_layer->process_output = pn_io_layer_output_passthru;
   ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
+  ssl->io_layer->buffered_output = buffered_output;
+  ssl->io_layer->buffered_input = buffered_input;
 
   ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
 
@@ -814,8 +835,8 @@ static ssize_t process_input_ssl( pn_io_
 
     // Read all available data from the SSL socket
 
-    if (!ssl->ssl_closed && ssl->in_count < APP_BUF_SIZE) {
-      int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], APP_BUF_SIZE - ssl->in_count );
+    if (!ssl->ssl_closed && ssl->in_count < ssl->in_size) {
+      int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], ssl->in_size - ssl->in_count );
       if (read > 0) {
         _log( ssl, "Read %d bytes from SSL socket for app\n", read );
         _log_clear_data( ssl, &ssl->inbuf[ssl->in_count], read );
@@ -860,15 +881,39 @@ static ssize_t process_input_ssl( pn_io_
           data += consumed;
           work_pending = true;
           _log( ssl, "Application consumed %d bytes from peer\n", (int) consumed );
+        } else if (consumed < 0) {
+          _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
+               (int) consumed, (int)ssl->in_count);
+          ssl->in_count = 0;    // discard any pending input
+          ssl->app_input_closed = consumed;
+          if (ssl->app_output_closed && ssl->out_count == 0) {
+            // both sides of app closed, and no more app output pending:
+            start_ssl_shutdown(ssl);
+          }
         } else {
-          if (consumed < 0) {
-            _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
-                 (int) consumed, (int)ssl->in_count);
-            ssl->in_count = 0;    // discard any pending input
-            ssl->app_input_closed = consumed;
-            if (ssl->app_output_closed && ssl->out_count == 0) {
-              // both sides of app closed, and no more app output pending:
-              start_ssl_shutdown(ssl);
+          // app did not consume any bytes, must be waiting for a full frame
+          if (ssl->in_count == ssl->in_size) {
+            // but the buffer is full, not enough room for a full frame.
+            // can we grow the buffer?
+            uint32_t max_frame = pn_transport_get_max_frame(ssl->transport);
+            if (!max_frame) max_frame = ssl->in_size * 2;  // no limit
+            if (ssl->in_size < max_frame) {
+              // still below max_frame (or no limit: double) - grow to max_frame bytes.
+              char *newbuf = (char *)malloc( max_frame );
+              if (newbuf) {
+                ssl->in_size = max_frame;  // capacity must equal the bytes actually allocated
+                memmove( newbuf, ssl->inbuf, ssl->in_count );
+                free( ssl->inbuf );
+                ssl->inbuf = newbuf;
+              }
+              work_pending = true;  // can we get more input?
+            } else {
+              // can't gather any more input, but app needs more?
+              // This is a bug - since SSL can buffer up to max-frame,
+              // the application _must_ have enough data to process.  If
+              // this is an oversized frame, the app _must_ handle it
+              // by returning an error code to SSL.
+              _log_error("Error: application unable to consume input.\n");
             }
           }
         }
@@ -913,9 +958,9 @@ static ssize_t process_output_ssl( pn_io
     work_pending = false;
     // first, get any pending application output, if possible
 
-    if (!ssl->app_output_closed && ssl->out_count < APP_BUF_SIZE) {
+    if (!ssl->app_output_closed && ssl->out_count < ssl->out_size) {
       pn_io_layer_t *io_next = ssl->io_layer->next;
-      ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], APP_BUF_SIZE - ssl->out_count);
+      ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count);
       if (app_bytes > 0) {
         ssl->out_count += app_bytes;
         work_pending = true;
@@ -1235,3 +1280,31 @@ static ssize_t process_output_done(pn_io
 {
   return PN_EOS;
 }
+
+// Report how many output bytes are still held inside this layer.
+static size_t buffered_output(pn_io_layer_t *io_layer)
+{
+  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  if (!ssl) return 0;
+  // start with the bytes already counted in the output buffer
+  size_t total = ssl->out_count;
+  if (ssl->bio_net_io) {
+    // add any bytes waiting for network io
+    total += BIO_ctrl_pending(ssl->bio_net_io);
+  }
+  return total;
+}
+
+// Report how many input bytes are still held inside this layer.
+static size_t buffered_input( pn_io_layer_t *io_layer )
+{
+  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  if (!ssl) return 0;
+  // start with the bytes already counted in the input buffer
+  size_t total = ssl->in_count;
+  if (ssl->bio_ssl) {
+    // add any bytes waiting to be read
+    total += BIO_ctrl_pending(ssl->bio_ssl);
+  }
+  return total;
+}

Modified: qpid/proton/trunk/proton-c/src/windows/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/windows/driver.c?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/windows/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/windows/driver.c Fri Feb 15 17:22:11 2013
@@ -145,7 +145,6 @@ struct pn_listener_t {
   void *context;
 };
 
-#define IO_BUF_SIZE (64*1024)
 #define PN_NAME_MAX (256)
 
 struct pn_connector_t {
@@ -162,13 +161,7 @@ struct pn_connector_t {
   pn_trace_t trace;
   bool closed;
   pn_timestamp_t wakeup;
-  void (*read)(pn_connector_t *);
-  void (*write) (pn_connector_t *);
-  size_t input_size;
-  char input[IO_BUF_SIZE];
   bool input_eos;
-  size_t output_size;
-  char output[IO_BUF_SIZE];
   pn_connection_t *connection;
   pn_transport_t *transport;
   pn_sasl_t *sasl;
@@ -442,11 +435,7 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->trace = driver->trace;
   c->closed = false;
   c->wakeup = 0;
-  c->read = pn_connector_read;
-  c->write = pn_connector_write;
-  c->input_size = 0;
   c->input_eos = false;
-  c->output_size = 0;
   c->connection = NULL;
   c->transport = pn_transport();
   c->sasl = pn_sasl(c->transport);
@@ -545,74 +534,6 @@ void pn_connector_free(pn_connector_t *c
   free(ctor);
 }
 
-static void pn_connector_read(pn_connector_t *ctor)
-{
-  ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
-  if (n < 0) {
-      if (errno != EAGAIN) {
-          if (n < 0) perror("read");
-          ctor->status &= ~PN_SEL_RD;
-          ctor->input_eos = true;
-      }
-  } else if (n == 0) {
-    ctor->status &= ~PN_SEL_RD;
-    ctor->input_eos = true;
-  } else {
-    ctor->input_size += n;
-  }
-}
-
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
-  ctor->input_size -= n;
-  memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
-static void pn_connector_process_input(pn_connector_t *ctor)
-{
-  pn_transport_t *transport = ctor->transport;
-  if (!ctor->input_done) {
-    if (ctor->input_size > 0 || ctor->input_eos) {
-      ssize_t n = pn_transport_input(transport, ctor->input, ctor->input_size);
-      if (n >= 0) {
-        pn_connector_consume(ctor, n);
-      } else {
-        pn_connector_consume(ctor, ctor->input_size);
-        ctor->input_done = true;
-      }
-    }
-  }
-}
-
-static char *pn_connector_output(pn_connector_t *ctor)
-{
-  return ctor->output + ctor->output_size;
-}
-
-static size_t pn_connector_available(pn_connector_t *ctor)
-{
-  return IO_BUF_SIZE - ctor->output_size;
-}
-
-static void pn_connector_process_output(pn_connector_t *ctor)
-{
-  pn_transport_t *transport = ctor->transport;
-  if (!ctor->output_done) {
-    ssize_t n = pn_transport_output(transport, pn_connector_output(ctor),
-                                    pn_connector_available(ctor));
-    if (n >= 0) {
-      ctor->output_size += n;
-    } else {
-      ctor->output_done = true;
-    }
-  }
-
-  if (ctor->output_size) {
-    ctor->status |= PN_SEL_WR;
-  }
-}
-
-
 void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit)
 {
     switch (crit) {
@@ -648,28 +569,6 @@ bool pn_connector_activated(pn_connector
     return result;
 }
 
-
-static void pn_connector_write(pn_connector_t *ctor)
-{
-  if (ctor->output_size > 0) {
-    ssize_t n = pn_send(ctor->fd, ctor->output, ctor->output_size);
-    if (n < 0) {
-      // XXX
-        if (errno != EAGAIN) {
-            perror("send");
-            ctor->output_size = 0;
-            ctor->output_done = true;
-        }
-    } else {
-      ctor->output_size -= n;
-      memmove(ctor->output, ctor->output + n, ctor->output_size);
-    }
-  }
-
-  if (!ctor->output_size)
-    ctor->status &= ~PN_SEL_WR;
-}
-
 static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   if (!ctor->transport) return 0;
@@ -681,21 +580,80 @@ void pn_connector_process(pn_connector_t
   if (c) {
     if (c->closed) return;
 
-    if (c->pending_read) {
-      c->read(c);
-      c->pending_read = false;
+    pn_transport_t *transport = c->transport;
+
+    ///
+    /// Socket read
+    ///
+    if (!c->input_done) {
+      ssize_t capacity = pn_transport_capacity(transport);
+      if (capacity > 0) {
+        c->status |= PN_SEL_RD;
+        if (c->pending_read) {
+          c->pending_read = false;
+          ssize_t n =  recv(c->fd, pn_transport_tail(transport),
+                            capacity, 0);
+          if (n < 0) {
+            if (errno != EAGAIN) {
+              perror("read");
+              c->status &= ~PN_SEL_RD;
+              c->input_eos = true;
+            }
+          } else {
+            if (n == 0) {
+              c->status &= ~PN_SEL_RD;
+              c->input_eos = true;
+            }
+            if (pn_transport_push(transport, (size_t) n) < 0) {
+              c->status &= ~PN_SEL_RD;
+              c->input_done = true;
+            }
+          }
+        }
+      } else if (capacity < 0) {
+        c->status &= ~PN_SEL_RD;
+        c->input_done = true;
+      }
     }
-    pn_connector_process_input(c);
 
+    ///
+    /// Event wakeup
+    ///
     c->wakeup = pn_connector_tick(c, pn_i_now());
 
-    pn_connector_process_output(c);
-    if (c->pending_write) {
-      c->write(c);
-      c->pending_write = false;
-      pn_connector_process_output(c);  // XXX: review this - there's a better way to determine if the WR flag should be re-set
+    ///
+    /// Socket write
+    ///
+    if (!c->output_done) {
+      ssize_t pending = pn_transport_pending(transport);
+      if (pending > 0) {
+        c->status |= PN_SEL_WR;
+        if (c->pending_write) {
+          c->pending_write = false;
+          ssize_t n = pn_send(c->fd, pn_transport_head(transport), pending);
+          if (n < 0) {
+            // XXX
+            if (errno != EAGAIN) {
+              perror("send");
+              c->output_done = true;
+              c->status &= ~PN_SEL_WR;
+            }
+          } else if (n) {
+            pn_transport_pop(transport, (size_t) n);
+            pending -= n;
+            if (pending == 0)
+              c->status &= ~PN_SEL_WR;
+          }
+        }
+      } else if (pending < 0) {
+        c->output_done = true;
+        c->status &= ~PN_SEL_WR;
+      }
     }
-    if (c->output_size == 0 && c->input_done && c->output_done) {
+
+    // Closed?
+
+    if (c->input_done && c->output_done) {
       if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
         fprintf(stderr, "Closed %s\n", c->name);
       }
@@ -960,8 +918,7 @@ pn_connector_t *pn_driver_connector(pn_d
     pn_connector_t *c = d->connector_next;
     d->connector_next = c->connector_next;
 
-    if (c->closed || c->pending_read || c->pending_write || c->pending_tick ||
-        c->input_size || c->input_eos) {
+    if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->input_eos) {
       return c;
     }
   }

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Fri Feb 15 17:22:11 2013
@@ -20,10 +20,12 @@
 import os, common
 from proton import *
 from threading import Thread
+from time import sleep, time
 
 class Test(common.Test):
 
   def setup(self):
+    self.server_received = 0;
     self.server = Messenger("server")
     self.server.timeout=10000
     self.server.start()
@@ -69,6 +71,7 @@ class MessengerTest(Test):
   def process_incoming(self, msg):
     while self.server.incoming:
       self.server.get(msg)
+      self.server_received += 1
       if msg.body == REJECT_ME:
         self.server.reject()
       else:
@@ -250,3 +253,19 @@ class MessengerTest(Test):
         remaining -= 1
     for t in trackers:
       assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
+
+  def test_proton222(self):
+    self.start()
+    msg = Message()
+    msg.address="amqp://0.0.0.0:12345"
+    msg.subject="Hello World!"
+    msg.load("First the world, then the galaxy!")
+    assert self.server_received == 0
+    self.client.put(msg)
+    self.client.send()
+    # ensure the server got the message without requiring client to stop first
+    deadline = time() + 10
+    while self.server_received == 0:
+      assert time() < deadline, "Server did not receive message!"
+      sleep(.1)
+    assert self.server_received == 1

Modified: qpid/proton/trunk/tests/python/proton_tests/sasl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/sasl.py?rev=1446697&r1=1446696&r2=1446697&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/sasl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/sasl.py Fri Feb 15 17:22:11 2013
@@ -106,3 +106,27 @@ class SaslTest(Test):
 
     out1 = self.t1.output(1024)
     assert len(out1) > 0
+
+  def testFracturedSASL(self):
+    """ PROTON-235
+    """
+    self.s1.mechanisms("ANONYMOUS")
+    self.s1.client()
+    assert self.s1.outcome is None
+
+    # self.t1.trace(Transport.TRACE_FRM)
+
+    out = self.t1.output(1024)
+    self.t1.input("AMQP\x03\x01\x00\x00")
+    out = self.t1.output(1024)
+    self.t1.input("\x00\x00\x00")
+    out = self.t1.output(1024)
+    self.t1.input("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM")
+    out = self.t1.output(1024)
+    self.t1.input("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
+    out = self.t1.output(1024)
+    while out:
+      out = self.t1.output(1024)
+
+    assert self.s1.outcome == SASL.OK
+



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org