You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/09/07 03:19:17 UTC

svn commit: r1381839 - in /qpid/proton/trunk: proton-c/bindings/php/ proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/ proton-c/src/dispatcher/ proton-c/src/engine/ proton-c/src/message/ proton-c/src/sasl/ tests/proton_tests/

Author: rhs
Date: Fri Sep  7 01:19:16 2012
New Revision: 1381839

URL: http://svn.apache.org/viewvc?rev=1381839&view=rev
Log:
decoupled transport from connection and moved sasl layer from driver to transport; fixed several memory leaks shown by valgrind; made messenger use driver's connection and listener iterators instead of keeping internal duplicate structures

Added:
    qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h
    qpid/proton/trunk/tests/proton_tests/transport.py
Modified:
    qpid/proton/trunk/proton-c/bindings/php/php.i
    qpid/proton/trunk/proton-c/bindings/python/python.i
    qpid/proton/trunk/proton-c/include/proton/cproton.i
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/include/proton/parser.h
    qpid/proton/trunk/proton-c/include/proton/sasl.h
    qpid/proton/trunk/proton-c/src/buffer.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/error.c
    qpid/proton/trunk/proton-c/src/message/message.c
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/proton-c/src/sasl/sasl.c
    qpid/proton/trunk/proton-c/src/scanner.c
    qpid/proton/trunk/proton-c/src/util.h
    qpid/proton/trunk/tests/proton_tests/__init__.py
    qpid/proton/trunk/tests/proton_tests/engine.py
    qpid/proton/trunk/tests/proton_tests/message.py
    qpid/proton/trunk/tests/proton_tests/messenger.py
    qpid/proton/trunk/tests/proton_tests/sasl.py

Modified: qpid/proton/trunk/proton-c/bindings/php/php.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/php.i?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/php.i (original)
+++ qpid/proton/trunk/proton-c/bindings/php/php.i Fri Sep  7 01:19:16 2012
@@ -67,8 +67,6 @@ ssize_t pn_input(pn_transport_t *transpo
 
 ssize_t pn_sasl_send(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
 %ignore pn_sasl_send;
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
-%ignore pn_sasl_input;
 
 
 // Use the OUTPUT_BUFFER,OUTPUT_LEN typemap to allow these functions to return
@@ -110,18 +108,6 @@ ssize_t pn_sasl_input(pn_sasl_t *sasl, c
 %}
 %ignore pn_output;
 
-%rename(pn_sasl_output) wrap_pn_output;
-// in PHP:   array = pn_sasl_output(sasl, MAXLEN);
-//           array[0] = size || error code
-//           array[1] = native string containing binary data
-%inline %{
-    void wrap_pn_sasl_output(pn_sasl_t *sasl, size_t maxCount, char **OUTPUT_BUFFER, ssize_t *OUTPUT_LEN) {
-        *OUTPUT_BUFFER = emalloc(sizeof(char) * maxCount);
-        *OUTPUT_LEN = pn_sasl_output(sasl, *OUTPUT_BUFFER, maxCount);
-    }
-%}
-%ignore pn_sasl_output;
-
 %rename(pn_message_data) wrap_pn_message_data;
 // in PHP:  array = pn_message_data("binary message data", MAXLEN);
 //          array[0] = size || error code

Modified: qpid/proton/trunk/proton-c/bindings/python/python.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/python.i?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/python.i (original)
+++ qpid/proton/trunk/proton-c/bindings/python/python.i Fri Sep  7 01:19:16 2012
@@ -104,23 +104,6 @@ ssize_t pn_input(pn_transport_t *transpo
 %}
 %ignore pn_output;
 
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
-%ignore pn_sasl_input;
-
-%rename(pn_sasl_output) wrap_pn_sasl_output;
-%inline %{
-  int wrap_pn_sasl_output(pn_sasl_t *sasl, char *OUTPUT, size_t *OUTPUT_SIZE) {
-    ssize_t sz = pn_sasl_output(sasl, OUTPUT, *OUTPUT_SIZE);
-    if (sz >= 0) {
-      *OUTPUT_SIZE = sz;
-    } else {
-      *OUTPUT_SIZE = 0;
-    }
-    return sz;
-  }
-%}
-%ignore pn_sasl_output;
-
 %rename(pn_delivery) wrap_pn_delivery;
 %inline %{
   pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) {

Modified: qpid/proton/trunk/proton-c/include/proton/cproton.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/cproton.i?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/cproton.i (original)
+++ qpid/proton/trunk/proton-c/include/proton/cproton.i Fri Sep  7 01:19:16 2012
@@ -981,34 +981,6 @@
   check_sasl_outcome(pn_sasl_outcome);
 }
 
-%contract pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available)
-{
- require:
-  sasl != NULL;
-  bytes != NULL;
-  available > 0;
-}
-
-%contract pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size)
-{
- require:
-  sasl != NULL;
-  bytes != NULL;
-  size > 0;
-}
-
-%contract pn_sasl_trace(pn_sasl_t *sasl, pn_trace_t trace)
-{
- require:
-  sasl != NULL;
-}
-
-%contract pn_sasl_free(pn_sasl_t *sasl)
-{
- require:
-  sasl != NULL;
-}
-
 %include "proton/sasl.h"
 
 %contract pn_driver(void)

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Fri Sep  7 01:19:16 2012
@@ -148,15 +148,22 @@ pn_delivery_t *pn_work_next(pn_delivery_
  */
 pn_session_t *pn_session(pn_connection_t *connection);
 
-/** Factory for creating the connection's transport.
+/** Factory for creating a transport.
  *
- * The transport used by the connection to interface with the network.
- * There can only be one transport associated with a connection.
+ * A transport to be used by a connection to interface with the
+ * network. There can only be one connection associated with a
+ * transport. See pn_transport_bind().
  *
- * @param[in] connection connection that will use the transport
- * @return pointer to new session
+ * @return pointer to new transport
  */
-pn_transport_t *pn_transport(pn_connection_t *connection);
+pn_transport_t *pn_transport(void);
+
+/** Binds the transport to an AMQP connection endpoint.
+ *
+ * @return an error code, or 0 on success
+ */
+
+int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection);
 
 /** Retrieve the first Session that matches the given state mask.
  *

Modified: qpid/proton/trunk/proton-c/include/proton/parser.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/parser.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/parser.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/parser.h Fri Sep  7 01:19:16 2012
@@ -30,7 +30,7 @@ extern "C" {
 
 typedef struct pn_parser_t pn_parser_t;
 
-pn_parser_t *pn_parser();
+pn_parser_t *pn_parser(void);
 int pn_parser_parse(pn_parser_t *parser, const char *str, pn_atoms_t *atoms);
 int pn_parser_errno(pn_parser_t *parser);
 const char *pn_parser_error(pn_parser_t *parser);

Modified: qpid/proton/trunk/proton-c/include/proton/sasl.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/sasl.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/sasl.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/sasl.h Fri Sep  7 01:19:16 2012
@@ -24,6 +24,7 @@
 
 #include <sys/types.h>
 #include <stdbool.h>
+#include <proton/engine.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -64,7 +65,7 @@ typedef enum {
  *
  * @return a new SASL object representing the layer.
  */
