You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/09/10 16:56:23 UTC

svn commit: r1382918 [1/2] - in /qpid/proton/branches/driver_abstraction: ./ examples/broker/ proton-c/ proton-c/bindings/ proton-c/bindings/php/ proton-c/bindings/python/ proton-c/bindings/ruby/ proton-c/include/proton/ proton-c/src/ proton-c/src/disp...

Author: kgiusti
Date: Mon Sep 10 14:56:20 2012
New Revision: 1382918

URL: http://svn.apache.org/viewvc?rev=1382918&view=rev
Log:
checkpoint: merge latest from trunk

Added:
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/cproton.i
      - copied unchanged from r1382866, qpid/proton/trunk/proton-c/include/proton/cproton.i
    qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl-internal.h
      - copied unchanged from r1382866, qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h
    qpid/proton/branches/driver_abstraction/proton-j/pom.xml
      - copied unchanged from r1382866, qpid/proton/trunk/proton-j/pom.xml
    qpid/proton/branches/driver_abstraction/proton-j/src/main/
      - copied from r1382866, qpid/proton/trunk/proton-j/src/main/
    qpid/proton/branches/driver_abstraction/tests/proton_tests/transport.py
      - copied unchanged from r1382866, qpid/proton/trunk/tests/proton_tests/transport.py
Removed:
    qpid/proton/branches/driver_abstraction/proton-c/bindings/cproton.i
    qpid/proton/branches/driver_abstraction/proton-j/src/org/
Modified:
    qpid/proton/branches/driver_abstraction/   (props changed)
    qpid/proton/branches/driver_abstraction/examples/broker/broker
    qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt
    qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt
    qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i
    qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i
    qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h
    qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c
    qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/branches/driver_abstraction/proton-c/src/driver.c
    qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h
    qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h
    qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c
    qpid/proton/branches/driver_abstraction/proton-c/src/error.c
    qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c
    qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c
    qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c
    qpid/proton/branches/driver_abstraction/proton-c/src/scanner.c
    qpid/proton/branches/driver_abstraction/proton-c/src/util.h
    qpid/proton/branches/driver_abstraction/tests/proton_tests/__init__.py
    qpid/proton/branches/driver_abstraction/tests/proton_tests/engine.py
    qpid/proton/branches/driver_abstraction/tests/proton_tests/message.py
    qpid/proton/branches/driver_abstraction/tests/proton_tests/messenger.py
    qpid/proton/branches/driver_abstraction/tests/proton_tests/sasl.py

Propchange: qpid/proton/branches/driver_abstraction/
------------------------------------------------------------------------------
  Merged /qpid/proton/trunk:r1371803-1382866

Modified: qpid/proton/branches/driver_abstraction/examples/broker/broker
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/examples/broker/broker?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/examples/broker/broker (original)
+++ qpid/proton/branches/driver_abstraction/examples/broker/broker Mon Sep 10 14:56:20 2012
@@ -128,5 +128,13 @@ try:
         else:
           pn_connector_process(c)
 
+      c = pn_connector_head(driver)
+      while c:
+        handler = pn_connector_context(c)
+        handler(c)
+        pn_connector_process(c)
+        window.redraw()
+        c = pn_connector_next(c)
+
 except KeyboardInterrupt:
   pass

Modified: qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt Mon Sep 10 14:56:20 2012
@@ -4,7 +4,6 @@ project (Proton C)
 
 set (PN_VERSION_MAJOR 0)
 set (PN_VERSION_MINOR 1)
-set (LINK_DEPS uuid)
 
 include(CheckIncludeFile)
 
@@ -48,6 +47,7 @@ add_custom_command (
   DEPENDS ${PROJECT_SOURCE_DIR}/src/protocol.h.py
 )
 