-pn_sasl_t *pn_sasl();
+pn_sasl_t *pn_sasl(pn_transport_t *transport);
 
 /** Access the current state of the layer.
  *
@@ -163,39 +164,6 @@ void pn_sasl_done(pn_sasl_t *sasl, pn_sa
  */
 pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl);
 
-/** Decode input data bytes into SASL frames, and process them.
- *
- * This function is called by the driver layer to pass data received
- * from the remote peer into the SASL layer.
- *
- * @param[in] sasl the SASL layer.
- * @param[in] bytes buffer of frames to process
- * @param[in] available number of octets of data in 'bytes'
- * @return the number of bytes consumed, or error code if < 0
- */
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available);
-
-/** Gather output frames from the layer.
- *
- * This function is used by the driver to poll the SASL layer for data
- * that will be sent to the remote peer.
- *
- * @param[in] sasl The SASL layer.
- * @param[out] bytes to be filled with encoded frames.
- * @param[in] size space available in bytes array.
- * @return the number of octets written to bytes, or error code if < 0
- */
-ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size);
-
-void pn_sasl_trace(pn_sasl_t *sasl, pn_trace_t trace);
-
-/** Destructor for the given SASL layer.
- *
- * @param[in] sasl the SASL object to free. No longer valid on
- *                 return.
- */
-void pn_sasl_free(pn_sasl_t *sasl);
-
 #ifdef __cplusplus
 }
 #endif

Modified: qpid/proton/trunk/proton-c/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/buffer.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/buffer.c (original)
+++ qpid/proton/trunk/proton-c/src/buffer.c Fri Sep  7 01:19:16 2012
@@ -139,9 +139,6 @@ int pn_buffer_ensure(pn_buffer_t *buf, s
   return 0;
 }
 
-#define min(X,Y) ((X) > (Y) ? (Y) : (X))
-#define max(X,Y) ((X) < (Y) ? (Y) : (X))
-
 int pn_buffer_append(pn_buffer_t *buf, const char *bytes, size_t size)
 {
   int err = pn_buffer_ensure(buf, size);
@@ -149,7 +146,7 @@ int pn_buffer_append(pn_buffer_t *buf, c
 
   size_t tail = pn_buffer_tail(buf);
   size_t tail_space = pn_buffer_tail_space(buf);
-  size_t n = min(tail_space, size);
+  size_t n = pn_min(tail_space, size);
 
   memmove(buf->bytes + tail, bytes, n);
   memmove(buf->bytes, bytes + n, size - n);
@@ -166,7 +163,7 @@ int pn_buffer_prepend(pn_buffer_t *buf, 
 
   size_t head = pn_buffer_head(buf);
   size_t head_space = pn_buffer_head_space(buf);
-  size_t n = min(head_space, size);
+  size_t n = pn_min(head_space, size);
 
   memmove(buf->bytes + head - n, bytes + size - n, n);
   memmove(buf->bytes + buf->capacity - (size - n), bytes, size - n);

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Fri Sep  7 01:19:16 2012
@@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t 
     uint8_t code = scanned ? code64 : 0;
     size_t n = SCRATCH;
     pn_data_format(args, disp->scratch, &n);
-    fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
-            dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
+    pn_dispatcher_trace(disp, ch, "%s %s %s", dir == OUT ? "->" : "<-",
+                        disp->names[code], disp->scratch);
     if (size) {
       size_t capacity = 4*size + 1;
       char buf[capacity];
@@ -95,6 +95,16 @@ static void pn_do_trace(pn_dispatcher_t 
   }
 }
 
+void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...)
+{
+  va_list ap;
+  fprintf(stderr, "[%p:%u] ", (void *) disp, ch);
+
+  va_start(ap, fmt);
+  vfprintf(stderr, fmt, ap);
+  va_end(ap);
+}
+
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available)
 {
   size_t read = 0;

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Fri Sep  7 01:19:16 2012
@@ -64,5 +64,6 @@ void pn_set_payload(pn_dispatcher_t *dis
 int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...);
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available);
 ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size);