+# Configure the poller
 if (POLLER STREQUAL poll)
   set (pn_driver_impl
        src/drivers/poll.c
@@ -66,6 +66,22 @@ else (SSL_IMPL STREQUAL openssl)
   set (pn_driver_ssl_impl src/drivers/ssl_stub.c)
 endif (SSL_IMPL STREQUAL openssl)
 
+find_package(SWIG)
+if (SWIG_FOUND)
+  add_subdirectory(bindings)
+endif (SWIG_FOUND)
+
+add_subdirectory(docs/api)
+
+# Should really be finding the uuid library appropriate for the platform;
+# in lieu of doing that, set the library name directly.
+set (UUID_LIB uuid)
+
+set (qpid-proton-platform
+  src/driver.c
+  ${pn_driver_impl}
+  ${pn_driver_ssl_impl}
+)
 
 add_library (
   qpid-proton SHARED
@@ -86,39 +102,33 @@ add_library (
   src/message/message.c
   src/sasl/sasl.c
 
-  ${PROJECT_BINARY_DIR}/encodings.h
-  ${PROJECT_BINARY_DIR}/protocol.h
-)
-
-add_library (
-  qpid-proton-posix SHARED
-
-  src/driver.c
   src/messenger.c
 
-  ${pn_driver_impl}
-  ${pn_driver_ssl_impl}
-)
+  ${qpid-proton-platform}
 
-find_package(SWIG)
-if (SWIG_FOUND)
-  add_subdirectory(bindings)
-endif (SWIG_FOUND)
+  ${PROJECT_BINARY_DIR}/encodings.h
+  ${PROJECT_BINARY_DIR}/protocol.h
+)
+target_link_libraries (qpid-proton ${UUID_LIB})
 
 add_executable (proton src/proton.c)
-target_link_libraries (proton qpid-proton qpid-proton-posix ${LINK_DEPS})
+target_link_libraries (proton qpid-proton)
 
 add_executable (proton-dump src/proton-dump.c)
 target_link_libraries (proton-dump qpid-proton)
 
-add_subdirectory(docs/api)
-
 set_target_properties (
-  qpid-proton qpid-proton-posix proton proton-dump
+  qpid-proton proton proton-dump
   PROPERTIES
   COMPILE_FLAGS "-Wall -Werror -pedantic-errors -std=c99 -g -Iinclude -fPIC"
 )
 
-install (TARGETS proton proton-dump qpid-proton qpid-proton-posix
+# Install executables and libraries
+install (TARGETS proton proton-dump qpid-proton
          RUNTIME DESTINATION bin
          LIBRARY DESTINATION lib)
+
+# Install header files
+file(GLOB headers "include/proton/*.[hi]")
+install (FILES ${headers} DESTINATION include/proton)
+

Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt Mon Sep 10 14:56:20 2012
@@ -19,7 +19,7 @@
 
 include(UseSWIG)
 
-set (BINDING_DEPS qpid-proton qpid-proton-posix ${LINK_DEPS})
+set (BINDING_DEPS qpid-proton)
 
 # Build wrapper for Python:
 find_package (PythonLibs)

Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i Mon Sep 10 14:56:20 2012
@@ -67,8 +67,6 @@ ssize_t pn_input(pn_transport_t *transpo
 
 ssize_t pn_sasl_send(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
 %ignore pn_sasl_send;
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
-%ignore pn_sasl_input;
 
 
 // Use the OUTPUT_BUFFER,OUTPUT_LEN typemap to allow these functions to return
@@ -110,18 +108,6 @@ ssize_t pn_sasl_input(pn_sasl_t *sasl, c
 %}
 %ignore pn_output;
 
-%rename(pn_sasl_output) wrap_pn_output;
-// in PHP:   array = pn_sasl_output(sasl, MAXLEN);
-//           array[0] = size || error code
-//           array[1] = native string containing binary data
-%inline %{
-    void wrap_pn_sasl_output(pn_sasl_t *sasl, size_t maxCount, char **OUTPUT_BUFFER, ssize_t *OUTPUT_LEN) {
-        *OUTPUT_BUFFER = emalloc(sizeof(char) * maxCount);
-        *OUTPUT_LEN = pn_sasl_output(sasl, *OUTPUT_BUFFER, maxCount);
-    }
-%}
-%ignore pn_sasl_output;
-
 %rename(pn_message_data) wrap_pn_message_data;
 // in PHP:  array = pn_message_data("binary message data", MAXLEN);
 //          array[0] = size || error code
@@ -274,4 +260,4 @@ pn_connector_t *pn_connector_fd(pn_drive
 %ignore pn_connector_free;
 
 
-%include "../cproton.i"
+%include "proton/cproton.i"

Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i Mon Sep 10 14:56:20 2012
@@ -104,23 +104,6 @@ ssize_t pn_input(pn_transport_t *transpo
 %}
 %ignore pn_output;
 
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
-%ignore pn_sasl_input;
-
-%rename(pn_sasl_output) wrap_pn_sasl_output;
-%inline %{
-  int wrap_pn_sasl_output(pn_sasl_t *sasl, char *OUTPUT, size_t *OUTPUT_SIZE) {
-    ssize_t sz = pn_sasl_output(sasl, OUTPUT, *OUTPUT_SIZE);
-    if (sz >= 0) {
-      *OUTPUT_SIZE = sz;
-    } else {
-      *OUTPUT_SIZE = 0;
-    }
-    return sz;
-  }
-%}
-%ignore pn_sasl_output;
-
 %rename(pn_delivery) wrap_pn_delivery;
 %inline %{
   pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) {
@@ -230,4 +213,4 @@ ssize_t pn_sasl_input(pn_sasl_t *sasl, c
 %}
 %ignore pn_connector_free;
 
-%include "../cproton.i"
+%include "proton/cproton.i"

Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i Mon Sep 10 14:56:20 2012
@@ -154,4 +154,4 @@ ssize_t pn_input(pn_transport_t *transpo
 %}
 %ignore pn_message_data;
 
-%include "../cproton.i"
+%include "proton/cproton.i"

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h Mon Sep 10 14:56:20 2012
@@ -24,6 +24,10 @@
 
 #include <proton/types.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 typedef struct pn_buffer_t pn_buffer_t;
 
 pn_buffer_t *pn_buffer(size_t capacity);
@@ -31,10 +35,15 @@ void pn_buffer_free(pn_buffer_t *buf);
 size_t pn_buffer_size(pn_buffer_t *buf);
 int pn_buffer_append(pn_buffer_t *buf, const char *bytes, size_t size);
 int pn_buffer_prepend(pn_buffer_t *buf, const char *bytes, size_t size);
+size_t pn_buffer_get(pn_buffer_t *buf, size_t offset, size_t size, char *dst);
 int pn_buffer_trim(pn_buffer_t *buf, size_t left, size_t right);
 int pn_buffer_clear(pn_buffer_t *buf);
 int pn_buffer_defrag(pn_buffer_t *buf);
 pn_bytes_t pn_buffer_bytes(pn_buffer_t *buf);
 int pn_buffer_print(pn_buffer_t *buf);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* buffer.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h Mon Sep 10 14:56:20 2012
@@ -28,6 +28,10 @@
 #include <unistd.h>
 #include <stdarg.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 typedef enum {
   PN_NULL,
   PN_BOOL,
@@ -118,4 +122,8 @@ pn_atoms_t pn_data_available(pn_data_t *
 int pn_data_format(pn_data_t *data, char *bytes, size_t *size);
 int pn_data_resize(pn_data_t *data, size_t size);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* codec.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h Mon Sep 10 14:56:20 2012
@@ -26,6 +26,10 @@
 #include <proton/engine.h>
 #include <proton/sasl.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /** @file
  * API for the Driver Layer.
  *
@@ -52,6 +56,22 @@ typedef struct pn_connector_t pn_connect
  */
 pn_driver_t *pn_driver(void);
 
+/** Return the most recent error code.
+ *
+ * @param[in] d the driver
+ *
+ * @return the most recent error code for d
+ */
+int pn_driver_errno(pn_driver_t *d);
+
+/** Return the most recent error text for d.
+ *
+ * @param[in] d the driver
+ *
+ * @return the most recent error text for d
+ */
+const char *pn_driver_error(pn_driver_t *d);
+
 /** Set the tracing level for the given driver.
  *
  * @param[in] driver the driver to trace
@@ -124,6 +144,23 @@ pn_listener_t *pn_listener(pn_driver_t *
  */
 pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *context);
 
+/** Access the head listener for a driver.
+ *
+ * @param[in] driver the driver whose head listener will be returned
+ *
+ * @return the head listener for driver or NULL if there is none
+ */
+pn_listener_t *pn_listener_head(pn_driver_t *driver);
+
+/** Access the next listener.
+ *
+ * @param[in] listener the listener whose next listener will be
+ *            returned
+ *
+ * @return the next listener
+ */
+pn_listener_t *pn_listener_next(pn_listener_t *listener);
+
 /**
  * @todo pn_listener_trace needs documentation
  */
@@ -219,6 +256,23 @@ pn_connector_t *pn_connector(pn_driver_t
  */
 pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context);
 
+/** Access the head connector for a driver.
+ *
+ * @param[in] driver the driver whose head connector will be returned
+ *
+ * @return the head connector for driver or NULL if there is none
+ */
+pn_connector_t *pn_connector_head(pn_driver_t *driver);
+
+/** Access the next connector.
+ *
+ * @param[in] connector the connector whose next connector will be
+ *            returned
+ *
+ * @return the next connector
+ */
+pn_connector_t *pn_connector_next(pn_connector_t *connector);
+
 /** Set the tracing level for the given connector.
  *
  * @param[in] connector the connector to trace
@@ -308,7 +362,6 @@ bool pn_connector_closed(pn_connector_t 
  */
 void pn_connector_free(pn_connector_t *connector);
 
-
 /** Configure the set of trusted certificates for this client.  This causes the connector
  * to use SSL/TLS to authenticate the server and encrypt traffic.  It is intended to be
  * used by a client that is attempting to connect to a trusted server.  See
@@ -337,8 +390,6 @@ int pn_connector_ssl_set_client_auth(pn_
                                      const char *private_key_file,
                                      const char *password);
 
-
-
 /** Force the peer (client) to authenticate.  This is intended to be used on those
  * connectors that have been created by a listener - it permits the server to force
  * authentication of the connected client.  See ::pn_listener_ssl_set_client_auth
@@ -352,4 +403,9 @@ int pn_connector_ssl_set_client_auth(pn_
 int pn_connector_ssl_authenticate_client(pn_connector_t *connector,
                                          const char *trusted_CAs_file);
 
+
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* driver.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h Mon Sep 10 14:56:20 2012
@@ -27,6 +27,10 @@
 #include <sys/types.h>
 #include <proton/error.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /** @file
  * API for the proton Engine.
  *
@@ -144,15 +148,22 @@ pn_delivery_t *pn_work_next(pn_delivery_
  */
 pn_session_t *pn_session(pn_connection_t *connection);
 
-/** Factory for creating the connection's transport.
+/** Factory for creating a transport.
  *
- * The transport used by the connection to interface with the network.
- * There can only be one transport associated with a connection.
+ * A transport to be used by a connection to interface with the
+ * network. There can only be one connection associated with a
+ * transport. See pn_transport_bind().
  *
- * @param[in] connection connection that will use the transport
- * @return pointer to new session
+ * @return pointer to new transport
+ */
+pn_transport_t *pn_transport(void);
+
+/** Binds the transport to an AMQP connection endpoint.
+ *
+ * @return an error code, or 0 on success
  */
-pn_transport_t *pn_transport(pn_connection_t *connection);
+
+int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection);
 
 /** Retrieve the first Session that matches the given state mask.
  *
@@ -218,6 +229,24 @@ void pn_connection_open(pn_connection_t 
 void pn_connection_close(pn_connection_t *connection);
 void pn_connection_free(pn_connection_t *connection);
 
+/** Access the application context that is associated with the
+ *  connection.
+ *
+ * @param[in] connection the connection whose context is to be returned.
+ *
+ * @return the application context that was passed to pn_connection_set_context()
+ */
+void *pn_connection_context(pn_connection_t *connection);
+
+/** Assign a new application context to the connection.
+ *
+ * @param[in] connection the connection which will hold the context.
+ * @param[in] context new application context to associate with the
+ *                    connection
+ */
+void pn_connection_set_context(pn_connection_t *connection, void *context);
+
+
 // transport
 pn_error_t *pn_transport_error(pn_transport_t *transport);
 ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available);
@@ -247,8 +276,8 @@ const char *pn_target(pn_link_t *link);
 const char *pn_source(pn_link_t *link);
 void pn_set_source(pn_link_t *link, const char *source);
 void pn_set_target(pn_link_t *link, const char *target);
-char *pn_remote_source(pn_link_t *link);
-char *pn_remote_target(pn_link_t *link);
+const char *pn_remote_source(pn_link_t *link);
+const char *pn_remote_target(pn_link_t *link);
 pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag);
 pn_delivery_t *pn_current(pn_link_t *link);
 bool pn_advance(pn_link_t *link);
@@ -291,4 +320,8 @@ void pn_disposition(pn_delivery_t *deliv
 void pn_settle(pn_delivery_t *delivery);
 void pn_delivery_dump(pn_delivery_t *delivery);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* engine.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h Mon Sep 10 14:56:20 2012
@@ -24,6 +24,10 @@
 
 #include <stdarg.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 typedef struct pn_error_t pn_error_t;
 
 #define PN_EOS (-1)
@@ -42,7 +46,12 @@ void pn_error_clear(pn_error_t *error);
 int pn_error_set(pn_error_t *error, int code, const char *text);
 int pn_error_vformat(pn_error_t *error, int code, const char *fmt, va_list ap);
 int pn_error_format(pn_error_t *error, int code, const char *fmt, ...);
+int pn_error_from_errno(pn_error_t *error, const char *msg);
 int pn_error_code(pn_error_t *error);
 const char *pn_error_text(pn_error_t *error);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* error.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h Mon Sep 10 14:56:20 2012
@@ -25,6 +25,10 @@
 #include <stdint.h>
 #include <sys/types.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 #define AMQP_HEADER_SIZE (8)
 
 typedef struct {
@@ -39,4 +43,8 @@ typedef struct {
 size_t pn_read_frame(pn_frame_t *frame, char *bytes, size_t available);
 size_t pn_write_frame(char *bytes, size_t size, pn_frame_t frame);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* framing.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h Mon Sep 10 14:56:20 2012
@@ -27,6 +27,10 @@
 #include <sys/types.h>
 #include <stdbool.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 typedef struct pn_message_t pn_message_t;
 typedef enum {
   PN_DATA,
@@ -143,4 +147,8 @@ int pn_message_encode(pn_message_t *msg,
 
 ssize_t pn_message_data(char *dst, size_t available, const char *src, size_t size);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* message.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h Mon Sep 10 14:56:20 2012
@@ -24,6 +24,10 @@
 
 #include <proton/message.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /** @file
  * The messenger API provides a high level interface for sending and
  * receiving AMQP messages.
@@ -180,4 +184,8 @@ int pn_messenger_outgoing(pn_messenger_t
  */
 int pn_messenger_incoming(pn_messenger_t *messenger);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* messenger.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h Mon Sep 10 14:56:20 2012
@@ -24,12 +24,20 @@
 
 #include <proton/codec.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 typedef struct pn_parser_t pn_parser_t;
 
-pn_parser_t *pn_parser();
+pn_parser_t *pn_parser(void);
 int pn_parser_parse(pn_parser_t *parser, const char *str, pn_atoms_t *atoms);
 int pn_parser_errno(pn_parser_t *parser);
 const char *pn_parser_error(pn_parser_t *parser);
 void pn_parser_free(pn_parser_t *parser);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* parser.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h Mon Sep 10 14:56:20 2012
@@ -24,6 +24,11 @@
 
 #include <sys/types.h>
 #include <stdbool.h>
+#include <proton/engine.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
 
 /** @file
  * API for the SASL Secure Transport Layer.
@@ -60,7 +65,7 @@ typedef enum {
  *
  * @return a new SASL object representing the layer.
  */
-pn_sasl_t *pn_sasl();
+pn_sasl_t *pn_sasl(pn_transport_t *transport);
 
 /** Access the current state of the layer.
  *
@@ -159,38 +164,8 @@ void pn_sasl_done(pn_sasl_t *sasl, pn_sa
  */
 pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl);
 
-/** Decode input data bytes into SASL frames, and process them.
- *
- * This function is called by the driver layer to pass data received
- * from the remote peer into the SASL layer.
- *
- * @param[in] sasl the SASL layer.
- * @param[in] bytes buffer of frames to process
- * @param[in] available number of octets of data in 'bytes'
- * @return the number of bytes consumed, or error code if < 0
- */
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available);
-
-/** Gather output frames from the layer.
- *
- * This function is used by the driver to poll the SASL layer for data
- * that will be sent to the remote peer.
- *
- * @param[in] sasl The SASL layer.
- * @param[out] bytes to be filled with encoded frames.
- * @param[in] size space available in bytes array.
- * @return the number of octets written to bytes, or error code if < 0
- */
-ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size);
-
-void pn_sasl_trace(pn_sasl_t *sasl, pn_trace_t trace);
-
-/** Destructor for the given SASL layer.
- *
- * @param[in] sasl the SASL object to free. No longer valid on
- *                 return.
- */
-void pn_sasl_free(pn_sasl_t *sasl);
-
+#ifdef __cplusplus
+}
+#endif
 
 #endif /* sasl.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h Mon Sep 10 14:56:20 2012
@@ -25,6 +25,10 @@
 #include <sys/types.h>
 #include <stdarg.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 typedef enum {
   PN_TOK_LBRACE,
   PN_TOK_RBRACE,
@@ -70,4 +74,8 @@ int pn_scanner_start(pn_scanner_t *scann
 int pn_scanner_scan(pn_scanner_t *scanner);
 int pn_scanner_shift(pn_scanner_t *scanner);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* scanner.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h Mon Sep 10 14:56:20 2012
@@ -25,6 +25,10 @@
 #include <sys/types.h>
 #include <stdint.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 typedef int32_t  pn_sequence_t;
 typedef uint32_t pn_millis_t;
 typedef uint64_t pn_timestamp_t;
@@ -37,4 +41,8 @@ typedef struct {
 pn_bytes_t pn_bytes(size_t size, char *start);
 pn_bytes_t pn_bytes_dup(size_t size, const char *start);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* types.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h Mon Sep 10 14:56:20 2012
@@ -24,8 +24,16 @@
 
 #include <stdarg.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 void parse_url(char *url, char **user, char **pass, char **host, char **port);
 void pn_fatal(char *fmt, ...);
 void pn_vfatal(char *fmt, va_list ap);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* util.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c Mon Sep 10 14:56:20 2012
@@ -139,9 +139,6 @@ int pn_buffer_ensure(pn_buffer_t *buf, s
   return 0;
 }
 
-#define min(X,Y) ((X) > (Y) ? (Y) : (X))
-#define max(X,Y) ((X) < (Y) ? (Y) : (X))
-
 int pn_buffer_append(pn_buffer_t *buf, const char *bytes, size_t size)
 {
   int err = pn_buffer_ensure(buf, size);
@@ -149,7 +146,7 @@ int pn_buffer_append(pn_buffer_t *buf, c
 
   size_t tail = pn_buffer_tail(buf);
   size_t tail_space = pn_buffer_tail_space(buf);
-  size_t n = min(tail_space, size);
+  size_t n = pn_min(tail_space, size);
 
   memmove(buf->bytes + tail, bytes, n);
   memmove(buf->bytes, bytes + n, size - n);
@@ -166,7 +163,7 @@ int pn_buffer_prepend(pn_buffer_t *buf, 
 
   size_t head = pn_buffer_head(buf);
   size_t head_space = pn_buffer_head_space(buf);
-  size_t n = min(head_space, size);
+  size_t n = pn_min(head_space, size);
 
   memmove(buf->bytes + head - n, bytes + size - n, n);
   memmove(buf->bytes + buf->capacity - (size - n), bytes, size - n);
@@ -182,6 +179,35 @@ int pn_buffer_prepend(pn_buffer_t *buf, 
   return 0;
 }
 
+size_t pn_buffer_index(pn_buffer_t *buf, size_t index)
+{
+  size_t result = buf->start + index;
+  if (result >= buf->capacity) result -= buf->capacity;
+  return result;
+}
+
+size_t pn_buffer_get(pn_buffer_t *buf, size_t offset, size_t size, char *dst)
+{
+  size_t start = pn_buffer_index(buf, offset);
+  size_t stop = pn_buffer_index(buf, pn_min(offset + size, buf->size));
+
+  size_t sz1;
+  size_t sz2;
+
+  if (start > stop) {
+    sz1 = buf->capacity - start;
+    sz2 = stop;
+  } else {
+    sz1 = stop - start;
+    sz2 = 0;
+  }
+
+  memmove(dst, buf->bytes + start, sz1);
+  memmove(dst + sz1, buf->bytes, sz2);
+
+  return sz1 + sz2;
+}
+
 int pn_buffer_trim(pn_buffer_t *buf, size_t left, size_t right)
 {
   if (left + right > buf->size) return PN_ARG_ERR;
@@ -202,13 +228,6 @@ int pn_buffer_clear(pn_buffer_t *buf)
   return 0;
 }
 
-size_t pn_buffer_index(pn_buffer_t *buf, size_t index)
-{
-  size_t result = buf->start + index;
-  if (result >= buf->capacity) result -= buf->capacity;
-  return result;
-}
-
 static void pn_buffer_rotate (pn_buffer_t *buf, size_t sz) {
   if (sz == 0) return;
 

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c Mon Sep 10 14:56:20 2012
@@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t 
     uint8_t code = scanned ? code64 : 0;
     size_t n = SCRATCH;
     pn_data_format(args, disp->scratch, &n);
-    fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
-            dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
+    pn_dispatcher_trace(disp, ch, "%s %s %s", dir == OUT ? "->" : "<-",
+                        disp->names[code], disp->scratch);
     if (size) {
       size_t capacity = 4*size + 1;
       char buf[capacity];
@@ -95,6 +95,16 @@ static void pn_do_trace(pn_dispatcher_t 
   }
 }
 
+void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...)
+{
+  va_list ap;
+  fprintf(stderr, "[%p:%u] ", (void *) disp, ch);
+
+  va_start(ap, fmt);
+  vfprintf(stderr, fmt, ap);
+  va_end(ap);
+}
+
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available)
 {
   size_t read = 0;

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h Mon Sep 10 14:56:20 2012
@@ -64,5 +64,6 @@ void pn_set_payload(pn_dispatcher_t *dis
 int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...);
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available);
 ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size);
+void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...);
 
 #endif /* dispatcher.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/driver.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/driver.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/driver.c Mon Sep 10 14:56:20 2012
@@ -69,24 +69,24 @@ pn_listener_t *pn_listener(pn_driver_t *
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);
   if (code) {
-    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(code));
+    pn_error_format(driver->error, PN_ERR, "getaddrinfo: %s\n", gai_strerror(code));
     return NULL;
   }
 
   int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
   if (sock == -1) {
-    perror("socket");
+    pn_error_from_errno(driver->error, "socket");
     return NULL;
   }
 
   int optval = 1;
   if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
-    perror("setsockopt");
+    pn_error_from_errno(driver->error, "setsockopt");
     return NULL;
   }
 
   if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
-    perror("bind");
+    pn_error_from_errno(driver->error, "bind");
     freeaddrinfo(addr);
     return NULL;
   }
@@ -94,14 +94,14 @@ pn_listener_t *pn_listener(pn_driver_t *
   freeaddrinfo(addr);
 
   if (listen(sock, 50) == -1) {
-    perror("listen");
+    pn_error_from_errno(driver->error, "listen");
     return NULL;
   }
 
   pn_listener_t *l = pn_listener_fd(driver, sock, context);
 
   if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
-    printf("Listening on %s:%s\n", host, port);
+    fprintf(stderr, "Listening on %s:%s\n", host, port);
   return l;
 }
 
@@ -125,6 +125,16 @@ pn_listener_t *pn_listener_fd(pn_driver_
   return l;
 }
 
+pn_listener_t *pn_listener_head(pn_driver_t *driver)
+{
+  return driver ? driver->listener_head : NULL;
+}
+
+pn_listener_t *pn_listener_next(pn_listener_t *listener)
+{
+  return listener ? listener->listener_next : NULL;
+}
+
 void pn_listener_trace(pn_listener_t *l, pn_trace_t trace) {
   // XXX
 }
@@ -227,17 +237,20 @@ pn_connector_t *pn_connector(pn_driver_t
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);
   if (code) {
-    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(code));
+    pn_error_format(driver->error, PN_ERR, "getaddrinfo: %s", gai_strerror(code));
     return NULL;
   }
 
   int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
-  if (sock == -1)
+  if (sock == -1) {
+    pn_error_from_errno(driver->error, "socket");
     return NULL;
+  }
 
   pn_configure_sock(sock);
 
   if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    pn_error_from_errno(driver->error, "connect");
     freeaddrinfo(addr);
     return NULL;
   }
@@ -255,15 +268,6 @@ static void pn_connector_read(pn_connect
 static void pn_connector_write(pn_connector_t *ctor);
 static time_t pn_connector_tick(pn_connector_t *ctor, time_t now);
 
-static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor);
-static ssize_t pn_connector_read_sasl(pn_connector_t *ctor);
-static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor);
-static ssize_t pn_connector_read_amqp(pn_connector_t *ctor);
-static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor);
-static ssize_t pn_connector_write_sasl(pn_connector_t *ctor);
-static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor);
-static ssize_t pn_connector_write_amqp(pn_connector_t *ctor);
-
 pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
 {
   if (!driver) return NULL;
@@ -289,11 +293,8 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->input_size = 0;
   c->input_eos = false;
   c->output_size = 0;
-  c->sasl = pn_sasl();
   c->connection = NULL;
-  c->transport = NULL;
-  c->process_input = pn_connector_read_sasl_header;
-  c->process_output = pn_connector_write_sasl_header;
+  c->transport = pn_transport();
   c->input_done = false;
   c->output_done = false;
   c->context = context;
@@ -308,24 +309,33 @@ pn_connector_t *pn_connector_fd(pn_drive
   return c;
 }
 
+pn_connector_t *pn_connector_head(pn_driver_t *driver)
+{
+  return driver ? driver->connector_head : NULL;
+}
+
+pn_connector_t *pn_connector_next(pn_connector_t *connector)
+{
+  return connector ? connector->connector_next : NULL;
+}
+
 void pn_connector_trace(pn_connector_t *ctor, pn_trace_t trace)
 {
   if (!ctor) return;
   ctor->trace = trace;
-  if (ctor->sasl) pn_sasl_trace(ctor->sasl, trace);
   if (ctor->transport) pn_trace(ctor->transport, trace);
 }
 
 pn_sasl_t *pn_connector_sasl(pn_connector_t *ctor)
 {
-  return ctor ? ctor->sasl : NULL;
+  return ctor ? pn_sasl(ctor->transport) : NULL;
 }
 
 void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection)
 {
   if (!ctor) return;
   ctor->connection = connection;
-  ctor->transport = pn_transport(connection);
+  pn_transport_bind(ctor->transport, connection);
   if (ctor->transport) pn_trace(ctor->transport, ctor->trace);
 }
 
@@ -375,8 +385,8 @@ void pn_connector_free(pn_connector_t *c
 
   pn_connector_poller_destroy(ctor);
   ctor->connection = NULL;
+  pn_transport_free(ctor->transport);
   ctor->transport = NULL;
-  pn_sasl_free(ctor->sasl);
   free(ctor);
 }
 
@@ -400,95 +410,20 @@ static void pn_connector_consume(pn_conn
 
 void pn_connector_process_input(pn_connector_t *ctor)
 {
-  while (!ctor->input_done && (ctor->input_size > 0 || ctor->input_eos)) {
-    ssize_t n = ctor->process_input(ctor);
-    if (n > 0) {
-      pn_connector_consume(ctor, n);
-    } else if (n == 0) {
-      break;
-    } else {
-      if (n == PN_EOS) {
-        pn_connector_consume(ctor, ctor->input_size);
+  pn_transport_t *transport = ctor->transport;
+  if (!ctor->input_done) {
+    if (ctor->input_size > 0 || ctor->input_eos) {
+      ssize_t n = pn_input(transport, ctor->input, ctor->input_size);
+      if (n >= 0) {
+        pn_connector_consume(ctor, n);
       } else {
-        fprintf(stderr, "error in process_input: %s\n", pn_code(n));
+        pn_connector_consume(ctor, ctor->input_size);
+        ctor->input_done = true;
       }
-      ctor->input_done = true;
-      break;
     }
   }
 }
 
-static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor)
-{
-  if (ctor->input_size >= 8) {
-    if (memcmp(ctor->input, "AMQP\x03\x01\x00\x00", 8)) {
-      fprintf(stderr, "sasl header missmatch: ");
-      pn_fprint_data(stderr, ctor->input, ctor->input_size);
-      fprintf(stderr, "\n");
-      ctor->output_done = true;
-      return PN_ERR;
-    } else {
-      if (ctor->trace & PN_TRACE_FRM)
-        fprintf(stderr, "    <- AMQP SASL 1.0\n");
-      ctor->process_input = pn_connector_read_sasl;
-      return 8;
-    }
-  } else if (ctor->input_eos) {
-    fprintf(stderr, "sasl header missmatch: ");
-    pn_fprint_data(stderr, ctor->input, ctor->input_size);
-    fprintf(stderr, "\n");
-    ctor->output_done = true;
-    return PN_ERR;
-  }
-
-  return 0;
-}
-
-static ssize_t pn_connector_read_sasl(pn_connector_t *ctor)
-{
-  pn_sasl_t *sasl = ctor->sasl;
-  ssize_t n = pn_sasl_input(sasl, ctor->input, ctor->input_size);
-  if (n == PN_EOS) {
-    ctor->process_input = pn_connector_read_amqp_header;
-    return ctor->process_input(ctor);
-  } else {
-    return n;
-  }
-}
-
-static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor)
-{
-  if (ctor->input_size >= 8) {
-    if (memcmp(ctor->input, "AMQP\x00\x01\x00\x00", 8)) {
-      fprintf(stderr, "amqp header missmatch: ");
-      pn_fprint_data(stderr, ctor->input, ctor->input_size);
-      fprintf(stderr, "\n");
-      ctor->output_done = true;
-      return PN_ERR;
-    } else {
-      if (ctor->trace & PN_TRACE_FRM)
-        fprintf(stderr, "    <- AMQP 1.0\n");
-      ctor->process_input = pn_connector_read_amqp;
-      return 8;
-    }
-  } else if (ctor->input_eos) {
-    fprintf(stderr, "amqp header missmatch: ");
-    pn_fprint_data(stderr, ctor->input, ctor->input_size);
-    fprintf(stderr, "\n");
-    ctor->output_done = true;
-    return PN_ERR;
-  }
-
-  return 0;
-}
-
-static ssize_t pn_connector_read_amqp(pn_connector_t *ctor)
-{
-  if (!ctor->transport) return 0;
-  pn_transport_t *transport = ctor->transport;
-  return pn_input(transport, ctor->input, ctor->input_size);
-}
-
 static char *pn_connector_output(pn_connector_t *ctor)
 {
   return ctor->output + ctor->output_size;
@@ -501,18 +436,13 @@ static size_t pn_connector_available(pn_
 
 void pn_connector_process_output(pn_connector_t *ctor)
 {
-  while (!ctor->output_done && pn_connector_available(ctor) > 0) {
-    ssize_t n = ctor->process_output(ctor);
-    if (n > 0) {
+  pn_transport_t *transport = ctor->transport;
+  if (!ctor->output_done) {
+    ssize_t n = pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
+    if (n >= 0) {
       ctor->output_size += n;
-    } else if (n == 0) {
-      break;
     } else {
-      if (n != PN_EOS) {
-        fprintf(stderr, "error in process_output: %s\n", pn_code(n));
-      }
       ctor->output_done = true;
-      break;
     }
   }
 
@@ -540,43 +470,6 @@ static void pn_connector_write(pn_connec
     ctor->status &= ~PN_SEL_WR;
 }
 
-static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor)
-{
-  if (ctor->trace & PN_TRACE_FRM)
-    fprintf(stderr, "    -> AMQP SASL 1.0\n");
-  memmove(pn_connector_output(ctor), "AMQP\x03\x01\x00\x00", 8);
-  ctor->process_output = pn_connector_write_sasl;
-  return 8;
-}
-
-static ssize_t pn_connector_write_sasl(pn_connector_t *ctor)
-{
-  pn_sasl_t *sasl = ctor->sasl;
-  ssize_t n = pn_sasl_output(sasl, pn_connector_output(ctor), pn_connector_available(ctor));
-  if (n == PN_EOS) {
-    ctor->process_output = pn_connector_write_amqp_header;
-    return ctor->process_output(ctor);
-  } else {
-    return n;
-  }
-}
-
-static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor)
-{
-  if (ctor->trace & PN_TRACE_FRM)
-    fprintf(stderr, "    -> AMQP 1.0\n");
-  memmove(pn_connector_output(ctor), "AMQP\x00\x01\x00\x00", 8);
-  ctor->process_output = pn_connector_write_amqp;
-  return 8;
-}
-
-static ssize_t pn_connector_write_amqp(pn_connector_t *ctor)
-{
-  if (!ctor->transport) return 0;
-  pn_transport_t *transport = ctor->transport;
-  return pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
-}
-
 static time_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   if (!ctor->transport) return 0;
@@ -620,6 +513,7 @@ pn_driver_t *pn_driver()
 {
   pn_driver_t *d = malloc(sizeof(pn_driver_t));
   if (!d) return NULL;
+  d->error = pn_error();
   d->listener_head = NULL;
   d->listener_tail = NULL;
   d->listener_next = NULL;
@@ -645,6 +539,16 @@ pn_driver_t *pn_driver()
   return d;
 }
 
+int pn_driver_errno(pn_driver_t *d)
+{
+  return d ? pn_error_code(d->error) : PN_ARG_ERR;
+}
+
+const char *pn_driver_error(pn_driver_t *d)
+{
+  return d ? pn_error_text(d->error) : NULL;
+}
+
 void pn_driver_trace(pn_driver_t *d, pn_trace_t trace)
 {
   d->trace = trace;
@@ -662,6 +566,7 @@ void pn_driver_free(pn_driver_t *d)
     pn_listener_free(d->listener_head);
 
   pn_driver_poller_destroy(d);
+  pn_error_free(d->error);
   free(d);
 }
 

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h Mon Sep 10 14:56:20 2012
@@ -25,6 +25,7 @@
 /* Decls */
 
 struct pn_driver_t {
+  pn_error_t *error;
   pn_listener_t *listener_head;
   pn_listener_t *listener_tail;
   pn_listener_t *listener_next;
@@ -90,11 +91,8 @@ struct pn_connector_t {
   bool input_eos;
   size_t output_size;
   char output[PN_CONNECTOR_IO_BUF_SIZE];
-  pn_sasl_t *sasl;
   pn_connection_t *connection;
   pn_transport_t *transport;
-  ssize_t (*process_input)(pn_connector_t *);
-  ssize_t (*process_output)(pn_connector_t *);
   bool input_done;
   bool output_done;
   pn_listener_t *listener;

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h Mon Sep 10 14:56:20 2012
@@ -22,6 +22,7 @@
  *
  */
 
+#include <proton/buffer.h>
 #include <proton/engine.h>
 #include <proton/types.h>
 #include "../dispatcher/dispatcher.h"
@@ -85,15 +86,22 @@ typedef struct {
 
 #define SCRATCH (1024)
 
+#include <proton/sasl.h>
+
 struct pn_transport_t {
-  pn_endpoint_t endpoint;
+  ssize_t (*process_input)(pn_transport_t *, char *, size_t);
+  ssize_t (*process_output)(pn_transport_t *, char *, size_t);
+  size_t header_count;
+  pn_sasl_t *sasl;
   pn_connection_t *connection;
   pn_dispatcher_t *disp;
   bool open_sent;
   bool open_rcvd;
   bool close_sent;
   bool close_rcvd;
-  int error;
+  char *remote_container;
+  char *remote_hostname;
+  pn_error_t *error;
   pn_session_state_t *sessions;
   size_t session_capacity;
   pn_session_state_t **channels;
@@ -118,8 +126,7 @@ struct pn_connection_t {
   pn_delivery_t *tpwork_tail;
   char *container;
   char *hostname;
-  char *remote_container;
-  char *remote_hostname;
+  void *context;
 };
 
 struct pn_session_t {
@@ -154,7 +161,7 @@ struct pn_link_t {
 
 struct pn_delivery_t {
   pn_link_t *link;
-  pn_bytes_t tag;
+  pn_buffer_t *tag;
   int local_state;
   int remote_state;
   bool local_settled;
@@ -171,9 +178,7 @@ struct pn_delivery_t {
   pn_delivery_t *tpwork_next;
   pn_delivery_t *tpwork_prev;
   bool tpwork;
-  char *bytes;
-  size_t size;
-  size_t capacity;
+  pn_buffer_t *bytes;
   bool done;
   void *context;
 };
@@ -201,5 +206,6 @@ void pn_link_dump(pn_link_t *link);
   }
 
 void pn_dump(pn_connection_t *conn);
+void pn_transport_sasl_init(pn_transport_t *transport);
 
 #endif /* engine-internal.h */

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c Mon Sep 10 14:56:20 2012
@@ -29,6 +29,8 @@
 #include <stdarg.h>
 #include <stdio.h>
 
+#include "../sasl/sasl-internal.h"
+
 // delivery buffers
 
 void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
@@ -187,21 +189,32 @@ void pn_connection_close(pn_connection_t
   if (connection) pn_close((pn_endpoint_t *) connection);
 }
 
+void pn_endpoint_tini(pn_endpoint_t *endpoint);
+
 void pn_connection_free(pn_connection_t *connection)
 {
   if (!connection) return;
 
-  pn_transport_free(connection->transport);
   while (connection->session_count)
     pn_session_free(connection->sessions[connection->session_count - 1]);
   free(connection->sessions);
   free(connection->container);
   free(connection->hostname);
-  free(connection->remote_container);
-  free(connection->remote_hostname);
+  pn_endpoint_tini(&connection->endpoint);
   free(connection);
 }
 
+void *pn_connection_context(pn_connection_t *conn)
+{
+    return conn ? conn->context : 0;
+}
+
+void pn_connection_set_context(pn_connection_t *conn, void *context)
+{
+    if (conn)
+        conn->context = context;
+}
+
 void pn_transport_open(pn_transport_t *transport)
 {
   pn_open((pn_endpoint_t *) transport);
@@ -216,6 +229,7 @@ void pn_transport_free(pn_transport_t *t
 {
   if (!transport) return;
 
+  pn_sasl_free(transport->sasl);
   pn_dispatcher_free(transport->disp);
   for (int i = 0; i < transport->session_capacity; i++) {
     pn_delivery_buffer_free(&transport->sessions[i].incoming);
@@ -223,6 +237,9 @@ void pn_transport_free(pn_transport_t *t
     free(transport->sessions[i].links);
     free(transport->sessions[i].handles);
   }
+  free(transport->remote_container);
+  free(transport->remote_hostname);
+  pn_error_free(transport->error);
   free(transport->sessions);
   free(transport->channels);
   free(transport);
@@ -273,6 +290,7 @@ void pn_session_free(pn_session_t *sessi
     pn_link_free(session->links[session->link_count - 1]);
   pn_remove_session(session->connection, session);
   free(session->links);
+  pn_endpoint_tini(&session->endpoint);
   free(session);
 }
 
@@ -298,28 +316,11 @@ void pn_remove_link(pn_session_t *ssn, p
   link->session = NULL;
 }
 
-void pn_clear_tag(pn_delivery_t *delivery)
-{
-  if (delivery->tag.start) {
-    free(delivery->tag.start);
-    delivery->tag = (pn_bytes_t) {0, NULL};
-  }
-}
-
-void pn_clear_bytes(pn_delivery_t *delivery)
-{
-  if (delivery->capacity) {
-    free(delivery->bytes);
-    delivery->bytes = NULL;
-    delivery->capacity = 0;
-  }
-}
-
 void pn_free_delivery(pn_delivery_t *delivery)
 {
   if (delivery) {
-    pn_clear_tag(delivery);
-    pn_clear_bytes(delivery);
+    pn_buffer_free(delivery->tag);
+    pn_buffer_free(delivery->bytes);
     free(delivery);
   }
 }
@@ -354,6 +355,7 @@ void pn_link_free(pn_link_t *link)
     pn_free_delivery(d);
   }
   free(link->name);
+  pn_endpoint_tini(&link->endpoint);
   free(link);
 }
 
@@ -371,11 +373,17 @@ void pn_endpoint_init(pn_endpoint_t *end
   LL_ADD(conn, endpoint, endpoint);
 }
 
+void pn_endpoint_tini(pn_endpoint_t *endpoint)
+{
+  pn_error_free(endpoint->error);
+}
+
 pn_connection_t *pn_connection()
 {
   pn_connection_t *conn = malloc(sizeof(pn_connection_t));
   if (!conn) return NULL;
 
+  conn->context = NULL;
   conn->endpoint_head = NULL;
   conn->endpoint_tail = NULL;
   pn_endpoint_init(&conn->endpoint, CONNECTION, conn);
@@ -391,8 +399,6 @@ pn_connection_t *pn_connection()
   conn->tpwork_tail = NULL;
   conn->container = NULL;
   conn->hostname = NULL;
-  conn->remote_container = NULL;
-  conn->remote_hostname = NULL;
 
   return conn;
 }
@@ -433,12 +439,14 @@ void pn_connection_set_hostname(pn_conne
 
 const char *pn_connection_remote_container(pn_connection_t *connection)
 {
-  return connection ? connection->remote_container : NULL;
+  if (!connection) return NULL;
+  return connection->transport ? connection->transport->remote_container : NULL;
 }
 
 const char *pn_connection_remote_hostname(pn_connection_t *connection)
 {
-  return connection ? connection->remote_hostname : NULL;
+  if (!connection) return NULL;
+  return connection->transport ? connection->transport->remote_hostname : NULL;
 }
 
 pn_delivery_t *pn_work_head(pn_connection_t *connection)
@@ -655,10 +663,21 @@ int pn_do_detach(pn_dispatcher_t *disp);
 int pn_do_end(pn_dispatcher_t *disp);
 int pn_do_close(pn_dispatcher_t *disp);
 
+static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
+
 void pn_transport_init(pn_transport_t *transport)
 {
-  pn_endpoint_init(&transport->endpoint, TRANSPORT, transport->connection);
-
+  transport->process_input = pn_input_read_amqp_header;
+  transport->process_output = pn_output_write_amqp_header;
+  transport->header_count = 0;
+  transport->sasl = NULL;
   transport->disp = pn_dispatcher(0, transport);
 
   pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
@@ -675,7 +694,9 @@ void pn_transport_init(pn_transport_t *t
   transport->open_rcvd = false;
   transport->close_sent = false;
   transport->close_rcvd = false;
-  transport->error = 0;
+  transport->remote_container = NULL;
+  transport->remote_hostname = NULL;
+  transport->error = pn_error();
 
   transport->sessions = NULL;
   transport->session_capacity = 0;
@@ -716,25 +737,41 @@ void pn_map_channel(pn_transport_t *tran
   transport->channels[channel] = state;
 }
 
-pn_transport_t *pn_transport(pn_connection_t *conn)
+pn_transport_t *pn_transport()
 {
-  if (!conn) return NULL;
+  pn_transport_t *transport = malloc(sizeof(pn_transport_t));
+  if (!transport) return NULL;
 
-  if (conn->transport) {
-    return NULL;
-  } else {
-    conn->transport = malloc(sizeof(pn_transport_t));
-    if (!conn->transport) return NULL;
+  transport->connection = NULL;
+  pn_transport_init(transport);
+  return transport;
+}
 
-    conn->transport->connection = conn;
-    pn_transport_init(conn->transport);
-    return conn->transport;
+void pn_transport_sasl_init(pn_transport_t *transport)
+{
+  transport->process_input = pn_input_read_sasl_header;
+  transport->process_output = pn_output_write_sasl_header;
+}
+
+int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
+{
+  if (!transport) return PN_ARG_ERR;
+  if (transport->connection) return PN_STATE_ERR;
+  if (connection->transport) return PN_STATE_ERR;
+  transport->connection = connection;
+  connection->transport = transport;
+  if (transport->open_rcvd) {
+    PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
+    if (!pn_error_code(transport->error)) {
+      transport->disp->halt = false;
+    }
   }
+  return 0;
 }
 
 pn_error_t *pn_transport_error(pn_transport_t *transport)
 {
-  return transport->endpoint.error;
+  return transport->error;
 }
 
 void pn_link_init(pn_link_t *link, int type, pn_session_t *session, const char *name)
@@ -779,12 +816,12 @@ void pn_set_target(pn_link_t *link, cons
   link->local_target = pn_strdup(target);
 }
 
-char *pn_remote_source(pn_link_t *link)
+const char *pn_remote_source(pn_link_t *link)
 {
   return link ? link->remote_source : NULL;
 }
 
-char *pn_remote_target(pn_link_t *link)
+const char *pn_remote_target(pn_link_t *link)
 {
   return link ? link->remote_target : NULL;
 }
@@ -869,12 +906,15 @@ pn_delivery_t *pn_delivery(pn_link_t *li
   if (!link) return NULL;
   pn_delivery_t *delivery = link->settled_head;
   LL_POP(link, settled);
-  if (!delivery) delivery = malloc(sizeof(pn_delivery_t));
-  if (!delivery) return NULL;
+  if (!delivery) {
+    delivery = malloc(sizeof(pn_delivery_t));
+    if (!delivery) return NULL;
+    delivery->tag = pn_buffer(16);
+    delivery->bytes = pn_buffer(64);
+  }
   delivery->link = link;
-  delivery->tag.size = tag.size;
-  delivery->tag.start = malloc(tag.size);
-  memcpy(delivery->tag.start, tag.bytes, tag.size);
+  pn_buffer_clear(delivery->tag);
+  pn_buffer_append(delivery->tag, tag.bytes, tag.size);
   delivery->local_state = 0;
   delivery->remote_state = 0;
   delivery->local_settled = false;
@@ -888,9 +928,7 @@ pn_delivery_t *pn_delivery(pn_link_t *li
   delivery->tpwork_next = NULL;
   delivery->tpwork_prev = NULL;
   delivery->tpwork = false;
-  delivery->bytes = NULL;
-  delivery->size = 0;
-  delivery->capacity = 0;
+  pn_buffer_clear(delivery->bytes);
   delivery->done = false;
   delivery->context = NULL;
 
@@ -928,7 +966,8 @@ bool pn_is_current(pn_delivery_t *delive
 void pn_delivery_dump(pn_delivery_t *d)
 {
   char tag[1024];
-  pn_quote_data(tag, 1024, d->tag.start, d->tag.size);
+  pn_bytes_t bytes = pn_buffer_bytes(d->tag);
+  pn_quote_data(tag, 1024, bytes.start, bytes.size);
   printf("{tag=%s, local_state=%u, remote_state=%u, local_settled=%u, "
          "remote_settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
          "work=%u}",
@@ -940,7 +979,8 @@ void pn_delivery_dump(pn_delivery_t *d)
 pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
 {
   if (delivery) {
-    return pn_dtag(delivery->tag.start, delivery->tag.size);
+    pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
+    return pn_dtag(tag.start, tag.size);
   } else {
     return (pn_delivery_tag_t) {0};
   }
@@ -1002,8 +1042,8 @@ void pn_real_settle(pn_delivery_t *deliv
   LL_REMOVE(link, unsettled, delivery);
   // TODO: what if we settle the current delivery?
   LL_ADD(link, settled, delivery);
-  pn_clear_tag(delivery);
-  pn_clear_bytes(delivery);
+  pn_buffer_clear(delivery->tag);
+  pn_buffer_clear(delivery->bytes);
   delivery->settled = true;
 }
 
@@ -1047,13 +1087,13 @@ int pn_do_error(pn_transport_t *transpor
   // XXX: result
   vsnprintf(buf, 1024, fmt, ap);
   va_end(ap);
-  pn_error_set(transport->endpoint.error, PN_ERR, buf);
+  pn_error_set(transport->error, PN_ERR, buf);
   if (!transport->close_sent) {
     pn_post_close(transport);
     transport->close_sent = true;
   }
   transport->disp->halt = true;
-  fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->endpoint.error));
+  fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->error));
   return PN_ERR;
 }
 
@@ -1072,16 +1112,20 @@ int pn_do_open(pn_dispatcher_t *disp)
                          &hostname_q, &remote_hostname);
   if (err) return err;
   if (container_q) {
-    conn->remote_container = pn_bytes_strdup(remote_container);
+    transport->remote_container = pn_bytes_strdup(remote_container);
   } else {
-    conn->remote_container = NULL;
+    transport->remote_container = NULL;
   }
   if (hostname_q) {
-    conn->remote_hostname = pn_bytes_strdup(remote_hostname);
+    transport->remote_hostname = pn_bytes_strdup(remote_hostname);
   } else {
-    conn->remote_hostname = NULL;
+    transport->remote_hostname = NULL;
+  }
+  if (conn) {
+    PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
+  } else {
+    transport->disp->halt = true;
   }
-  PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
   transport->open_rcvd = true;
   return 0;
 }
@@ -1215,9 +1259,7 @@ int pn_do_transfer(pn_dispatcher_t *disp
     link->queued++;
   }
 
-  PN_ENSURE(delivery->bytes, delivery->capacity, delivery->size + disp->size);
-  memmove(delivery->bytes + delivery->size, disp->payload, disp->size);
-  delivery->size += disp->size;
+  pn_buffer_append(delivery->bytes, disp->payload, disp->size);
   delivery->done = !more;
 
   ssn_state->incoming_transfer_count++;
@@ -1382,29 +1424,103 @@ ssize_t pn_input(pn_transport_t *transpo
 {
   if (!transport) return PN_ARG_ERR;
 
-  if (!available) {
-    pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
-    if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-      fprintf(stderr, "    <- EOS\n");
-    return PN_ERR;
+  size_t consumed = 0;
+
+  while (true) {
+    ssize_t n = transport->process_input(transport, bytes + consumed, available - consumed);
+    if (n > 0) {
+      consumed += n;
+      if (consumed >= available) {
+        break;
+      }
+    } else if (n == 0) {
+      break;
+    } else {
+      if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+        pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+      return n;
+    }
+  }
+
+  return consumed;
+}
+
+#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
+
+static ssize_t pn_input_read_header(pn_transport_t *transport, char *bytes, size_t available,
+                                    const char *header, size_t size, const char *protocol,
+                                    ssize_t (*next)(pn_transport_t *, char *, size_t))
+{
+  const char *point = header + transport->header_count;
+  int delta = pn_min(available, size - transport->header_count);
+  if (!available || memcmp(bytes, point, delta)) {
+    char quoted[1024];
+    pn_quote_data(quoted, 1024, bytes, available);
+    return pn_error_format(transport->error, PN_ERR,
+                           "%s header missmatch: '%s'", protocol, quoted);
+  } else {
+    transport->header_count += delta;
+    if (transport->header_count == size) {
+      transport->header_count = 0;
+      transport->process_input = next;
+
+      if (transport->disp->trace & PN_TRACE_FRM)
+        fprintf(stderr, "    <- %s\n", protocol);
+    }
+    return delta;
   }
+}
 
+static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, char *bytes, size_t available)
+{
+  return pn_input_read_header(transport, bytes, available, SASL_HEADER, 8, "SASL", pn_input_read_sasl);
+}
+
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, char *bytes, size_t available)
+{
+  pn_sasl_t *sasl = transport->sasl;
+  ssize_t n = pn_sasl_input(sasl, bytes, available);
+  if (n == PN_EOS) {
+    transport->process_input = pn_input_read_amqp_header;
+    return transport->process_input(transport, bytes, available);
+  } else {
+    return n;
+  }
+}
+
+#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
+
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, char *bytes, size_t available)
+{
+  return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
+                              "AMQP", pn_input_read_amqp);
+}
+
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, char *bytes, size_t available)
+{
   if (transport->close_rcvd) {
-    pn_do_error(transport, "amqp:connection:framing-error", "data after close");
-    if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-      fprintf(stderr, "    <- EOS\n");
+    if (available > 0) {
+      pn_do_error(transport, "amqp:connection:framing-error", "data after close");
+      return PN_ERR;
+    } else {
+      return PN_EOS;
+    }
+  }
+
+  if (!available) {
+    pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
     return PN_ERR;
   }
 
+
   ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
-  if (n >= 0 && transport->close_rcvd) {
-    if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-      fprintf(stderr, "    <- EOS\n");
+  if (n < 0) {
+    return pn_error_set(transport->error, n, "dispatch error");
+  } else if (transport->close_rcvd) {
     return PN_EOS;
-  } else if (n < 0) {
-    transport->error = n;
+  } else {
+    return n;
   }
-  return n;
 }
 
 bool pn_delivery_buffered(pn_delivery_t *delivery)
@@ -1413,7 +1529,7 @@ bool pn_delivery_buffered(pn_delivery_t 
   if (pn_is_sender(delivery->link)) {
     pn_delivery_state_t *state = delivery->context;
     if (state) {
-      return (delivery->done && !state->sent) || delivery->size > 0;
+      return (delivery->done && !state->sent) || pn_buffer_size(delivery->bytes) > 0;
     } else {
       return delivery->done;
     }
@@ -1561,16 +1677,16 @@ int pn_process_tpwork_sender(pn_transpor
       delivery->context = state;
     }
 
-    if (state && !state->sent && (delivery->done || delivery->size > 0) &&
+    if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
         ssn_state->outgoing_window > 0 && link_state->link_credit > 0) {
-      if (delivery->bytes) {
-        pn_set_payload(transport->disp, delivery->bytes, delivery->size);
-        delivery->size = 0;
-      }
+      pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
+      pn_set_payload(transport->disp, bytes.start, bytes.size);
+      pn_buffer_clear(delivery->bytes);
+      pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
       int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[IIzIoo]", TRANSFER,
                               link_state->local_handle, state->id,
-                              delivery->tag.size, delivery->tag.start,
-                              0, delivery->local_settled, !delivery->done);
+                              tag.size, tag.start, 0, delivery->local_settled,
+                              !delivery->done);
       if (err) return err;
       ssn_state->outgoing_transfer_count++;
       ssn_state->outgoing_window--;
@@ -1801,18 +1917,60 @@ int pn_process(pn_transport_t *transport
   return 0;
 }
 
-ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+static ssize_t pn_output_write_header(pn_transport_t *transport,
+                                      char *bytes, size_t size,
+                                      const char *header, size_t hdrsize,
+                                      const char *protocol,
+                                      ssize_t (*next)(pn_transport_t *, char *, size_t))
+{
+  if (transport->disp->trace & PN_TRACE_FRM)
+    fprintf(stderr, "    -> %s\n", protocol);
+  if (size >= hdrsize) {
+    memmove(bytes, header, hdrsize);
+    transport->process_output = next;
+    return hdrsize;
+  } else {
+    return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
+  }
+}
+
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t size)
 {
-  if (!transport) return PN_ARG_ERR;
+  return pn_output_write_header(transport, bytes, size, SASL_HEADER, 8, "SASL",
+                                pn_output_write_sasl);
+}
+
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t size)
+{
+  pn_sasl_t *sasl = transport->sasl;
+  ssize_t n = pn_sasl_output(sasl, bytes, size);
+  if (n == PN_EOS) {
+    transport->process_output = pn_output_write_amqp_header;
+    return 0;
+  } else {
+    return n;
+  }
+}
 
-  if (!transport->error)
-    transport->error = pn_process(transport);
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t size)
+{
+  return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
+                                pn_output_write_amqp);
+}
+
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t size)
+{
+  if (!transport->connection) {
+    return 0;
+  }
+
+  if (!pn_error_code(transport->error)) {
+    pn_error_set(transport->error, pn_process(transport), "process error");
+  }
 
-  if (!transport->disp->available && (transport->close_sent || transport->error)) {
-    if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-      fprintf(stderr, "    -> EOS\n");
-    if (transport->error)
-      return transport->error;
+  if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) {
+    if (pn_error_code(transport->error))
+      return pn_error_code(transport->error);
     else
       return PN_EOS;
   }
@@ -1820,8 +1978,40 @@ ssize_t pn_output(pn_transport_t *transp
   return pn_dispatcher_output(transport->disp, bytes, size);
 }
 
+ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+  if (!transport) return PN_ARG_ERR;
+
+  size_t total = 0;
+
+  while (size - total > 0) {
+    ssize_t n = transport->process_output(transport, bytes + total, size - total);
+    if (n > 0) {
+      total += n;
+    } else if (n == 0) {
+      break;
+    } else if (n == PN_EOS) {
+      if (total > 0) {
+        return total;
+      } else {
+        if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+          pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
+        return PN_EOS;
+      }
+    } else {
+      if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+        pn_dispatcher_trace(transport->disp, 0, "-> EOS (%zi) %s\n", n,
+                            pn_error_text(transport->error));
+      return n;
+    }
+  }
+
+  return total;
+}
+
 void pn_trace(pn_transport_t *transport, pn_trace_t trace)
 {
+  if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
   transport->disp->trace = trace;
 }
 
@@ -1829,9 +2019,7 @@ ssize_t pn_send(pn_link_t *sender, const
 {
   pn_delivery_t *current = pn_current(sender);
   if (!current) return PN_EOS;
-  PN_ENSURE(current->bytes, current->capacity, current->size + n);
-  memmove(current->bytes + current->size, bytes, n);
-  current->size += n;
+  pn_buffer_append(current->bytes, bytes, n);
   pn_add_tpwork(current);
   return n;
 }
@@ -1850,11 +2038,9 @@ ssize_t pn_recv(pn_link_t *receiver, cha
 
   pn_delivery_t *delivery = receiver->current;
   if (delivery) {
-    if (delivery->size) {
-      size_t size = n > delivery->size ? delivery->size : n;
-      memmove(bytes, delivery->bytes, size);
-      memmove(bytes, bytes + size, delivery->size - size);
-      delivery->size -= size;
+    size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
+    pn_buffer_trim(delivery->bytes, size, 0);
+    if (size) {
       return size;
     } else {
       return delivery->done ? PN_EOS : 0;
@@ -1945,5 +2131,5 @@ bool pn_readable(pn_delivery_t *delivery
 
 size_t pn_pending(pn_delivery_t *delivery)
 {
-  return delivery->size;
+  return pn_buffer_size(delivery->bytes);
 }

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/error.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/error.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/error.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/error.c Mon Sep 10 14:56:20 2012
@@ -19,8 +19,11 @@
  *
  */
 
+#define _POSIX_C_SOURCE (200112)
+
 #include <proton/error.h>
 #include <stdlib.h>
+#include <string.h>
 #include "util.h"
 
 struct pn_error_t {
@@ -59,8 +62,10 @@ void pn_error_clear(pn_error_t *error)
 int pn_error_set(pn_error_t *error, int code, const char *text)
 {
   pn_error_clear(error);
-  error->code = code;
-  error->text = pn_strdup(text);
+  if (code) {
+    error->code = code;
+    error->text = pn_strdup(text);
+  }
   return code;
 }
 
@@ -83,6 +88,13 @@ int pn_error_format(pn_error_t *error, i
   return rcode;
 }
 
+int pn_error_from_errno(pn_error_t *error, const char *msg)
+{
+  char err[1024];
+  strerror_r(errno, err, 1024);
+  return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
+}
+
 int pn_error_code(pn_error_t *error)
 {
   return error ? error->code : PN_ARG_ERR;

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c Mon Sep 10 14:56:20 2012
@@ -113,6 +113,7 @@ void pn_message_free(pn_message_t *msg)
     pn_buffer_free(msg->reply_to_group_id);
     pn_data_free(msg->data);
     pn_data_free(msg->body);
+    pn_parser_free(msg->parser);
     free(msg);
   }
 }
@@ -676,12 +677,12 @@ int pn_message_save_text(pn_message_t *m
   }
 
   uint64_t desc;
-  pn_bytes_t str;
-  bool scanned;
-  int err = pn_data_scan(msg->body, "DL?S", &desc, &scanned, &str);
+  pn_bytes_t str = {0,0};
+  bool scanned, dscanned;
+  int err = pn_data_scan(msg->body, "?DL?S", &dscanned, &desc, &scanned, &str);
   if (err) return err;
-  if (desc == AMQP_VALUE && scanned) {
-    if (str.size >= *size) {
+  if (dscanned && desc == AMQP_VALUE) {
+    if (scanned && str.size >= *size) {
       return PN_OVERFLOW;
     } else {
       memcpy(data, str.start, str.size);

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c Mon Sep 10 14:56:20 2012
@@ -32,9 +32,6 @@ struct pn_messenger_t {
   char *name;
   int timeout;
   pn_driver_t *driver;
-  pn_connector_t *connectors[1024];
-  size_t size;
-  size_t listeners;
   int credit;
   uint64_t next_tag;
   pn_error_t *error;
@@ -61,8 +58,6 @@ pn_messenger_t *pn_messenger(const char 
     m->name = build_name(name);
     m->timeout = -1;
     m->driver = pn_driver();
-    m->size = 0;
-    m->listeners = 0;
     m->credit = 0;
     m->next_tag = 0;
     m->error = pn_error();
@@ -94,6 +89,7 @@ void pn_messenger_free(pn_messenger_t *m
     free(messenger->name);
     pn_driver_free(messenger->driver);
     pn_error_free(messenger->error);
+    free(messenger);
   }
 }
 
@@ -119,8 +115,8 @@ void pn_messenger_flow(pn_messenger_t *m
 {
   while (messenger->credit > 0) {
     int prev = messenger->credit;
-    for (int i = 0; i < messenger->size; i++) {
-      pn_connector_t *ctor = messenger->connectors[i];
+    pn_connector_t *ctor = pn_connector_head(messenger->driver);
+    while (ctor) {
       pn_connection_t *conn = pn_connector_connection(ctor);
 
       pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -131,6 +127,8 @@ void pn_messenger_flow(pn_messenger_t *m
         }
         link = pn_link_next(link, PN_LOCAL_ACTIVE);
       }
+
+      ctor = pn_connector_next(ctor);
     }
     if (messenger->credit == prev) break;
   }
@@ -193,8 +191,10 @@ long int millis(struct timeval tv)
 
 int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_process(messenger->connectors[i]);
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
+    pn_connector_process(ctor);
+    ctor = pn_connector_next(ctor);
   }
 
   struct timeval now;
@@ -219,7 +219,6 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_connection_t *conn = pn_connection();
       pn_connection_set_container(conn, messenger->name);
       pn_connector_set_connection(c, conn);
-      messenger->connectors[messenger->size++] = c;
     }
 
     pn_connector_t *c;
@@ -228,17 +227,10 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_connection_t *conn = pn_connector_connection(c);
       pn_messenger_endpoints(messenger, conn);
       if (pn_connector_closed(c)) {
-        for (int i = 0; i < messenger->size; i++) {
-          if (c == messenger->connectors[i]) {
-            memmove(messenger->connectors + i, messenger->connectors + i + 1, messenger->size - i - 1);
-            messenger->size--;
-            pn_connector_free(c);
-            pn_messenger_reclaim(messenger, conn);
-            pn_connection_free(conn);
-            pn_messenger_flow(messenger);
-            break;
-          }
-        }
+        pn_connector_free(c);
+        pn_messenger_reclaim(messenger, conn);
+        pn_connection_free(conn);
+        pn_messenger_flow(messenger);
       } else {
         pn_connector_process(c);
       }
@@ -266,15 +258,15 @@ int pn_messenger_start(pn_messenger_t *m
 
 bool pn_messenger_stopped(pn_messenger_t *messenger)
 {
-  return messenger->size == 0;
+  return pn_connector_head(messenger->driver) == NULL;
 }
 
 int pn_messenger_stop(pn_messenger_t *messenger)
 {
   if (!messenger) return PN_ARG_ERR;
 
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
     while (link) {
@@ -282,6 +274,15 @@ int pn_messenger_stop(pn_messenger_t *me
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
     pn_connection_close(conn);
+    ctor = pn_connector_next(ctor);
+  }
+
+  pn_listener_t *l = pn_listener_head(messenger->driver);
+  while (l) {
+    pn_listener_close(l);
+    pn_listener_t *prev = l;
+    l = pn_listener_next(l);
+    pn_listener_free(prev);
   }
 
   return pn_messenger_sync(messenger, pn_messenger_stopped);
@@ -299,6 +300,7 @@ static void parse_address(char *address,
       if (*c == '/') {
         *c = '\0';
         *name = c + 1;
+        break;
       }
     }
   } else {
@@ -313,7 +315,7 @@ bool pn_streq(const char *a, const char 
 
 pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *domain)
 {
-  char buf[strlen(domain) + 1];
+  char buf[domain ? strlen(domain) + 1 : 1];
   if (domain) {
     strcpy(buf, domain);
   } else {
@@ -325,17 +327,18 @@ pn_connection_t *pn_messenger_domain(pn_
   char *port = "5672";
   parse_url(buf, &user, &pass, &host, &port);
 
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
+    pn_connection_t *connection = pn_connector_connection(ctor);
     const char *container = pn_connection_remote_container(connection);
     const char *hostname = pn_connection_hostname(connection);
     if (pn_streq(container, domain) || pn_streq(hostname, domain))
       return connection;
+    ctor = pn_connector_next(ctor);
   }
 
   pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
   if (!connector) return NULL;
-  messenger->connectors[messenger->size++] = connector;
   pn_sasl_t *sasl = pn_connector_sasl(connector);
   if (user) {
     pn_sasl_plain(sasl, user, pass);
@@ -414,11 +417,7 @@ pn_listener_t *pn_messenger_isource(pn_m
   char *port = "5672";
   parse_url(domain + 1, &user, &pass, &host, &port);
 
-  pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
-  if (listener) {
-    messenger->listeners++;
-  }
-  return listener;
+  return pn_listener(messenger->driver, host, port, NULL);
 }
 
 int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
@@ -430,7 +429,8 @@ int pn_messenger_subscribe(pn_messenger_
       return 0;
     } else {
       return pn_error_format(messenger->error, PN_ERR,
-                             "unable to subscribe to source: %s", source);
+                             "unable to subscribe to source: %s (%s)", source,
+                             pn_driver_error(messenger->driver));
     }
   } else if (len >= 2 && source[0] == '/' && source[1] == '/') {
     pn_link_t *src = pn_messenger_source(messenger, source);
@@ -438,7 +438,8 @@ int pn_messenger_subscribe(pn_messenger_
       return 0;
     } else {
       return pn_error_format(messenger->error, PN_ERR,
-                             "unable to subscribe to source: %s", source);
+                             "unable to subscribe to source: %s (%s)", source,
+                             pn_driver_error(messenger->driver));
     }
   } else {
     return 0;
@@ -471,7 +472,8 @@ int pn_messenger_put(pn_messenger_t *mes
   pn_link_t *sender = pn_messenger_target(messenger, address);
   if (!sender)
     return pn_error_format(messenger->error, PN_ERR,
-                           "unable to send to address: %s", address);
+                           "unable to send to address: %s (%s)", address,
+                           pn_driver_error(messenger->driver));
   // XXX: proper tag
   char tag[8];
   void *ptr = &tag;
@@ -504,8 +506,8 @@ int pn_messenger_put(pn_messenger_t *mes
 
 bool pn_messenger_sent(pn_messenger_t *messenger)
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -523,6 +525,8 @@ bool pn_messenger_sent(pn_messenger_t *m
       }
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
+
+    ctor = pn_connector_next(ctor);
   }
 
   return true;
@@ -530,8 +534,8 @@ bool pn_messenger_sent(pn_messenger_t *m
 
 bool pn_messenger_rcvd(pn_messenger_t *messenger)
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_delivery_t *d = pn_work_head(conn);
@@ -541,6 +545,7 @@ bool pn_messenger_rcvd(pn_messenger_t *m
       }
       d = pn_work_next(d);
     }
+    ctor = pn_connector_next(ctor);
   }
 
   return false;
@@ -554,7 +559,7 @@ int pn_messenger_send(pn_messenger_t *me
 int pn_messenger_recv(pn_messenger_t *messenger, int n)
 {
   if (!messenger) return PN_ARG_ERR;
-  if (!messenger->listeners && !messenger->size)
+  if (!pn_listener_head(messenger->driver) && !pn_connector_head(messenger->driver))
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
   messenger->credit += n;
   pn_messenger_flow(messenger);
@@ -565,8 +570,8 @@ int pn_messenger_get(pn_messenger_t *mes
 {
   if (!messenger) return PN_ARG_ERR;
 
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_delivery_t *d = pn_work_head(conn);
@@ -592,6 +597,8 @@ int pn_messenger_get(pn_messenger_t *mes
       }
       d = pn_work_next(d);
     }
+
+    ctor = pn_connector_next(ctor);
   }
 
   // XXX: need to drain credit before returning EOS
@@ -605,8 +612,8 @@ int pn_messenger_queued(pn_messenger_t *
 
   int result = 0;
 
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
+  pn_connector_t *ctor = pn_connector_head(messenger->driver);
+  while (ctor) {
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -620,6 +627,7 @@ int pn_messenger_queued(pn_messenger_t *
       }
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
+    ctor = pn_connector_next(ctor);
   }
 
   return result;

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c Mon Sep 10 14:56:20 2012
@@ -27,6 +27,7 @@
 #include <proton/sasl.h>
 #include "protocol.h"
 #include "../dispatcher/dispatcher.h"
+#include "../engine/engine-internal.h"
 #include "../util.h"
 
 #define SCRATCH (1024)
@@ -53,30 +54,36 @@ int pn_do_challenge(pn_dispatcher_t *dis
 int pn_do_response(pn_dispatcher_t *disp);
 int pn_do_outcome(pn_dispatcher_t *disp);
 
-pn_sasl_t *pn_sasl()
+pn_sasl_t *pn_sasl(pn_transport_t *transport)
 {
-  pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
-  sasl->disp = pn_dispatcher(1, sasl);
+  if (!transport->sasl) {
+    pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
+    sasl->disp = pn_dispatcher(1, sasl);
+
+    pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
+    pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
+    pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
+    pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
+    pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
 
-  pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
-  pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
-  pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
-  pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
-  pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
-
-  sasl->client = false;
-  sasl->configured = false;
-  sasl->mechanisms = NULL;
-  sasl->remote_mechanisms = NULL;
-  sasl->send_data = (pn_bytes_t) {0, NULL};
-  sasl->recv_data = (pn_bytes_t) {0, NULL};
-  sasl->outcome = PN_SASL_NONE;
-  sasl->sent_init = false;
-  sasl->rcvd_init = false;
-  sasl->sent_done = false;
-  sasl->rcvd_done = false;
+    sasl->client = false;
+    sasl->configured = false;
+    sasl->mechanisms = NULL;
+    sasl->remote_mechanisms = NULL;
+    sasl->send_data = (pn_bytes_t) {0, NULL};
+    sasl->recv_data = (pn_bytes_t) {0, NULL};
+    sasl->outcome = PN_SASL_NONE;
+    sasl->sent_init = false;
+    sasl->rcvd_init = false;
+    sasl->sent_done = false;
+    sasl->rcvd_done = false;
+
+    transport->sasl = sasl;
+
+    pn_transport_sasl_init(transport);
+  }
 
-  return sasl;
+  return transport->sasl;
 }
 
 pn_sasl_state_t pn_sasl_state(pn_sasl_t *sasl)
@@ -201,12 +208,14 @@ void pn_sasl_trace(pn_sasl_t *sasl, pn_t
 
 void pn_sasl_free(pn_sasl_t *sasl)
 {
-  free(sasl->mechanisms);
-  free(sasl->remote_mechanisms);
-  free(sasl->send_data.start);
-  free(sasl->recv_data.start);
-  pn_dispatcher_free(sasl->disp);
-  free(sasl);
+  if (sasl) {
+    free(sasl->mechanisms);
+    free(sasl->remote_mechanisms);
+    free(sasl->send_data.start);
+    free(sasl->recv_data.start);
+    pn_dispatcher_free(sasl->disp);
+    free(sasl);
+  }
 }
 
 void pn_client_init(pn_sasl_t *sasl)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org