+void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...);
 
 #endif /* dispatcher.h */

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Fri Sep  7 01:19:16 2012
@@ -94,11 +94,8 @@ struct pn_connector_t {
   bool input_eos;
   size_t output_size;
   char output[IO_BUF_SIZE];
-  pn_sasl_t *sasl;
   pn_connection_t *connection;
   pn_transport_t *transport;
-  ssize_t (*process_input)(pn_connector_t *);
-  ssize_t (*process_output)(pn_connector_t *);
   bool input_done;
   bool output_done;
   pn_listener_t *listener;
@@ -332,15 +329,6 @@ static void pn_connector_read(pn_connect
 static void pn_connector_write(pn_connector_t *ctor);
 static time_t pn_connector_tick(pn_connector_t *ctor, time_t now);
 
-static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor);
-static ssize_t pn_connector_read_sasl(pn_connector_t *ctor);
-static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor);
-static ssize_t pn_connector_read_amqp(pn_connector_t *ctor);
-static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor);
-static ssize_t pn_connector_write_sasl(pn_connector_t *ctor);
-static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor);
-static ssize_t pn_connector_write_amqp(pn_connector_t *ctor);
-
 pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
 {
   if (!driver) return NULL;
@@ -366,11 +354,8 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->input_size = 0;
   c->input_eos = false;
   c->output_size = 0;
-  c->sasl = pn_sasl();
   c->connection = NULL;
-  c->transport = NULL;
-  c->process_input = pn_connector_read_sasl_header;
-  c->process_output = pn_connector_write_sasl_header;
+  c->transport = pn_transport();
   c->input_done = false;
   c->output_done = false;
   c->context = context;
@@ -396,20 +381,19 @@ void pn_connector_trace(pn_connector_t *
 {
   if (!ctor) return;
   ctor->trace = trace;
-  if (ctor->sasl) pn_sasl_trace(ctor->sasl, trace);
   if (ctor->transport) pn_trace(ctor->transport, trace);
 }
 
 pn_sasl_t *pn_connector_sasl(pn_connector_t *ctor)
 {
-  return ctor ? ctor->sasl : NULL;
+  return ctor ? pn_sasl(ctor->transport) : NULL;
 }
 
 void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection)
 {
   if (!ctor) return;
   ctor->connection = connection;
-  ctor->transport = pn_transport(connection);
+  pn_transport_bind(ctor->transport, connection);
   if (ctor->transport) pn_trace(ctor->transport, ctor->trace);
 }
 
@@ -457,8 +441,8 @@ void pn_connector_free(pn_connector_t *c
 
   if (ctor->driver) pn_driver_remove_connector(ctor->driver, ctor);
   ctor->connection = NULL;
+  pn_transport_free(ctor->transport);
   ctor->transport = NULL;
-  pn_sasl_free(ctor->sasl);
   free(ctor);
 }
 
@@ -482,93 +466,18 @@ static void pn_connector_consume(pn_conn
 
 static void pn_connector_process_input(pn_connector_t *ctor)
 {
-  while (!ctor->input_done && (ctor->input_size > 0 || ctor->input_eos)) {
-    ssize_t n = ctor->process_input(ctor);
-    if (n > 0) {
-      pn_connector_consume(ctor, n);
-    } else if (n == 0) {
-      break;
-    } else {
-      if (n == PN_EOS) {
-        pn_connector_consume(ctor, ctor->input_size);
+  pn_transport_t *transport = ctor->transport;
+  if (!ctor->input_done) {
+    if (ctor->input_size > 0 || ctor->input_eos) {
+      ssize_t n = pn_input(transport, ctor->input, ctor->input_size);
+      if (n >= 0) {
+        pn_connector_consume(ctor, n);
       } else {
-        fprintf(stderr, "error in process_input: %s\n", pn_code(n));
+        pn_connector_consume(ctor, ctor->input_size);
+        ctor->input_done = true;
       }
-      ctor->input_done = true;
-      break;
-    }
-  }
-}
-
-static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor)
-{
-  if (ctor->input_size >= 8) {
-    if (memcmp(ctor->input, "AMQP\x03\x01\x00\x00", 8)) {
-      fprintf(stderr, "sasl header missmatch: ");
-      pn_fprint_data(stderr, ctor->input, ctor->input_size);
-      fprintf(stderr, "\n");
-      ctor->output_done = true;
-      return PN_ERR;
-    } else {
-      if (ctor->trace & PN_TRACE_FRM)
-        fprintf(stderr, "    <- AMQP SASL 1.0\n");
-      ctor->process_input = pn_connector_read_sasl;
-      return 8;
-    }
-  } else if (ctor->input_eos) {
-    fprintf(stderr, "sasl header missmatch: ");
-    pn_fprint_data(stderr, ctor->input, ctor->input_size);
-    fprintf(stderr, "\n");
-    ctor->output_done = true;
-    return PN_ERR;
-  }
-
-  return 0;
-}
-
-static ssize_t pn_connector_read_sasl(pn_connector_t *ctor)
-{
-  pn_sasl_t *sasl = ctor->sasl;
-  ssize_t n = pn_sasl_input(sasl, ctor->input, ctor->input_size);
-  if (n == PN_EOS) {
-    ctor->process_input = pn_connector_read_amqp_header;
-    return ctor->process_input(ctor);
-  } else {
-    return n;
-  }
-}
-
-static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor)
-{
-  if (ctor->input_size >= 8) {
-    if (memcmp(ctor->input, "AMQP\x00\x01\x00\x00", 8)) {
-      fprintf(stderr, "amqp header missmatch: ");
-      pn_fprint_data(stderr, ctor->input, ctor->input_size);
-      fprintf(stderr, "\n");
-      ctor->output_done = true;
-      return PN_ERR;
-    } else {
-      if (ctor->trace & PN_TRACE_FRM)
-        fprintf(stderr, "    <- AMQP 1.0\n");
-      ctor->process_input = pn_connector_read_amqp;
-      return 8;
     }
-  } else if (ctor->input_eos) {
-    fprintf(stderr, "amqp header missmatch: ");
-    pn_fprint_data(stderr, ctor->input, ctor->input_size);
-    fprintf(stderr, "\n");
-    ctor->output_done = true;
-    return PN_ERR;
   }
-
-  return 0;
-}
-
-static ssize_t pn_connector_read_amqp(pn_connector_t *ctor)
-{
-  if (!ctor->transport) return 0;
-  pn_transport_t *transport = ctor->transport;
-  return pn_input(transport, ctor->input, ctor->input_size);
 }
 
 static char *pn_connector_output(pn_connector_t *ctor)
@@ -583,18 +492,13 @@ static size_t pn_connector_available(pn_
 
 static void pn_connector_process_output(pn_connector_t *ctor)
 {
-  while (!ctor->output_done && pn_connector_available(ctor) > 0) {
-    ssize_t n = ctor->process_output(ctor);
-    if (n > 0) {
+  pn_transport_t *transport = ctor->transport;
+  if (!ctor->output_done) {
+    ssize_t n = pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
+    if (n >= 0) {
       ctor->output_size += n;
-    } else if (n == 0) {
-      break;
     } else {
-      if (n != PN_EOS) {
-        fprintf(stderr, "error in process_output: %s\n", pn_code(n));
-      }
       ctor->output_done = true;
-      break;
     }
   }
 
@@ -622,43 +526,6 @@ static void pn_connector_write(pn_connec
     ctor->status &= ~PN_SEL_WR;
 }
 
-static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor)
-{
-  if (ctor->trace & PN_TRACE_FRM)
-    fprintf(stderr, "    -> AMQP SASL 1.0\n");
-  memmove(pn_connector_output(ctor), "AMQP\x03\x01\x00\x00", 8);
-  ctor->process_output = pn_connector_write_sasl;
-  return 8;
-}
-
-static ssize_t pn_connector_write_sasl(pn_connector_t *ctor)
-{
-  pn_sasl_t *sasl = ctor->sasl;
-  ssize_t n = pn_sasl_output(sasl, pn_connector_output(ctor), pn_connector_available(ctor));
-  if (n == PN_EOS) {
-    ctor->process_output = pn_connector_write_amqp_header;
-    return ctor->process_output(ctor);
-  } else {
-    return n;
-  }
-}
-
-static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor)
-{
-  if (ctor->trace & PN_TRACE_FRM)
-    fprintf(stderr, "    -> AMQP 1.0\n");
-  memmove(pn_connector_output(ctor), "AMQP\x00\x01\x00\x00", 8);
-  ctor->process_output = pn_connector_write_amqp;
-  return 8;
-}
-
-static ssize_t pn_connector_write_amqp(pn_connector_t *ctor)
-{
-  if (!ctor->transport) return 0;
-  pn_transport_t *transport = ctor->transport;
-  return pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
-}
-
 static time_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   if (!ctor->transport) return 0;

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Sep  7 01:19:16 2012
@@ -85,15 +85,22 @@ typedef struct {
 
 #define SCRATCH (1024)
 
+#include <proton/sasl.h>
+
 struct pn_transport_t {
-  pn_endpoint_t endpoint;
+  ssize_t (*process_input)(pn_transport_t *, char *, size_t);
+  ssize_t (*process_output)(pn_transport_t *, char *, size_t);
+  size_t header_count;
+  pn_sasl_t *sasl;
   pn_connection_t *connection;
   pn_dispatcher_t *disp;
   bool open_sent;
   bool open_rcvd;
   bool close_sent;
   bool close_rcvd;
-  int error;
+  char *remote_container;
+  char *remote_hostname;
+  pn_error_t *error;
   pn_session_state_t *sessions;
   size_t session_capacity;
   pn_session_state_t **channels;
@@ -118,8 +125,6 @@ struct pn_connection_t {
   pn_delivery_t *tpwork_tail;
   char *container;
   char *hostname;
-  char *remote_container;
-  char *remote_hostname;
 };
 
 struct pn_session_t {
@@ -201,5 +206,6 @@ void pn_link_dump(pn_link_t *link);
   }
 
 void pn_dump(pn_connection_t *conn);
+void pn_transport_sasl_init(pn_transport_t *transport);
 
 #endif /* engine-internal.h */

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Sep  7 01:19:16 2012
@@ -29,6 +29,8 @@
 #include <stdarg.h>
 #include <stdio.h>
 
+#include "../sasl/sasl-internal.h"
+
 // delivery buffers
 
 void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
@@ -187,18 +189,18 @@ void pn_connection_close(pn_connection_t
   if (connection) pn_close((pn_endpoint_t *) connection);
 }
 
+void pn_endpoint_tini(pn_endpoint_t *endpoint);
+
 void pn_connection_free(pn_connection_t *connection)
 {
   if (!connection) return;
 
-  pn_transport_free(connection->transport);
   while (connection->session_count)
     pn_session_free(connection->sessions[connection->session_count - 1]);
   free(connection->sessions);
   free(connection->container);
   free(connection->hostname);
-  free(connection->remote_container);
-  free(connection->remote_hostname);
+  pn_endpoint_tini(&connection->endpoint);
   free(connection);
 }
 
@@ -216,6 +218,7 @@ void pn_transport_free(pn_transport_t *t
 {
   if (!transport) return;
 
+  pn_sasl_free(transport->sasl);
   pn_dispatcher_free(transport->disp);
   for (int i = 0; i < transport->session_capacity; i++) {
     pn_delivery_buffer_free(&transport->sessions[i].incoming);
@@ -223,6 +226,9 @@ void pn_transport_free(pn_transport_t *t
     free(transport->sessions[i].links);
     free(transport->sessions[i].handles);
   }
+  free(transport->remote_container);
+  free(transport->remote_hostname);
+  pn_error_free(transport->error);
   free(transport->sessions);
   free(transport->channels);
   free(transport);
@@ -273,6 +279,7 @@ void pn_session_free(pn_session_t *sessi
     pn_link_free(session->links[session->link_count - 1]);
   pn_remove_session(session->connection, session);
   free(session->links);
+  pn_endpoint_tini(&session->endpoint);
   free(session);
 }
 
@@ -354,6 +361,7 @@ void pn_link_free(pn_link_t *link)
     pn_free_delivery(d);
   }
   free(link->name);
+  pn_endpoint_tini(&link->endpoint);
   free(link);
 }
 
@@ -371,6 +379,11 @@ void pn_endpoint_init(pn_endpoint_t *end
   LL_ADD(conn, endpoint, endpoint);
 }
 
+void pn_endpoint_tini(pn_endpoint_t *endpoint)
+{
+  pn_error_free(endpoint->error);
+}
+
 pn_connection_t *pn_connection()
 {
   pn_connection_t *conn = malloc(sizeof(pn_connection_t));
@@ -391,8 +404,6 @@ pn_connection_t *pn_connection()
   conn->tpwork_tail = NULL;
   conn->container = NULL;
   conn->hostname = NULL;
-  conn->remote_container = NULL;
-  conn->remote_hostname = NULL;
 
   return conn;
 }
@@ -433,12 +444,14 @@ void pn_connection_set_hostname(pn_conne
 
 const char *pn_connection_remote_container(pn_connection_t *connection)
 {
-  return connection ? connection->remote_container : NULL;
+  if (!connection) return NULL;
+  return connection->transport ? connection->transport->remote_container : NULL;
 }
 
 const char *pn_connection_remote_hostname(pn_connection_t *connection)
 {
-  return connection ? connection->remote_hostname : NULL;
+  if (!connection) return NULL;
+  return connection->transport ? connection->transport->remote_hostname : NULL;
 }
 
 pn_delivery_t *pn_work_head(pn_connection_t *connection)
@@ -655,10 +668,21 @@ int pn_do_detach(pn_dispatcher_t *disp);
 int pn_do_end(pn_dispatcher_t *disp);
 int pn_do_close(pn_dispatcher_t *disp);
 
+static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
+
 void pn_transport_init(pn_transport_t *transport)
 {
-  pn_endpoint_init(&transport->endpoint, TRANSPORT, transport->connection);
-
+  transport->process_input = pn_input_read_amqp_header;
+  transport->process_output = pn_output_write_amqp_header;
+  transport->header_count = 0;
+  transport->sasl = NULL;
   transport->disp = pn_dispatcher(0, transport);
 
   pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
@@ -675,7 +699,9 @@ void pn_transport_init(pn_transport_t *t
   transport->open_rcvd = false;
   transport->close_sent = false;
   transport->close_rcvd = false;
-  transport->error = 0;
+  transport->remote_container = NULL;
+  transport->remote_hostname = NULL;
+  transport->error = pn_error();
 
   transport->sessions = NULL;
   transport->session_capacity = 0;
@@ -716,25 +742,41 @@ void pn_map_channel(pn_transport_t *tran
   transport->channels[channel] = state;
 }
 
-pn_transport_t *pn_transport(pn_connection_t *conn)
+pn_transport_t *pn_transport()
 {
-  if (!conn) return NULL;
+  pn_transport_t *transport = malloc(sizeof(pn_transport_t));
+  if (!transport) return NULL;
 
-  if (conn->transport) {
-    return NULL;
-  } else {
-    conn->transport = malloc(sizeof(pn_transport_t));
-    if (!conn->transport) return NULL;
+  transport->connection = NULL;
+  pn_transport_init(transport);
+  return transport;
+}
+
+void pn_transport_sasl_init(pn_transport_t *transport)
+{
+  transport->process_input = pn_input_read_sasl_header;
+  transport->process_output = pn_output_write_sasl_header;
+}
 
-    conn->transport->connection = conn;
-    pn_transport_init(conn->transport);
-    return conn->transport;
+int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
+{
+  if (!transport) return PN_ARG_ERR;
+  if (transport->connection) return PN_STATE_ERR;
+  if (connection->transport) return PN_STATE_ERR;
+  transport->connection = connection;
+  connection->transport = transport;
+  if (transport->open_rcvd) {
+    PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
+    if (!pn_error_code(transport->error)) {
+      transport->disp->halt = false;
+    }
   }
+  return 0;
 }
 
 pn_error_t *pn_transport_error(pn_transport_t *transport)
 {
-  return transport->endpoint.error;
+  return transport->error;
 }
 
 void pn_link_init(pn_link_t *link, int type, pn_session_t *session, const char *name)
@@ -1047,13 +1089,13 @@ int pn_do_error(pn_transport_t *transpor
   // XXX: result
   vsnprintf(buf, 1024, fmt, ap);
   va_end(ap);
-  pn_error_set(transport->endpoint.error, PN_ERR, buf);
+  pn_error_set(transport->error, PN_ERR, buf);
   if (!transport->close_sent) {
     pn_post_close(transport);
     transport->close_sent = true;
   }
   transport->disp->halt = true;
-  fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->endpoint.error));
+  fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->error));
   return PN_ERR;
 }
 
@@ -1072,16 +1114,20 @@ int pn_do_open(pn_dispatcher_t *disp)
                          &hostname_q, &remote_hostname);
   if (err) return err;
   if (container_q) {
-    conn->remote_container = pn_bytes_strdup(remote_container);
+    transport->remote_container = pn_bytes_strdup(remote_container);
   } else {
-    conn->remote_container = NULL;
+    transport->remote_container = NULL;
   }
   if (hostname_q) {
-    conn->remote_hostname = pn_bytes_strdup(remote_hostname);
+    transport->remote_hostname = pn_bytes_strdup(remote_hostname);
+  } else {
+    transport->remote_hostname = NULL;
+  }
+  if (conn) {
+    PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
   } else {
-    conn->remote_hostname = NULL;
+    transport->disp->halt = true;
   }
-  PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
   transport->open_rcvd = true;
   return 0;
 }
@@ -1382,29 +1428,103 @@ ssize_t pn_input(pn_transport_t *transpo
 {
   if (!transport) return PN_ARG_ERR;
 
-  if (!available) {
-    pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
-    if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-      fprintf(stderr, "    <- EOS\n");
-    return PN_ERR;
+  size_t consumed = 0;
+
+  while (true) {
+    ssize_t n = transport->process_input(transport, bytes + consumed, available - consumed);
+    if (n > 0) {
+      consumed += n;
+      if (consumed >= available) {
+        break;
+      }
+    } else if (n == 0) {
+      break;
+    } else {
+      if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+        pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+      return n;
+    }
+  }
+
+  return consumed;
+}
+
+#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
+
+static ssize_t pn_input_read_header(pn_transport_t *transport, char *bytes, size_t available,
+                                    const char *header, size_t size, const char *protocol,
+                                    ssize_t (*next)(pn_transport_t *, char *, size_t))
+{
+  const char *point = header + transport->header_count;
+  int delta = pn_min(available, size - transport->header_count);
+  if (!available || memcmp(bytes, point, delta)) {
+    char quoted[1024];
+    pn_quote_data(quoted, 1024, bytes, available);
+    return pn_error_format(transport->error, PN_ERR,
+                           "%s header missmatch: '%s'", protocol, quoted);
+  } else {
+    transport->header_count += delta;
+    if (transport->header_count == size) {
+      transport->header_count = 0;
+      transport->process_input = next;
+
+      if (transport->disp->trace & PN_TRACE_FRM)
+        fprintf(stderr, "    <- %s\n", protocol);
+    }
+    return delta;
+  }
+}
+
+static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, char *bytes, size_t available)
+{
+  return pn_input_read_header(transport, bytes, available, SASL_HEADER, 8, "SASL", pn_input_read_sasl);
+}
+
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, char *bytes, size_t available)
+{
+  pn_sasl_t *sasl = transport->sasl;
+  ssize_t n = pn_sasl_input(sasl, bytes, available);
+  if (n == PN_EOS) {
+    transport->process_input = pn_input_read_amqp_header;
+    return transport->process_input(transport, bytes, available);
+  } else {
+    return n;
   }
+}
 
+#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
+
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, char *bytes, size_t available)
+{
+  return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
+                              "AMQP", pn_input_read_amqp);
+}
+
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, char *bytes, size_t available)
+{
   if (transport->close_rcvd) {
-    pn_do_error(transport, "amqp:connection:framing-error", "data after close");
-    if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-      fprintf(stderr, "    <- EOS\n");
+    if (available > 0) {
+      pn_do_error(transport, "amqp:connection:framing-error", "data after close");
+      return PN_ERR;
+    } else {
+      return PN_EOS;
+    }
+  }
+
+  if (!available) {
+    pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
     return PN_ERR;
   }
 
+
   ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
-  if (n >= 0 && transport->close_rcvd) {
-    if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-      fprintf(stderr, "    <- EOS\n");
+  if (n < 0) {
+    return pn_error_set(transport->error, n, "dispatch error");
+  } else if (transport->close_rcvd) {
     return PN_EOS;
-  } else if (n < 0) {
-    transport->error = n;
+  } else {
+    return n;
   }
-  return n;
 }
 
 bool pn_delivery_buffered(pn_delivery_t *delivery)
@@ -1801,18 +1921,60 @@ int pn_process(pn_transport_t *transport
   return 0;
 }
 
-ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+static ssize_t pn_output_write_header(pn_transport_t *transport,
+                                      char *bytes, size_t size,
+                                      const char *header, size_t hdrsize,
+                                      const char *protocol,
+                                      ssize_t (*next)(pn_transport_t *, char *, size_t))
+{
+  if (transport->disp->trace & PN_TRACE_FRM)
+    fprintf(stderr, "    -> %s\n", protocol);
+  if (size >= hdrsize) {
+    memmove(bytes, header, hdrsize);
+    transport->process_output = next;
+    return hdrsize;
+  } else {
+    return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
+  }
+}
+
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t size)
 {
-  if (!transport) return PN_ARG_ERR;
+  return pn_output_write_header(transport, bytes, size, SASL_HEADER, 8, "SASL",
+                                pn_output_write_sasl);
+}
+
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t size)
+{
+  pn_sasl_t *sasl = transport->sasl;
+  ssize_t n = pn_sasl_output(sasl, bytes, size);
+  if (n == PN_EOS) {
+    transport->process_output = pn_output_write_amqp_header;
+    return 0;
+  } else {
+    return n;
+  }
+}
+
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t size)
+{
+  return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
+                                pn_output_write_amqp);
+}
 
-  if (!transport->error)
-    transport->error = pn_process(transport);
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t size)
+{
+  if (!transport->connection) {
+    return 0;
+  }
+
+  if (!pn_error_code(transport->error)) {
+    pn_error_set(transport->error, pn_process(transport), "process error");
+  }
 
-  if (!transport->disp->available && (transport->close_sent || transport->error)) {
-    if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-      fprintf(stderr, "    -> EOS\n");
-    if (transport->error)
-      return transport->error;
+  if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) {
+    if (pn_error_code(transport->error))
+      return pn_error_code(transport->error);
     else
       return PN_EOS;
   }
@@ -1820,8 +1982,40 @@ ssize_t pn_output(pn_transport_t *transp
   return pn_dispatcher_output(transport->disp, bytes, size);
 }
 
+ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+  if (!transport) return PN_ARG_ERR;
+
+  size_t total = 0;
+
+  while (size - total > 0) {
+    ssize_t n = transport->process_output(transport, bytes + total, size - total);
+    if (n > 0) {
+      total += n;
+    } else if (n == 0) {
+      break;
+    } else if (n == PN_EOS) {
+      if (total > 0) {
+        return total;
+      } else {
+        if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+          pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
+        return PN_EOS;
+      }
+    } else {
+      if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+        pn_dispatcher_trace(transport->disp, 0, "-> EOS (%zi) %s\n", n,
+                            pn_error_text(transport->error));
+      return n;
+    }
+  }
+
+  return total;
+}
+
 void pn_trace(pn_transport_t *transport, pn_trace_t trace)
 {
+  if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
   transport->disp->trace = trace;
 }
 

Modified: qpid/proton/trunk/proton-c/src/error.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/error.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/error.c (original)
+++ qpid/proton/trunk/proton-c/src/error.c Fri Sep  7 01:19:16 2012
@@ -62,8 +62,10 @@ void pn_error_clear(pn_error_t *error)
 int pn_error_set(pn_error_t *error, int code, const char *text)
 {
   pn_error_clear(error);
-  error->code = code;
-  error->text = pn_strdup(text);
+  if (code) {
+    error->code = code;
+    error->text = pn_strdup(text);
+  }
   return code;
 }
 

Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Fri Sep  7 01:19:16 2012
@@ -113,6 +113,7 @@ void pn_message_free(pn_message_t *msg)
     pn_buffer_free(msg->reply_to_group_id);
     pn_data_free(msg->data);
     pn_data_free(msg->body);
+    pn_parser_free(msg->parser);
     free(msg);
   }
 }

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Fri Sep  7 01:19:16 2012
@@ -32,9 +32,6 @@ struct pn_messenger_t {
   char *name;
   int timeout;
   pn_driver_t *driver;
-  pn_connector_t *connectors[1024];
-  size_t size;
-  size_t listeners;
   int credit;
   uint64_t next_tag;
   pn_error_t *error;
@@ -61,8 +58,6 @@ pn_messenger_t *pn_messenger(const char 
     m->name = build_name(name);
     m->timeout = -1;
     m->driver = pn_driver();
-    m->size = 0;
-    m->listeners = 0;
     m->credit = 0;
     m->next_tag = 0;
     m->error = pn_error();
@@ -94,6 +89,7 @@ void pn_messenger_free(pn_messenger_t *m
     free(messenger->name);
     pn_driver_free(messenger->driver);
     pn_error_free(messenger->error);
+    free(messenger);
   }
 }
 
@@ -119,8 +115,8 @@ void pn_messenger_flow(pn_messenger_t *m
 {
   while (messenger->credit > 0) {
     int prev = messenger->credit;
-    for (int i = 0; i < messenger->size; i++) {
-      pn_connector_t *ctor = messenger->connectors[i];
+    pn_connector_t *ctor = pn_connector_head(messenger->driver);
+    while (ctor) {
       pn_connection_t *conn = pn_connector_connection(ctor);
 
       pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -131,6 +127,8 @@ void pn_messenger_flow(pn_messenger_t *m
         }
         link = pn_link_next(link, PN_LOCAL_ACTIVE);
       }
+
+      ctor = pn_connector_next(ctor);
     }
     if (messenger->credit == prev) break;
   }
@@ -193,8 +191,10 @@ long int millis(struct timeval tv)
 
 int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_process(messenger->connectors[i]);
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
+    pn_connector_process(ctor);
+    ctor = pn_connector_next(ctor);
   }
 
   struct timeval now;
@@ -219,7 +219,6 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_connection_t *conn = pn_connection();
       pn_connection_set_container(conn, messenger->name);
       pn_connector_set_connection(c, conn);
-      messenger->connectors[messenger->size++] = c;
     }
 
     pn_connector_t *c;
@@ -228,17 +227,10 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_connection_t *conn = pn_connector_connection(c);
       pn_messenger_endpoints(messenger, conn);
       if (pn_connector_closed(c)) {
-        for (int i = 0; i < messenger->size; i++) {
-          if (c == messenger->connectors[i]) {
-            memmove(messenger->connectors + i, messenger->connectors + i + 1, messenger->size - i - 1);
-            messenger->size--;
-            pn_connector_free(c);
-            pn_messenger_reclaim(messenger, conn);
-            pn_connection_free(conn);
-            pn_messenger_flow(messenger);
-            break;
-          }
-        }
+        pn_connector_free(c);
+        pn_messenger_reclaim(messenger, conn);
+        pn_connection_free(conn);
+        pn_messenger_flow(messenger);
       } else {
         pn_connector_process(c);
       }
@@ -266,15 +258,15 @@ int pn_messenger_start(pn_messenger_t *m
 
 bool pn_messenger_stopped(pn_messenger_t *messenger)
 {
-  return messenger->size == 0;
+  return pn_connector_head(messenger->driver) == NULL;
 }
 
 int pn_messenger_stop(pn_messenger_t *messenger)
 {
   if (!messenger) return PN_ARG_ERR;
 
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
     while (link) {
@@ -282,6 +274,7 @@ int pn_messenger_stop(pn_messenger_t *me
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
     pn_connection_close(conn);
+    ctor = pn_connector_next(ctor);
   }
 
   pn_listener_t *l = pn_listener_head(messenger->driver);
@@ -334,17 +327,18 @@ pn_connection_t *pn_messenger_domain(pn_
   char *port = "5672";
   parse_url(buf, &user, &pass, &host, &port);
 
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
+    pn_connection_t *connection = pn_connector_connection(ctor);
     const char *container = pn_connection_remote_container(connection);
     const char *hostname = pn_connection_hostname(connection);
     if (pn_streq(container, domain) || pn_streq(hostname, domain))
       return connection;
+    ctor = pn_connector_next(ctor);
   }
 
   pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
   if (!connector) return NULL;
-  messenger->connectors[messenger->size++] = connector;
   pn_sasl_t *sasl = pn_connector_sasl(connector);
   if (user) {
     pn_sasl_plain(sasl, user, pass);
@@ -423,11 +417,7 @@ pn_listener_t *pn_messenger_isource(pn_m
   char *port = "5672";
   parse_url(domain + 1, &user, &pass, &host, &port);
 
-  pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
-  if (listener) {
-    messenger->listeners++;
-  }
-  return listener;
+  return pn_listener(messenger->driver, host, port, NULL);
 }
 
 int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
@@ -516,8 +506,8 @@ int pn_messenger_put(pn_messenger_t *mes
 
 bool pn_messenger_sent(pn_messenger_t *messenger)
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -535,6 +525,8 @@ bool pn_messenger_sent(pn_messenger_t *m
       }
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
+
+    ctor = pn_connector_next(ctor);
   }
 
   return true;
@@ -542,8 +534,8 @@ bool pn_messenger_sent(pn_messenger_t *m
 
 bool pn_messenger_rcvd(pn_messenger_t *messenger)
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_delivery_t *d = pn_work_head(conn);
@@ -553,6 +545,7 @@ bool pn_messenger_rcvd(pn_messenger_t *m
       }
       d = pn_work_next(d);
     }
+    ctor = pn_connector_next(ctor);
   }
 
   return false;
@@ -566,7 +559,7 @@ int pn_messenger_send(pn_messenger_t *me
 int pn_messenger_recv(pn_messenger_t *messenger, int n)
 {
   if (!messenger) return PN_ARG_ERR;
-  if (!messenger->listeners && !messenger->size)
+  if (!pn_listener_head(messenger->driver) && !pn_connector_head(messenger->driver))
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
   messenger->credit += n;
   pn_messenger_flow(messenger);
@@ -577,8 +570,8 @@ int pn_messenger_get(pn_messenger_t *mes
 {
   if (!messenger) return PN_ARG_ERR;
 
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_delivery_t *d = pn_work_head(conn);
@@ -604,6 +597,8 @@ int pn_messenger_get(pn_messenger_t *mes
       }
       d = pn_work_next(d);
     }
+
+    ctor = pn_connector_next(ctor);
   }
 
   // XXX: need to drain credit before returning EOS
@@ -617,8 +612,8 @@ int pn_messenger_queued(pn_messenger_t *
 
   int result = 0;
 
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -632,6 +627,7 @@ int pn_messenger_queued(pn_messenger_t *
       }
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
+    ctor = pn_connector_next(ctor);
   }
 
   return result;

Added: qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h?rev=1381839&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h (added)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h Fri Sep  7 01:19:16 2012
@@ -0,0 +1,39 @@
+#ifndef PROTON_SASL_INTERNAL_H
+#define PROTON_SASL_INTERNAL_H 1
+
+#include <proton/sasl.h>
+
+/** Decode input data bytes into SASL frames, and process them.
+ *
+ * This function is called by the transport layer to pass data received
+ * from the remote peer into the SASL layer.
+ *
+ * @param[in] sasl the SASL layer.
+ * @param[in] bytes buffer of frames to process
+ * @param[in] available number of octets of data in 'bytes'
+ * @return the number of bytes consumed, or error code if < 0
+ */
+ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available);
+
+/** Gather output frames from the layer.
+ *
+ * This function is used by the transport to poll the SASL layer for data
+ * that will be sent to the remote peer.
+ *
+ * @param[in] sasl The SASL layer.
+ * @param[out] bytes to be filled with encoded frames.
+ * @param[in] size space available in bytes array.
+ * @return the number of octets written to bytes, or error code if < 0
+ */
+ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size);
+
+void pn_sasl_trace(pn_sasl_t *sasl, pn_trace_t trace);
+
+/** Destructor for the given SASL layer.
+ *
+ * @param[in] sasl the SASL object to free. No longer valid on
+ *                 return.
+ */
+void pn_sasl_free(pn_sasl_t *sasl);
+
+#endif /* sasl-internal.h */

Modified: qpid/proton/trunk/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl.c Fri Sep  7 01:19:16 2012
@@ -27,6 +27,7 @@
 #include <proton/sasl.h>
 #include "protocol.h"
 #include "../dispatcher/dispatcher.h"
+#include "../engine/engine-internal.h"
 #include "../util.h"
 
 #define SCRATCH (1024)
@@ -53,30 +54,36 @@ int pn_do_challenge(pn_dispatcher_t *dis
 int pn_do_response(pn_dispatcher_t *disp);
 int pn_do_outcome(pn_dispatcher_t *disp);
 
-pn_sasl_t *pn_sasl()
+pn_sasl_t *pn_sasl(pn_transport_t *transport)
 {
-  pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
-  sasl->disp = pn_dispatcher(1, sasl);
+  if (!transport->sasl) {
+    pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
+    sasl->disp = pn_dispatcher(1, sasl);
+
+    pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
+    pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
+    pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
+    pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
+    pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
 
-  pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
-  pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
-  pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
-  pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
-  pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
-
-  sasl->client = false;
-  sasl->configured = false;
-  sasl->mechanisms = NULL;
-  sasl->remote_mechanisms = NULL;
-  sasl->send_data = (pn_bytes_t) {0, NULL};
-  sasl->recv_data = (pn_bytes_t) {0, NULL};
-  sasl->outcome = PN_SASL_NONE;
-  sasl->sent_init = false;
-  sasl->rcvd_init = false;
-  sasl->sent_done = false;
-  sasl->rcvd_done = false;
+    sasl->client = false;
+    sasl->configured = false;
+    sasl->mechanisms = NULL;
+    sasl->remote_mechanisms = NULL;
+    sasl->send_data = (pn_bytes_t) {0, NULL};
+    sasl->recv_data = (pn_bytes_t) {0, NULL};
+    sasl->outcome = PN_SASL_NONE;
+    sasl->sent_init = false;
+    sasl->rcvd_init = false;
+    sasl->sent_done = false;
+    sasl->rcvd_done = false;
+
+    transport->sasl = sasl;
+
+    pn_transport_sasl_init(transport);
+  }
 
-  return sasl;
+  return transport->sasl;
 }
 
 pn_sasl_state_t pn_sasl_state(pn_sasl_t *sasl)
@@ -201,12 +208,14 @@ void pn_sasl_trace(pn_sasl_t *sasl, pn_t
 
 void pn_sasl_free(pn_sasl_t *sasl)
 {
-  free(sasl->mechanisms);
-  free(sasl->remote_mechanisms);
-  free(sasl->send_data.start);
-  free(sasl->recv_data.start);
-  pn_dispatcher_free(sasl->disp);
-  free(sasl);
+  if (sasl) {
+    free(sasl->mechanisms);
+    free(sasl->remote_mechanisms);
+    free(sasl->send_data.start);
+    free(sasl->recv_data.start);
+    pn_dispatcher_free(sasl->disp);
+    free(sasl);
+  }
 }
 
 void pn_client_init(pn_sasl_t *sasl)

Modified: qpid/proton/trunk/proton-c/src/scanner.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/scanner.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/scanner.c (original)
+++ qpid/proton/trunk/proton-c/src/scanner.c Fri Sep  7 01:19:16 2012
@@ -81,7 +81,10 @@ pn_scanner_t *pn_scanner()
 
 void pn_scanner_free(pn_scanner_t *scanner)
 {
-  free(scanner);
+  if (scanner) {
+    pn_error_free(scanner->error);
+    free(scanner);
+  }
 }
 
 pn_token_t pn_scanner_token(pn_scanner_t *scanner)

Modified: qpid/proton/trunk/proton-c/src/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.h (original)
+++ qpid/proton/trunk/proton-c/src/util.h Fri Sep  7 01:19:16 2012
@@ -92,4 +92,7 @@ bool pn_env_bool(const char *name);
 char *pn_strdup(const char *src);
 char *pn_strndup(const char *src, size_t n);
 
+#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X))
+#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X))
+
 #endif /* util.h */

Modified: qpid/proton/trunk/tests/proton_tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/__init__.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/__init__.py (original)
+++ qpid/proton/trunk/tests/proton_tests/__init__.py Fri Sep  7 01:19:16 2012
@@ -21,3 +21,4 @@ import proton_tests.engine
 import proton_tests.message
 import proton_tests.messenger
 import proton_tests.sasl
+import proton_tests.transport

Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Fri Sep  7 01:19:16 2012
@@ -55,8 +55,10 @@ class Test(common.Test):
   def connection(self):
     c1 = pn_connection()
     c2 = pn_connection()
-    t1 = pn_transport(c1)
-    t2 = pn_transport(c2)
+    t1 = pn_transport()
+    pn_transport_bind(t1, c1)
+    t2 = pn_transport()
+    pn_transport_bind(t2, c2)
     self._wires.append((c1, t1, c2, t2))
     trc = os.environ.get("PN_TRACE_FRM")
     if trc and trc.lower() in ("1", "2", "yes", "true"):
@@ -82,7 +84,9 @@ class Test(common.Test):
   def cleanup(self):
     for c1, t1, c2, t2 in self._wires:
       pn_connection_free(c1)
+      pn_transport_free(t1)
       pn_connection_free(c2)
+      pn_transport_free(t2)
 
   def pump(self):
     for c1, t1, c2, t2 in self._wires:

Modified: qpid/proton/trunk/tests/proton_tests/message.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/message.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/message.py (original)
+++ qpid/proton/trunk/tests/proton_tests/message.py Fri Sep  7 01:19:16 2012
@@ -127,6 +127,8 @@ class CodecTest(Test):
     assert not cd, cd
     assert saved == body, (body, saved)
 
+    pn_message_free(msg2)
+
 
 class LoadSaveTest(Test):
 

Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Fri Sep  7 01:19:16 2012
@@ -48,6 +48,7 @@ class Test(common.Test):
     pn_messenger_free(self.server)
     self.client = None
     self.server = None
+    pn_message_free(msg)
 
 class MessengerTest(Test):
 
@@ -64,6 +65,7 @@ class MessengerTest(Test):
             pn_message_set_address(msg, reply_to)
             pn_messenger_put(self.server, msg)
     pn_messenger_stop(self.server)
+    pn_message_free(msg)
 
   def testSendReceive(self):
     msg = pn_message()
@@ -91,4 +93,6 @@ class MessengerTest(Test):
     msg = pn_message()
     pn_message_set_address(msg, "totally-bogus-address")
     assert pn_messenger_put(self.client, msg) == PN_ERR
-    assert "unable to send to address: totally-bogus-address (getaddrinfo:" in pn_messenger_error(self.client)
+    err = pn_messenger_error(self.client)
+    assert "unable to send to address: totally-bogus-address (getaddrinfo:" in err, err
+    pn_message_free(msg)

Modified: qpid/proton/trunk/tests/proton_tests/sasl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/sasl.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/sasl.py (original)
+++ qpid/proton/trunk/tests/proton_tests/sasl.py Fri Sep  7 01:19:16 2012
@@ -26,36 +26,34 @@ class Test(common.Test):
 class SaslTest(Test):
 
   def testPipelined(self):
-    cli = pn_sasl()
-    pn_sasl_mechanisms(cli, "ANONYMOUS")
-    pn_sasl_client(cli)
-
-    srv = pn_sasl()
-    pn_sasl_mechanisms(srv, "ANONYMOUS")
-    pn_sasl_server(srv)
-    pn_sasl_done(srv, PN_SASL_OK)
+    cli = pn_transport()
+    cli_auth = pn_sasl(cli)
+    pn_sasl_mechanisms(cli_auth, "ANONYMOUS")
+    pn_sasl_client(cli_auth)
+
+    assert pn_sasl_outcome(cli_auth) == PN_SASL_NONE
+
+    srv = pn_transport()
+    srv_auth = pn_sasl(srv)
+    pn_sasl_mechanisms(srv_auth, "ANONYMOUS")
+    pn_sasl_server(srv_auth)
+    pn_sasl_done(srv_auth, PN_SASL_OK)
 
-    cli_code, cli_out = pn_sasl_output(cli, 1024)
-    srv_code, srv_out = pn_sasl_output(srv, 1024)
+    cli_code, cli_out = pn_output(cli, 1024)
+    srv_code, srv_out = pn_output(srv, 1024)
 
     assert cli_code > 0, cli_code
     assert srv_code > 0, srv_code
 
-    dummy_header = "AMQPxxxx"
+    n = pn_input(srv, cli_out)
+    assert n == len(cli_out), "(%s) %s" % (n, pn_error_text(pn_transport_error(srv)))
 
-    srv_out += dummy_header
-    cli_out += dummy_header
+    assert pn_sasl_outcome(cli_auth) == PN_SASL_NONE
 
-    n = pn_sasl_input(srv, cli_out)
-    assert n > 0, n
-    cli_out = cli_out[n:]
-    assert cli_out == dummy_header
-    n = pn_sasl_input(srv, cli_out)
-    assert n == PN_EOS, n
-
-    n = pn_sasl_input(cli, srv_out)
-    assert n > 0, n
-    srv_out = srv_out[n:]
-    assert srv_out == dummy_header
-    n = pn_sasl_input(cli, srv_out)
-    assert n == PN_EOS, n
+    n = pn_input(cli, srv_out)
+    assert n == len(srv_out), n
+
+    assert pn_sasl_outcome(cli_auth) == PN_SASL_OK
+
+    pn_transport_free(cli)
+    pn_transport_free(srv)

Added: qpid/proton/trunk/tests/proton_tests/transport.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/transport.py?rev=1381839&view=auto
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/transport.py (added)
+++ qpid/proton/trunk/tests/proton_tests/transport.py Fri Sep  7 01:19:16 2012
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, common, xproton
+from xproton import *
+
+class Test(common.Test):
+  pass
+
+class TransportTest(Test):
+
+  def setup(self):
+    self.transport = pn_transport()
+
+  def teardown(self):
+    pn_transport_free(self.transport)
+    self.transport = None
+
+  def testEOS(self):
+    n = pn_input(self.transport, "")
+    assert n == PN_ERR, pn_error_text(pn_transport_error(self.transport))
+
+  def testPartial(self):
+    n = pn_input(self.transport, "AMQ")
+    assert n == 3, n
+    n = pn_input(self.transport, "")
+    assert n == PN_ERR, n
+
+  def testGarbage(self):
+    n = pn_input(self.transport, "GARBAGE_")
+    assert n == PN_ERR, pn_error_text(pn_transport_error(self.transport))
+    n = pn_input(self.transport, "")
+    assert n == PN_ERR, n
+
+  def testSmallGarbage(self):
+    n = pn_input(self.transport, "XXX")
+    assert n == PN_ERR, pn_error_text(pn_transport_error(self.transport))
+    n = pn_input(self.transport, "")
+    assert n == PN_ERR, n
+
+  def testBigGarbage(self):
+    n = pn_input(self.transport, "GARBAGE_XXX")
+    assert n == PN_ERR, pn_error_text(pn_transport_error(self.transport))
+    n = pn_input(self.transport, "")
+    assert n == PN_ERR, n
+
+  def testHeader(self):
+    n = pn_input(self.transport, "AMQP\x00\x01\x00\x00")
+    assert n == 8, n
+    n = pn_input(self.transport, "")
+    assert n == PN_ERR, n
+
+  def testOutput(self):
+    n, out = pn_output(self.transport, 1024)
+    assert n == len(out)
+
+  def testBindAfterOpen(self):
+    conn = pn_connection()
+    ssn = pn_session(conn)
+    pn_connection_open(conn)
+    pn_session_open(ssn)
+    pn_connection_set_container(conn, "test-container")
+    pn_connection_set_hostname(conn, "test-hostname")
+    trn = pn_transport()
+    pn_transport_bind(trn, conn)
+    n, out = pn_output(trn, 1024)
+    assert n == len(out), n
+    assert "test-container" in out, repr(out)
+    n = pn_input(self.transport, out)
+    assert n > 0 and n < len(out)
+    out = out[n:]
+
+    n = pn_input(self.transport, out)
+    assert n == 0
+
+    c = pn_connection()
+    assert pn_connection_remote_container(c) == None
+    assert pn_connection_remote_hostname(c) == None
+    pn_transport_bind(self.transport, c)
+    assert pn_connection_remote_container(c) == "test-container"
+    assert pn_connection_remote_hostname(c) == "test-hostname"
+    assert pn_session_head(c, 0) == None
+    n = pn_input(self.transport, out)
+    assert n == len(out), (n, out)
+    assert pn_session_head(c, 0) != None
+
+    pn_transport_free(trn)
+    pn_connection_free(conn)
+    pn_connection_free(c)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org