You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/09/10 16:56:23 UTC
svn commit: r1382918 [1/2] - in /qpid/proton/branches/driver_abstraction: ./
examples/broker/ proton-c/ proton-c/bindings/ proton-c/bindings/php/
proton-c/bindings/python/ proton-c/bindings/ruby/ proton-c/include/proton/
proton-c/src/ proton-c/src/disp...
Author: kgiusti
Date: Mon Sep 10 14:56:20 2012
New Revision: 1382918
URL: http://svn.apache.org/viewvc?rev=1382918&view=rev
Log:
checkpoint: merge latest from trunk
Added:
qpid/proton/branches/driver_abstraction/proton-c/include/proton/cproton.i
- copied unchanged from r1382866, qpid/proton/trunk/proton-c/include/proton/cproton.i
qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl-internal.h
- copied unchanged from r1382866, qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h
qpid/proton/branches/driver_abstraction/proton-j/pom.xml
- copied unchanged from r1382866, qpid/proton/trunk/proton-j/pom.xml
qpid/proton/branches/driver_abstraction/proton-j/src/main/
- copied from r1382866, qpid/proton/trunk/proton-j/src/main/
qpid/proton/branches/driver_abstraction/tests/proton_tests/transport.py
- copied unchanged from r1382866, qpid/proton/trunk/tests/proton_tests/transport.py
Removed:
qpid/proton/branches/driver_abstraction/proton-c/bindings/cproton.i
qpid/proton/branches/driver_abstraction/proton-j/src/org/
Modified:
qpid/proton/branches/driver_abstraction/ (props changed)
qpid/proton/branches/driver_abstraction/examples/broker/broker
qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt
qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt
qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i
qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i
qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i
qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h
qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h
qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c
qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c
qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h
qpid/proton/branches/driver_abstraction/proton-c/src/driver.c
qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h
qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h
qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c
qpid/proton/branches/driver_abstraction/proton-c/src/error.c
qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c
qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c
qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c
qpid/proton/branches/driver_abstraction/proton-c/src/scanner.c
qpid/proton/branches/driver_abstraction/proton-c/src/util.h
qpid/proton/branches/driver_abstraction/tests/proton_tests/__init__.py
qpid/proton/branches/driver_abstraction/tests/proton_tests/engine.py
qpid/proton/branches/driver_abstraction/tests/proton_tests/message.py
qpid/proton/branches/driver_abstraction/tests/proton_tests/messenger.py
qpid/proton/branches/driver_abstraction/tests/proton_tests/sasl.py
Propchange: qpid/proton/branches/driver_abstraction/
------------------------------------------------------------------------------
Merged /qpid/proton/trunk:r1371803-1382866
Modified: qpid/proton/branches/driver_abstraction/examples/broker/broker
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/examples/broker/broker?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/examples/broker/broker (original)
+++ qpid/proton/branches/driver_abstraction/examples/broker/broker Mon Sep 10 14:56:20 2012
@@ -128,5 +128,13 @@ try:
else:
pn_connector_process(c)
+ c = pn_connector_head(driver)
+ while c:
+ handler = pn_connector_context(c)
+ handler(c)
+ pn_connector_process(c)
+ window.redraw()
+ c = pn_connector_next(c)
+
except KeyboardInterrupt:
pass
Modified: qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/CMakeLists.txt Mon Sep 10 14:56:20 2012
@@ -4,7 +4,6 @@ project (Proton C)
set (PN_VERSION_MAJOR 0)
set (PN_VERSION_MINOR 1)
-set (LINK_DEPS uuid)
include(CheckIncludeFile)
@@ -48,6 +47,7 @@ add_custom_command (
DEPENDS ${PROJECT_SOURCE_DIR}/src/protocol.h.py
)
+# Configure the poller
if (POLLER STREQUAL poll)
set (pn_driver_impl
src/drivers/poll.c
@@ -66,6 +66,22 @@ else (SSL_IMPL STREQUAL openssl)
set (pn_driver_ssl_impl src/drivers/ssl_stub.c)
endif (SSL_IMPL STREQUAL openssl)
+find_package(SWIG)
+if (SWIG_FOUND)
+ add_subdirectory(bindings)
+endif (SWIG_FOUND)
+
+add_subdirectory(docs/api)
+
+# Should really be finding the uuid library appropriate for the platform
+# in lieu of doing this set the library name directly.
+set (UUID_LIB uuid)
+
+set (qpid-proton-platform
+ src/driver.c
+ ${pn_driver_impl}
+ ${pn_driver_ssl_impl}
+)
add_library (
qpid-proton SHARED
@@ -86,39 +102,33 @@ add_library (
src/message/message.c
src/sasl/sasl.c
- ${PROJECT_BINARY_DIR}/encodings.h
- ${PROJECT_BINARY_DIR}/protocol.h
-)
-
-add_library (
- qpid-proton-posix SHARED
-
- src/driver.c
src/messenger.c
- ${pn_driver_impl}
- ${pn_driver_ssl_impl}
-)
+ ${qpid-proton-platform}
-find_package(SWIG)
-if (SWIG_FOUND)
- add_subdirectory(bindings)
-endif (SWIG_FOUND)
+ ${PROJECT_BINARY_DIR}/encodings.h
+ ${PROJECT_BINARY_DIR}/protocol.h
+)
+target_link_libraries (qpid-proton ${UUID_LIB})
add_executable (proton src/proton.c)
-target_link_libraries (proton qpid-proton qpid-proton-posix ${LINK_DEPS})
+target_link_libraries (proton qpid-proton)
add_executable (proton-dump src/proton-dump.c)
target_link_libraries (proton-dump qpid-proton)
-add_subdirectory(docs/api)
-
set_target_properties (
- qpid-proton qpid-proton-posix proton proton-dump
+ qpid-proton proton proton-dump
PROPERTIES
COMPILE_FLAGS "-Wall -Werror -pedantic-errors -std=c99 -g -Iinclude -fPIC"
)
-install (TARGETS proton proton-dump qpid-proton qpid-proton-posix
+# Install executables and libraries
+install (TARGETS proton proton-dump qpid-proton
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib)
+
+# Install header files
+file(GLOB headers "include/proton/*.[hi]")
+install (FILES ${headers} DESTINATION include/proton)
+
Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/CMakeLists.txt Mon Sep 10 14:56:20 2012
@@ -19,7 +19,7 @@
include(UseSWIG)
-set (BINDING_DEPS qpid-proton qpid-proton-posix ${LINK_DEPS})
+set (BINDING_DEPS qpid-proton)
# Build wrapper for Python:
find_package (PythonLibs)
Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/php/php.i Mon Sep 10 14:56:20 2012
@@ -67,8 +67,6 @@ ssize_t pn_input(pn_transport_t *transpo
ssize_t pn_sasl_send(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
%ignore pn_sasl_send;
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
-%ignore pn_sasl_input;
// Use the OUTPUT_BUFFER,OUTPUT_LEN typemap to allow these functions to return
@@ -110,18 +108,6 @@ ssize_t pn_sasl_input(pn_sasl_t *sasl, c
%}
%ignore pn_output;
-%rename(pn_sasl_output) wrap_pn_output;
-// in PHP: array = pn_sasl_output(sasl, MAXLEN);
-// array[0] = size || error code
-// array[1] = native string containing binary data
-%inline %{
- void wrap_pn_sasl_output(pn_sasl_t *sasl, size_t maxCount, char **OUTPUT_BUFFER, ssize_t *OUTPUT_LEN) {
- *OUTPUT_BUFFER = emalloc(sizeof(char) * maxCount);
- *OUTPUT_LEN = pn_sasl_output(sasl, *OUTPUT_BUFFER, maxCount);
- }
-%}
-%ignore pn_sasl_output;
-
%rename(pn_message_data) wrap_pn_message_data;
// in PHP: array = pn_message_data("binary message data", MAXLEN);
// array[0] = size || error code
@@ -274,4 +260,4 @@ pn_connector_t *pn_connector_fd(pn_drive
%ignore pn_connector_free;
-%include "../cproton.i"
+%include "proton/cproton.i"
Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/python/python.i Mon Sep 10 14:56:20 2012
@@ -104,23 +104,6 @@ ssize_t pn_input(pn_transport_t *transpo
%}
%ignore pn_output;
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
-%ignore pn_sasl_input;
-
-%rename(pn_sasl_output) wrap_pn_sasl_output;
-%inline %{
- int wrap_pn_sasl_output(pn_sasl_t *sasl, char *OUTPUT, size_t *OUTPUT_SIZE) {
- ssize_t sz = pn_sasl_output(sasl, OUTPUT, *OUTPUT_SIZE);
- if (sz >= 0) {
- *OUTPUT_SIZE = sz;
- } else {
- *OUTPUT_SIZE = 0;
- }
- return sz;
- }
-%}
-%ignore pn_sasl_output;
-
%rename(pn_delivery) wrap_pn_delivery;
%inline %{
pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) {
@@ -230,4 +213,4 @@ ssize_t pn_sasl_input(pn_sasl_t *sasl, c
%}
%ignore pn_connector_free;
-%include "../cproton.i"
+%include "proton/cproton.i"
Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/ruby/ruby.i Mon Sep 10 14:56:20 2012
@@ -154,4 +154,4 @@ ssize_t pn_input(pn_transport_t *transpo
%}
%ignore pn_message_data;
-%include "../cproton.i"
+%include "proton/cproton.i"
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/buffer.h Mon Sep 10 14:56:20 2012
@@ -24,6 +24,10 @@
#include <proton/types.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef struct pn_buffer_t pn_buffer_t;
pn_buffer_t *pn_buffer(size_t capacity);
@@ -31,10 +35,15 @@ void pn_buffer_free(pn_buffer_t *buf);
size_t pn_buffer_size(pn_buffer_t *buf);
int pn_buffer_append(pn_buffer_t *buf, const char *bytes, size_t size);
int pn_buffer_prepend(pn_buffer_t *buf, const char *bytes, size_t size);
+size_t pn_buffer_get(pn_buffer_t *buf, size_t offset, size_t size, char *dst);
int pn_buffer_trim(pn_buffer_t *buf, size_t left, size_t right);
int pn_buffer_clear(pn_buffer_t *buf);
int pn_buffer_defrag(pn_buffer_t *buf);
pn_bytes_t pn_buffer_bytes(pn_buffer_t *buf);
int pn_buffer_print(pn_buffer_t *buf);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* buffer.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/codec.h Mon Sep 10 14:56:20 2012
@@ -28,6 +28,10 @@
#include <unistd.h>
#include <stdarg.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef enum {
PN_NULL,
PN_BOOL,
@@ -118,4 +122,8 @@ pn_atoms_t pn_data_available(pn_data_t *
int pn_data_format(pn_data_t *data, char *bytes, size_t *size);
int pn_data_resize(pn_data_t *data, size_t size);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* codec.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/driver.h Mon Sep 10 14:56:20 2012
@@ -26,6 +26,10 @@
#include <proton/engine.h>
#include <proton/sasl.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/** @file
* API for the Driver Layer.
*
@@ -52,6 +56,22 @@ typedef struct pn_connector_t pn_connect
*/
pn_driver_t *pn_driver(void);
+/** Return the most recent error code.
+ *
+ * @param[in] d the driver
+ *
+ * @return the most recent error text for d
+ */
+int pn_driver_errno(pn_driver_t *d);
+
+/** Return the most recent error text for d.
+ *
+ * @param[in] d the driver
+ *
+ * @return the most recent error text for d
+ */
+const char *pn_driver_error(pn_driver_t *d);
+
/** Set the tracing level for the given driver.
*
* @param[in] driver the driver to trace
@@ -124,6 +144,23 @@ pn_listener_t *pn_listener(pn_driver_t *
*/
pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *context);
+/** Access the head listener for a driver.
+ *
+ * @param[in] driver the driver whose head listener will be returned
+ *
+ * @return the head listener for driver or NULL if there is none
+ */
+pn_listener_t *pn_listener_head(pn_driver_t *driver);
+
+/** Access the next listener.
+ *
+ * @param[in] listener the listener whose next listener will be
+ * returned
+ *
+ * @return the next listener
+ */
+pn_listener_t *pn_listener_next(pn_listener_t *listener);
+
/**
* @todo pn_listener_trace needs documentation
*/
@@ -219,6 +256,23 @@ pn_connector_t *pn_connector(pn_driver_t
*/
pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context);
+/** Access the head connector for a driver.
+ *
+ * @param[in] driver the driver whose head connector will be returned
+ *
+ * @return the head connector for driver or NULL if there is none
+ */
+pn_connector_t *pn_connector_head(pn_driver_t *driver);
+
+/** Access the next connector.
+ *
+ * @param[in] connector the connector whose next connector will be
+ * returned
+ *
+ * @return the next connector
+ */
+pn_connector_t *pn_connector_next(pn_connector_t *connector);
+
/** Set the tracing level for the given connector.
*
* @param[in] connector the connector to trace
@@ -308,7 +362,6 @@ bool pn_connector_closed(pn_connector_t
*/
void pn_connector_free(pn_connector_t *connector);
-
/** Configure the set of trusted certificates for this client. This causes the connector
* to use SSL/TLS to authenticate the server and encrypt traffic. It is intended to be
* used by a client that is attempting to connect to a trusted server. See
@@ -337,8 +390,6 @@ int pn_connector_ssl_set_client_auth(pn_
const char *private_key_file,
const char *password);
-
-
/** Force the peer (client) to authenticate. This is intended to be used on those
* connectors that have been created by a listener - it permits the server to force
* authentication of the connected client. See ::pn_listener_ssl_set_client_auth
@@ -352,4 +403,9 @@ int pn_connector_ssl_set_client_auth(pn_
int pn_connector_ssl_authenticate_client(pn_connector_t *connector,
const char *trusted_CAs_file);
+
+#ifdef __cplusplus
+}
+#endif
+
#endif /* driver.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h Mon Sep 10 14:56:20 2012
@@ -27,6 +27,10 @@
#include <sys/types.h>
#include <proton/error.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/** @file
* API for the proton Engine.
*
@@ -144,15 +148,22 @@ pn_delivery_t *pn_work_next(pn_delivery_
*/
pn_session_t *pn_session(pn_connection_t *connection);
-/** Factory for creating the connection's transport.
+/** Factory for creating a transport.
*
- * The transport used by the connection to interface with the network.
- * There can only be one transport associated with a connection.
+ * A transport to be used by a connection to interface with the
+ * network. There can only be one connection associated with a
+ * transport. See pn_transport_bind().
*
- * @param[in] connection connection that will use the transport
- * @return pointer to new session
+ * @return pointer to new transport
+ */
+pn_transport_t *pn_transport(void);
+
+/** Binds the transport to an AMQP connection endpoint.
+ *
+ * @return an error code, or 0 on success
*/
-pn_transport_t *pn_transport(pn_connection_t *connection);
+
+int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection);
/** Retrieve the first Session that matches the given state mask.
*
@@ -218,6 +229,24 @@ void pn_connection_open(pn_connection_t
void pn_connection_close(pn_connection_t *connection);
void pn_connection_free(pn_connection_t *connection);
+/** Access the application context that is associated with the
+ * connection.
+ *
+ * @param[in] connection the connection whose context is to be returned.
+ *
+ * @return the application context that was passed to pn_connection_set_context()
+ */
+void *pn_connection_context(pn_connection_t *connection);
+
+/** Assign a new application context to the connection.
+ *
+ * @param[in] connection the connection which will hold the context.
+ * @param[in] context new application context to associate with the
+ * connection
+ */
+void pn_connection_set_context(pn_connection_t *connection, void *context);
+
+
// transport
pn_error_t *pn_transport_error(pn_transport_t *transport);
ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available);
@@ -247,8 +276,8 @@ const char *pn_target(pn_link_t *link);
const char *pn_source(pn_link_t *link);
void pn_set_source(pn_link_t *link, const char *source);
void pn_set_target(pn_link_t *link, const char *target);
-char *pn_remote_source(pn_link_t *link);
-char *pn_remote_target(pn_link_t *link);
+const char *pn_remote_source(pn_link_t *link);
+const char *pn_remote_target(pn_link_t *link);
pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag);
pn_delivery_t *pn_current(pn_link_t *link);
bool pn_advance(pn_link_t *link);
@@ -291,4 +320,8 @@ void pn_disposition(pn_delivery_t *deliv
void pn_settle(pn_delivery_t *delivery);
void pn_delivery_dump(pn_delivery_t *delivery);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* engine.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/error.h Mon Sep 10 14:56:20 2012
@@ -24,6 +24,10 @@
#include <stdarg.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef struct pn_error_t pn_error_t;
#define PN_EOS (-1)
@@ -42,7 +46,12 @@ void pn_error_clear(pn_error_t *error);
int pn_error_set(pn_error_t *error, int code, const char *text);
int pn_error_vformat(pn_error_t *error, int code, const char *fmt, va_list ap);
int pn_error_format(pn_error_t *error, int code, const char *fmt, ...);
+int pn_error_from_errno(pn_error_t *error, const char *msg);
int pn_error_code(pn_error_t *error);
const char *pn_error_text(pn_error_t *error);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* error.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/framing.h Mon Sep 10 14:56:20 2012
@@ -25,6 +25,10 @@
#include <stdint.h>
#include <sys/types.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#define AMQP_HEADER_SIZE (8)
typedef struct {
@@ -39,4 +43,8 @@ typedef struct {
size_t pn_read_frame(pn_frame_t *frame, char *bytes, size_t available);
size_t pn_write_frame(char *bytes, size_t size, pn_frame_t frame);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* framing.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/message.h Mon Sep 10 14:56:20 2012
@@ -27,6 +27,10 @@
#include <sys/types.h>
#include <stdbool.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef struct pn_message_t pn_message_t;
typedef enum {
PN_DATA,
@@ -143,4 +147,8 @@ int pn_message_encode(pn_message_t *msg,
ssize_t pn_message_data(char *dst, size_t available, const char *src, size_t size);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* message.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/messenger.h Mon Sep 10 14:56:20 2012
@@ -24,6 +24,10 @@
#include <proton/message.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/** @file
* The messenger API provides a high level interface for sending and
* receiving AMQP messages.
@@ -180,4 +184,8 @@ int pn_messenger_outgoing(pn_messenger_t
*/
int pn_messenger_incoming(pn_messenger_t *messenger);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* messenger.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/parser.h Mon Sep 10 14:56:20 2012
@@ -24,12 +24,20 @@
#include <proton/codec.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef struct pn_parser_t pn_parser_t;
-pn_parser_t *pn_parser();
+pn_parser_t *pn_parser(void);
int pn_parser_parse(pn_parser_t *parser, const char *str, pn_atoms_t *atoms);
int pn_parser_errno(pn_parser_t *parser);
const char *pn_parser_error(pn_parser_t *parser);
void pn_parser_free(pn_parser_t *parser);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* parser.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/sasl.h Mon Sep 10 14:56:20 2012
@@ -24,6 +24,11 @@
#include <sys/types.h>
#include <stdbool.h>
+#include <proton/engine.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
/** @file
* API for the SASL Secure Transport Layer.
@@ -60,7 +65,7 @@ typedef enum {
*
* @return a new SASL object representing the layer.
*/
-pn_sasl_t *pn_sasl();
+pn_sasl_t *pn_sasl(pn_transport_t *transport);
/** Access the current state of the layer.
*
@@ -159,38 +164,8 @@ void pn_sasl_done(pn_sasl_t *sasl, pn_sa
*/
pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl);
-/** Decode input data bytes into SASL frames, and process them.
- *
- * This function is called by the driver layer to pass data received
- * from the remote peer into the SASL layer.
- *
- * @param[in] sasl the SASL layer.
- * @param[in] bytes buffer of frames to process
- * @param[in] available number of octets of data in 'bytes'
- * @return the number of bytes consumed, or error code if < 0
- */
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available);
-
-/** Gather output frames from the layer.
- *
- * This function is used by the driver to poll the SASL layer for data
- * that will be sent to the remote peer.
- *
- * @param[in] sasl The SASL layer.
- * @param[out] bytes to be filled with encoded frames.
- * @param[in] size space available in bytes array.
- * @return the number of octets written to bytes, or error code if < 0
- */
-ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size);
-
-void pn_sasl_trace(pn_sasl_t *sasl, pn_trace_t trace);
-
-/** Destructor for the given SASL layer.
- *
- * @param[in] sasl the SASL object to free. No longer valid on
- * return.
- */
-void pn_sasl_free(pn_sasl_t *sasl);
-
+#ifdef __cplusplus
+}
+#endif
#endif /* sasl.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/scanner.h Mon Sep 10 14:56:20 2012
@@ -25,6 +25,10 @@
#include <sys/types.h>
#include <stdarg.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef enum {
PN_TOK_LBRACE,
PN_TOK_RBRACE,
@@ -70,4 +74,8 @@ int pn_scanner_start(pn_scanner_t *scann
int pn_scanner_scan(pn_scanner_t *scanner);
int pn_scanner_shift(pn_scanner_t *scanner);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* scanner.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/types.h Mon Sep 10 14:56:20 2012
@@ -25,6 +25,10 @@
#include <sys/types.h>
#include <stdint.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef int32_t pn_sequence_t;
typedef uint32_t pn_millis_t;
typedef uint64_t pn_timestamp_t;
@@ -37,4 +41,8 @@ typedef struct {
pn_bytes_t pn_bytes(size_t size, char *start);
pn_bytes_t pn_bytes_dup(size_t size, const char *start);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* types.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/util.h Mon Sep 10 14:56:20 2012
@@ -24,8 +24,16 @@
#include <stdarg.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
void parse_url(char *url, char **user, char **pass, char **host, char **port);
void pn_fatal(char *fmt, ...);
void pn_vfatal(char *fmt, va_list ap);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* util.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/buffer.c Mon Sep 10 14:56:20 2012
@@ -139,9 +139,6 @@ int pn_buffer_ensure(pn_buffer_t *buf, s
return 0;
}
-#define min(X,Y) ((X) > (Y) ? (Y) : (X))
-#define max(X,Y) ((X) < (Y) ? (Y) : (X))
-
int pn_buffer_append(pn_buffer_t *buf, const char *bytes, size_t size)
{
int err = pn_buffer_ensure(buf, size);
@@ -149,7 +146,7 @@ int pn_buffer_append(pn_buffer_t *buf, c
size_t tail = pn_buffer_tail(buf);
size_t tail_space = pn_buffer_tail_space(buf);
- size_t n = min(tail_space, size);
+ size_t n = pn_min(tail_space, size);
memmove(buf->bytes + tail, bytes, n);
memmove(buf->bytes, bytes + n, size - n);
@@ -166,7 +163,7 @@ int pn_buffer_prepend(pn_buffer_t *buf,
size_t head = pn_buffer_head(buf);
size_t head_space = pn_buffer_head_space(buf);
- size_t n = min(head_space, size);
+ size_t n = pn_min(head_space, size);
memmove(buf->bytes + head - n, bytes + size - n, n);
memmove(buf->bytes + buf->capacity - (size - n), bytes, size - n);
@@ -182,6 +179,35 @@ int pn_buffer_prepend(pn_buffer_t *buf,
return 0;
}
+size_t pn_buffer_index(pn_buffer_t *buf, size_t index)
+{
+ size_t result = buf->start + index;
+ if (result >= buf->capacity) result -= buf->capacity;
+ return result;
+}
+
+size_t pn_buffer_get(pn_buffer_t *buf, size_t offset, size_t size, char *dst)
+{
+ size_t start = pn_buffer_index(buf, offset);
+ size_t stop = pn_buffer_index(buf, pn_min(offset + size, buf->size));
+
+ size_t sz1;
+ size_t sz2;
+
+ if (start > stop) {
+ sz1 = buf->capacity - start;
+ sz2 = stop;
+ } else {
+ sz1 = stop - start;
+ sz2 = 0;
+ }
+
+ memmove(dst, buf->bytes + start, sz1);
+ memmove(dst + sz1, buf->bytes, sz2);
+
+ return sz1 + sz2;
+}
+
int pn_buffer_trim(pn_buffer_t *buf, size_t left, size_t right)
{
if (left + right > buf->size) return PN_ARG_ERR;
@@ -202,13 +228,6 @@ int pn_buffer_clear(pn_buffer_t *buf)
return 0;
}
-size_t pn_buffer_index(pn_buffer_t *buf, size_t index)
-{
- size_t result = buf->start + index;
- if (result >= buf->capacity) result -= buf->capacity;
- return result;
-}
-
static void pn_buffer_rotate (pn_buffer_t *buf, size_t sz) {
if (sz == 0) return;
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.c Mon Sep 10 14:56:20 2012
@@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t
uint8_t code = scanned ? code64 : 0;
size_t n = SCRATCH;
pn_data_format(args, disp->scratch, &n);
- fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
- dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
+ pn_dispatcher_trace(disp, ch, "%s %s %s", dir == OUT ? "->" : "<-",
+ disp->names[code], disp->scratch);
if (size) {
size_t capacity = 4*size + 1;
char buf[capacity];
@@ -95,6 +95,16 @@ static void pn_do_trace(pn_dispatcher_t
}
}
+void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...)
+{
+ va_list ap;
+ fprintf(stderr, "[%p:%u] ", (void *) disp, ch);
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+}
+
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available)
{
size_t read = 0;
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/dispatcher/dispatcher.h Mon Sep 10 14:56:20 2012
@@ -64,5 +64,6 @@ void pn_set_payload(pn_dispatcher_t *dis
int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...);
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available);
ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size);
+void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...);
#endif /* dispatcher.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/driver.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/driver.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/driver.c Mon Sep 10 14:56:20 2012
@@ -69,24 +69,24 @@ pn_listener_t *pn_listener(pn_driver_t *
struct addrinfo *addr;
int code = getaddrinfo(host, port, NULL, &addr);
if (code) {
- fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(code));
+ pn_error_format(driver->error, PN_ERR, "getaddrinfo: %s\n", gai_strerror(code));
return NULL;
}
int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
if (sock == -1) {
- perror("socket");
+ pn_error_from_errno(driver->error, "socket");
return NULL;
}
int optval = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
- perror("setsockopt");
+ pn_error_from_errno(driver->error, "setsockopt");
return NULL;
}
if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
- perror("bind");
+ pn_error_from_errno(driver->error, "bind");
freeaddrinfo(addr);
return NULL;
}
@@ -94,14 +94,14 @@ pn_listener_t *pn_listener(pn_driver_t *
freeaddrinfo(addr);
if (listen(sock, 50) == -1) {
- perror("listen");
+ pn_error_from_errno(driver->error, "listen");
return NULL;
}
pn_listener_t *l = pn_listener_fd(driver, sock, context);
if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
- printf("Listening on %s:%s\n", host, port);
+ fprintf(stderr, "Listening on %s:%s\n", host, port);
return l;
}
@@ -125,6 +125,16 @@ pn_listener_t *pn_listener_fd(pn_driver_
return l;
}
+pn_listener_t *pn_listener_head(pn_driver_t *driver)
+{
+ return driver ? driver->listener_head : NULL;
+}
+
+pn_listener_t *pn_listener_next(pn_listener_t *listener)
+{
+ return listener ? listener->listener_next : NULL;
+}
+
void pn_listener_trace(pn_listener_t *l, pn_trace_t trace) {
// XXX
}
@@ -227,17 +237,20 @@ pn_connector_t *pn_connector(pn_driver_t
struct addrinfo *addr;
int code = getaddrinfo(host, port, NULL, &addr);
if (code) {
- fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(code));
+ pn_error_format(driver->error, PN_ERR, "getaddrinfo: %s", gai_strerror(code));
return NULL;
}
int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
- if (sock == -1)
+ if (sock == -1) {
+ pn_error_from_errno(driver->error, "socket");
return NULL;
+ }
pn_configure_sock(sock);
if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+ pn_error_from_errno(driver->error, "connect");
freeaddrinfo(addr);
return NULL;
}
@@ -255,15 +268,6 @@ static void pn_connector_read(pn_connect
static void pn_connector_write(pn_connector_t *ctor);
static time_t pn_connector_tick(pn_connector_t *ctor, time_t now);
-static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor);
-static ssize_t pn_connector_read_sasl(pn_connector_t *ctor);
-static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor);
-static ssize_t pn_connector_read_amqp(pn_connector_t *ctor);
-static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor);
-static ssize_t pn_connector_write_sasl(pn_connector_t *ctor);
-static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor);
-static ssize_t pn_connector_write_amqp(pn_connector_t *ctor);
-
pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
{
if (!driver) return NULL;
@@ -289,11 +293,8 @@ pn_connector_t *pn_connector_fd(pn_drive
c->input_size = 0;
c->input_eos = false;
c->output_size = 0;
- c->sasl = pn_sasl();
c->connection = NULL;
- c->transport = NULL;
- c->process_input = pn_connector_read_sasl_header;
- c->process_output = pn_connector_write_sasl_header;
+ c->transport = pn_transport();
c->input_done = false;
c->output_done = false;
c->context = context;
@@ -308,24 +309,33 @@ pn_connector_t *pn_connector_fd(pn_drive
return c;
}
+pn_connector_t *pn_connector_head(pn_driver_t *driver)
+{
+ return driver ? driver->connector_head : NULL;
+}
+
+pn_connector_t *pn_connector_next(pn_connector_t *connector)
+{
+ return connector ? connector->connector_next : NULL;
+}
+
void pn_connector_trace(pn_connector_t *ctor, pn_trace_t trace)
{
if (!ctor) return;
ctor->trace = trace;
- if (ctor->sasl) pn_sasl_trace(ctor->sasl, trace);
if (ctor->transport) pn_trace(ctor->transport, trace);
}
pn_sasl_t *pn_connector_sasl(pn_connector_t *ctor)
{
- return ctor ? ctor->sasl : NULL;
+ return ctor ? pn_sasl(ctor->transport) : NULL;
}
void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection)
{
if (!ctor) return;
ctor->connection = connection;
- ctor->transport = pn_transport(connection);
+ pn_transport_bind(ctor->transport, connection);
if (ctor->transport) pn_trace(ctor->transport, ctor->trace);
}
@@ -375,8 +385,8 @@ void pn_connector_free(pn_connector_t *c
pn_connector_poller_destroy(ctor);
ctor->connection = NULL;
+ pn_transport_free(ctor->transport);
ctor->transport = NULL;
- pn_sasl_free(ctor->sasl);
free(ctor);
}
@@ -400,95 +410,20 @@ static void pn_connector_consume(pn_conn
void pn_connector_process_input(pn_connector_t *ctor)
{
- while (!ctor->input_done && (ctor->input_size > 0 || ctor->input_eos)) {
- ssize_t n = ctor->process_input(ctor);
- if (n > 0) {
- pn_connector_consume(ctor, n);
- } else if (n == 0) {
- break;
- } else {
- if (n == PN_EOS) {
- pn_connector_consume(ctor, ctor->input_size);
+ pn_transport_t *transport = ctor->transport;
+ if (!ctor->input_done) {
+ if (ctor->input_size > 0 || ctor->input_eos) {
+ ssize_t n = pn_input(transport, ctor->input, ctor->input_size);
+ if (n >= 0) {
+ pn_connector_consume(ctor, n);
} else {
- fprintf(stderr, "error in process_input: %s\n", pn_code(n));
+ pn_connector_consume(ctor, ctor->input_size);
+ ctor->input_done = true;
}
- ctor->input_done = true;
- break;
}
}
}
-static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor)
-{
- if (ctor->input_size >= 8) {
- if (memcmp(ctor->input, "AMQP\x03\x01\x00\x00", 8)) {
- fprintf(stderr, "sasl header missmatch: ");
- pn_fprint_data(stderr, ctor->input, ctor->input_size);
- fprintf(stderr, "\n");
- ctor->output_done = true;
- return PN_ERR;
- } else {
- if (ctor->trace & PN_TRACE_FRM)
- fprintf(stderr, " <- AMQP SASL 1.0\n");
- ctor->process_input = pn_connector_read_sasl;
- return 8;
- }
- } else if (ctor->input_eos) {
- fprintf(stderr, "sasl header missmatch: ");
- pn_fprint_data(stderr, ctor->input, ctor->input_size);
- fprintf(stderr, "\n");
- ctor->output_done = true;
- return PN_ERR;
- }
-
- return 0;
-}
-
-static ssize_t pn_connector_read_sasl(pn_connector_t *ctor)
-{
- pn_sasl_t *sasl = ctor->sasl;
- ssize_t n = pn_sasl_input(sasl, ctor->input, ctor->input_size);
- if (n == PN_EOS) {
- ctor->process_input = pn_connector_read_amqp_header;
- return ctor->process_input(ctor);
- } else {
- return n;
- }
-}
-
-static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor)
-{
- if (ctor->input_size >= 8) {
- if (memcmp(ctor->input, "AMQP\x00\x01\x00\x00", 8)) {
- fprintf(stderr, "amqp header missmatch: ");
- pn_fprint_data(stderr, ctor->input, ctor->input_size);
- fprintf(stderr, "\n");
- ctor->output_done = true;
- return PN_ERR;
- } else {
- if (ctor->trace & PN_TRACE_FRM)
- fprintf(stderr, " <- AMQP 1.0\n");
- ctor->process_input = pn_connector_read_amqp;
- return 8;
- }
- } else if (ctor->input_eos) {
- fprintf(stderr, "amqp header missmatch: ");
- pn_fprint_data(stderr, ctor->input, ctor->input_size);
- fprintf(stderr, "\n");
- ctor->output_done = true;
- return PN_ERR;
- }
-
- return 0;
-}
-
-static ssize_t pn_connector_read_amqp(pn_connector_t *ctor)
-{
- if (!ctor->transport) return 0;
- pn_transport_t *transport = ctor->transport;
- return pn_input(transport, ctor->input, ctor->input_size);
-}
-
static char *pn_connector_output(pn_connector_t *ctor)
{
return ctor->output + ctor->output_size;
@@ -501,18 +436,13 @@ static size_t pn_connector_available(pn_
void pn_connector_process_output(pn_connector_t *ctor)
{
- while (!ctor->output_done && pn_connector_available(ctor) > 0) {
- ssize_t n = ctor->process_output(ctor);
- if (n > 0) {
+ pn_transport_t *transport = ctor->transport;
+ if (!ctor->output_done) {
+ ssize_t n = pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
+ if (n >= 0) {
ctor->output_size += n;
- } else if (n == 0) {
- break;
} else {
- if (n != PN_EOS) {
- fprintf(stderr, "error in process_output: %s\n", pn_code(n));
- }
ctor->output_done = true;
- break;
}
}
@@ -540,43 +470,6 @@ static void pn_connector_write(pn_connec
ctor->status &= ~PN_SEL_WR;
}
-static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor)
-{
- if (ctor->trace & PN_TRACE_FRM)
- fprintf(stderr, " -> AMQP SASL 1.0\n");
- memmove(pn_connector_output(ctor), "AMQP\x03\x01\x00\x00", 8);
- ctor->process_output = pn_connector_write_sasl;
- return 8;
-}
-
-static ssize_t pn_connector_write_sasl(pn_connector_t *ctor)
-{
- pn_sasl_t *sasl = ctor->sasl;
- ssize_t n = pn_sasl_output(sasl, pn_connector_output(ctor), pn_connector_available(ctor));
- if (n == PN_EOS) {
- ctor->process_output = pn_connector_write_amqp_header;
- return ctor->process_output(ctor);
- } else {
- return n;
- }
-}
-
-static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor)
-{
- if (ctor->trace & PN_TRACE_FRM)
- fprintf(stderr, " -> AMQP 1.0\n");
- memmove(pn_connector_output(ctor), "AMQP\x00\x01\x00\x00", 8);
- ctor->process_output = pn_connector_write_amqp;
- return 8;
-}
-
-static ssize_t pn_connector_write_amqp(pn_connector_t *ctor)
-{
- if (!ctor->transport) return 0;
- pn_transport_t *transport = ctor->transport;
- return pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
-}
-
static time_t pn_connector_tick(pn_connector_t *ctor, time_t now)
{
if (!ctor->transport) return 0;
@@ -620,6 +513,7 @@ pn_driver_t *pn_driver()
{
pn_driver_t *d = malloc(sizeof(pn_driver_t));
if (!d) return NULL;
+ d->error = pn_error();
d->listener_head = NULL;
d->listener_tail = NULL;
d->listener_next = NULL;
@@ -645,6 +539,16 @@ pn_driver_t *pn_driver()
return d;
}
+int pn_driver_errno(pn_driver_t *d)
+{
+ return d ? pn_error_code(d->error) : PN_ARG_ERR;
+}
+
+const char *pn_driver_error(pn_driver_t *d)
+{
+ return d ? pn_error_text(d->error) : NULL;
+}
+
void pn_driver_trace(pn_driver_t *d, pn_trace_t trace)
{
d->trace = trace;
@@ -662,6 +566,7 @@ void pn_driver_free(pn_driver_t *d)
pn_listener_free(d->listener_head);
pn_driver_poller_destroy(d);
+ pn_error_free(d->error);
free(d);
}
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h Mon Sep 10 14:56:20 2012
@@ -25,6 +25,7 @@
/* Decls */
struct pn_driver_t {
+ pn_error_t *error;
pn_listener_t *listener_head;
pn_listener_t *listener_tail;
pn_listener_t *listener_next;
@@ -90,11 +91,8 @@ struct pn_connector_t {
bool input_eos;
size_t output_size;
char output[PN_CONNECTOR_IO_BUF_SIZE];
- pn_sasl_t *sasl;
pn_connection_t *connection;
pn_transport_t *transport;
- ssize_t (*process_input)(pn_connector_t *);
- ssize_t (*process_output)(pn_connector_t *);
bool input_done;
bool output_done;
pn_listener_t *listener;
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h Mon Sep 10 14:56:20 2012
@@ -22,6 +22,7 @@
*
*/
+#include <proton/buffer.h>
#include <proton/engine.h>
#include <proton/types.h>
#include "../dispatcher/dispatcher.h"
@@ -85,15 +86,22 @@ typedef struct {
#define SCRATCH (1024)
+#include <proton/sasl.h>
+
struct pn_transport_t {
- pn_endpoint_t endpoint;
+ ssize_t (*process_input)(pn_transport_t *, char *, size_t);
+ ssize_t (*process_output)(pn_transport_t *, char *, size_t);
+ size_t header_count;
+ pn_sasl_t *sasl;
pn_connection_t *connection;
pn_dispatcher_t *disp;
bool open_sent;
bool open_rcvd;
bool close_sent;
bool close_rcvd;
- int error;
+ char *remote_container;
+ char *remote_hostname;
+ pn_error_t *error;
pn_session_state_t *sessions;
size_t session_capacity;
pn_session_state_t **channels;
@@ -118,8 +126,7 @@ struct pn_connection_t {
pn_delivery_t *tpwork_tail;
char *container;
char *hostname;
- char *remote_container;
- char *remote_hostname;
+ void *context;
};
struct pn_session_t {
@@ -154,7 +161,7 @@ struct pn_link_t {
struct pn_delivery_t {
pn_link_t *link;
- pn_bytes_t tag;
+ pn_buffer_t *tag;
int local_state;
int remote_state;
bool local_settled;
@@ -171,9 +178,7 @@ struct pn_delivery_t {
pn_delivery_t *tpwork_next;
pn_delivery_t *tpwork_prev;
bool tpwork;
- char *bytes;
- size_t size;
- size_t capacity;
+ pn_buffer_t *bytes;
bool done;
void *context;
};
@@ -201,5 +206,6 @@ void pn_link_dump(pn_link_t *link);
}
void pn_dump(pn_connection_t *conn);
+void pn_transport_sasl_init(pn_transport_t *transport);
#endif /* engine-internal.h */
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c Mon Sep 10 14:56:20 2012
@@ -29,6 +29,8 @@
#include <stdarg.h>
#include <stdio.h>
+#include "../sasl/sasl-internal.h"
+
// delivery buffers
void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
@@ -187,21 +189,32 @@ void pn_connection_close(pn_connection_t
if (connection) pn_close((pn_endpoint_t *) connection);
}
+void pn_endpoint_tini(pn_endpoint_t *endpoint);
+
void pn_connection_free(pn_connection_t *connection)
{
if (!connection) return;
- pn_transport_free(connection->transport);
while (connection->session_count)
pn_session_free(connection->sessions[connection->session_count - 1]);
free(connection->sessions);
free(connection->container);
free(connection->hostname);
- free(connection->remote_container);
- free(connection->remote_hostname);
+ pn_endpoint_tini(&connection->endpoint);
free(connection);
}
+void *pn_connection_context(pn_connection_t *conn)
+{
+ return conn ? conn->context : 0;
+}
+
+void pn_connection_set_context(pn_connection_t *conn, void *context)
+{
+ if (conn)
+ conn->context = context;
+}
+
void pn_transport_open(pn_transport_t *transport)
{
pn_open((pn_endpoint_t *) transport);
@@ -216,6 +229,7 @@ void pn_transport_free(pn_transport_t *t
{
if (!transport) return;
+ pn_sasl_free(transport->sasl);
pn_dispatcher_free(transport->disp);
for (int i = 0; i < transport->session_capacity; i++) {
pn_delivery_buffer_free(&transport->sessions[i].incoming);
@@ -223,6 +237,9 @@ void pn_transport_free(pn_transport_t *t
free(transport->sessions[i].links);
free(transport->sessions[i].handles);
}
+ free(transport->remote_container);
+ free(transport->remote_hostname);
+ pn_error_free(transport->error);
free(transport->sessions);
free(transport->channels);
free(transport);
@@ -273,6 +290,7 @@ void pn_session_free(pn_session_t *sessi
pn_link_free(session->links[session->link_count - 1]);
pn_remove_session(session->connection, session);
free(session->links);
+ pn_endpoint_tini(&session->endpoint);
free(session);
}
@@ -298,28 +316,11 @@ void pn_remove_link(pn_session_t *ssn, p
link->session = NULL;
}
-void pn_clear_tag(pn_delivery_t *delivery)
-{
- if (delivery->tag.start) {
- free(delivery->tag.start);
- delivery->tag = (pn_bytes_t) {0, NULL};
- }
-}
-
-void pn_clear_bytes(pn_delivery_t *delivery)
-{
- if (delivery->capacity) {
- free(delivery->bytes);
- delivery->bytes = NULL;
- delivery->capacity = 0;
- }
-}
-
void pn_free_delivery(pn_delivery_t *delivery)
{
if (delivery) {
- pn_clear_tag(delivery);
- pn_clear_bytes(delivery);
+ pn_buffer_free(delivery->tag);
+ pn_buffer_free(delivery->bytes);
free(delivery);
}
}
@@ -354,6 +355,7 @@ void pn_link_free(pn_link_t *link)
pn_free_delivery(d);
}
free(link->name);
+ pn_endpoint_tini(&link->endpoint);
free(link);
}
@@ -371,11 +373,17 @@ void pn_endpoint_init(pn_endpoint_t *end
LL_ADD(conn, endpoint, endpoint);
}
+void pn_endpoint_tini(pn_endpoint_t *endpoint)
+{
+ pn_error_free(endpoint->error);
+}
+
pn_connection_t *pn_connection()
{
pn_connection_t *conn = malloc(sizeof(pn_connection_t));
if (!conn) return NULL;
+ conn->context = NULL;
conn->endpoint_head = NULL;
conn->endpoint_tail = NULL;
pn_endpoint_init(&conn->endpoint, CONNECTION, conn);
@@ -391,8 +399,6 @@ pn_connection_t *pn_connection()
conn->tpwork_tail = NULL;
conn->container = NULL;
conn->hostname = NULL;
- conn->remote_container = NULL;
- conn->remote_hostname = NULL;
return conn;
}
@@ -433,12 +439,14 @@ void pn_connection_set_hostname(pn_conne
const char *pn_connection_remote_container(pn_connection_t *connection)
{
- return connection ? connection->remote_container : NULL;
+ if (!connection) return NULL;
+ return connection->transport ? connection->transport->remote_container : NULL;
}
const char *pn_connection_remote_hostname(pn_connection_t *connection)
{
- return connection ? connection->remote_hostname : NULL;
+ if (!connection) return NULL;
+ return connection->transport ? connection->transport->remote_hostname : NULL;
}
pn_delivery_t *pn_work_head(pn_connection_t *connection)
@@ -655,10 +663,21 @@ int pn_do_detach(pn_dispatcher_t *disp);
int pn_do_end(pn_dispatcher_t *disp);
int pn_do_close(pn_dispatcher_t *disp);
+static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
+
void pn_transport_init(pn_transport_t *transport)
{
- pn_endpoint_init(&transport->endpoint, TRANSPORT, transport->connection);
-
+ transport->process_input = pn_input_read_amqp_header;
+ transport->process_output = pn_output_write_amqp_header;
+ transport->header_count = 0;
+ transport->sasl = NULL;
transport->disp = pn_dispatcher(0, transport);
pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
@@ -675,7 +694,9 @@ void pn_transport_init(pn_transport_t *t
transport->open_rcvd = false;
transport->close_sent = false;
transport->close_rcvd = false;
- transport->error = 0;
+ transport->remote_container = NULL;
+ transport->remote_hostname = NULL;
+ transport->error = pn_error();
transport->sessions = NULL;
transport->session_capacity = 0;
@@ -716,25 +737,41 @@ void pn_map_channel(pn_transport_t *tran
transport->channels[channel] = state;
}
-pn_transport_t *pn_transport(pn_connection_t *conn)
+pn_transport_t *pn_transport()
{
- if (!conn) return NULL;
+ pn_transport_t *transport = malloc(sizeof(pn_transport_t));
+ if (!transport) return NULL;
- if (conn->transport) {
- return NULL;
- } else {
- conn->transport = malloc(sizeof(pn_transport_t));
- if (!conn->transport) return NULL;
+ transport->connection = NULL;
+ pn_transport_init(transport);
+ return transport;
+}
- conn->transport->connection = conn;
- pn_transport_init(conn->transport);
- return conn->transport;
+void pn_transport_sasl_init(pn_transport_t *transport)
+{
+ transport->process_input = pn_input_read_sasl_header;
+ transport->process_output = pn_output_write_sasl_header;
+}
+
+int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
+{
+ if (!transport) return PN_ARG_ERR;
+ if (transport->connection) return PN_STATE_ERR;
+ if (connection->transport) return PN_STATE_ERR;
+ transport->connection = connection;
+ connection->transport = transport;
+ if (transport->open_rcvd) {
+ PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
+ if (!pn_error_code(transport->error)) {
+ transport->disp->halt = false;
+ }
}
+ return 0;
}
pn_error_t *pn_transport_error(pn_transport_t *transport)
{
- return transport->endpoint.error;
+ return transport->error;
}
void pn_link_init(pn_link_t *link, int type, pn_session_t *session, const char *name)
@@ -779,12 +816,12 @@ void pn_set_target(pn_link_t *link, cons
link->local_target = pn_strdup(target);
}
-char *pn_remote_source(pn_link_t *link)
+const char *pn_remote_source(pn_link_t *link)
{
return link ? link->remote_source : NULL;
}
-char *pn_remote_target(pn_link_t *link)
+const char *pn_remote_target(pn_link_t *link)
{
return link ? link->remote_target : NULL;
}
@@ -869,12 +906,15 @@ pn_delivery_t *pn_delivery(pn_link_t *li
if (!link) return NULL;
pn_delivery_t *delivery = link->settled_head;
LL_POP(link, settled);
- if (!delivery) delivery = malloc(sizeof(pn_delivery_t));
- if (!delivery) return NULL;
+ if (!delivery) {
+ delivery = malloc(sizeof(pn_delivery_t));
+ if (!delivery) return NULL;
+ delivery->tag = pn_buffer(16);
+ delivery->bytes = pn_buffer(64);
+ }
delivery->link = link;
- delivery->tag.size = tag.size;
- delivery->tag.start = malloc(tag.size);
- memcpy(delivery->tag.start, tag.bytes, tag.size);
+ pn_buffer_clear(delivery->tag);
+ pn_buffer_append(delivery->tag, tag.bytes, tag.size);
delivery->local_state = 0;
delivery->remote_state = 0;
delivery->local_settled = false;
@@ -888,9 +928,7 @@ pn_delivery_t *pn_delivery(pn_link_t *li
delivery->tpwork_next = NULL;
delivery->tpwork_prev = NULL;
delivery->tpwork = false;
- delivery->bytes = NULL;
- delivery->size = 0;
- delivery->capacity = 0;
+ pn_buffer_clear(delivery->bytes);
delivery->done = false;
delivery->context = NULL;
@@ -928,7 +966,8 @@ bool pn_is_current(pn_delivery_t *delive
void pn_delivery_dump(pn_delivery_t *d)
{
char tag[1024];
- pn_quote_data(tag, 1024, d->tag.start, d->tag.size);
+ pn_bytes_t bytes = pn_buffer_bytes(d->tag);
+ pn_quote_data(tag, 1024, bytes.start, bytes.size);
printf("{tag=%s, local_state=%u, remote_state=%u, local_settled=%u, "
"remote_settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
"work=%u}",
@@ -940,7 +979,8 @@ void pn_delivery_dump(pn_delivery_t *d)
pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
{
if (delivery) {
- return pn_dtag(delivery->tag.start, delivery->tag.size);
+ pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
+ return pn_dtag(tag.start, tag.size);
} else {
return (pn_delivery_tag_t) {0};
}
@@ -1002,8 +1042,8 @@ void pn_real_settle(pn_delivery_t *deliv
LL_REMOVE(link, unsettled, delivery);
// TODO: what if we settle the current delivery?
LL_ADD(link, settled, delivery);
- pn_clear_tag(delivery);
- pn_clear_bytes(delivery);
+ pn_buffer_clear(delivery->tag);
+ pn_buffer_clear(delivery->bytes);
delivery->settled = true;
}
@@ -1047,13 +1087,13 @@ int pn_do_error(pn_transport_t *transpor
// XXX: result
vsnprintf(buf, 1024, fmt, ap);
va_end(ap);
- pn_error_set(transport->endpoint.error, PN_ERR, buf);
+ pn_error_set(transport->error, PN_ERR, buf);
if (!transport->close_sent) {
pn_post_close(transport);
transport->close_sent = true;
}
transport->disp->halt = true;
- fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->endpoint.error));
+ fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->error));
return PN_ERR;
}
@@ -1072,16 +1112,20 @@ int pn_do_open(pn_dispatcher_t *disp)
&hostname_q, &remote_hostname);
if (err) return err;
if (container_q) {
- conn->remote_container = pn_bytes_strdup(remote_container);
+ transport->remote_container = pn_bytes_strdup(remote_container);
} else {
- conn->remote_container = NULL;
+ transport->remote_container = NULL;
}
if (hostname_q) {
- conn->remote_hostname = pn_bytes_strdup(remote_hostname);
+ transport->remote_hostname = pn_bytes_strdup(remote_hostname);
} else {
- conn->remote_hostname = NULL;
+ transport->remote_hostname = NULL;
+ }
+ if (conn) {
+ PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
+ } else {
+ transport->disp->halt = true;
}
- PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
transport->open_rcvd = true;
return 0;
}
@@ -1215,9 +1259,7 @@ int pn_do_transfer(pn_dispatcher_t *disp
link->queued++;
}
- PN_ENSURE(delivery->bytes, delivery->capacity, delivery->size + disp->size);
- memmove(delivery->bytes + delivery->size, disp->payload, disp->size);
- delivery->size += disp->size;
+ pn_buffer_append(delivery->bytes, disp->payload, disp->size);
delivery->done = !more;
ssn_state->incoming_transfer_count++;
@@ -1382,29 +1424,103 @@ ssize_t pn_input(pn_transport_t *transpo
{
if (!transport) return PN_ARG_ERR;
- if (!available) {
- pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
- if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
- fprintf(stderr, " <- EOS\n");
- return PN_ERR;
+ size_t consumed = 0;
+
+ while (true) {
+ ssize_t n = transport->process_input(transport, bytes + consumed, available - consumed);
+ if (n > 0) {
+ consumed += n;
+ if (consumed >= available) {
+ break;
+ }
+ } else if (n == 0) {
+ break;
+ } else {
+ if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+ pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+ return n;
+ }
+ }
+
+ return consumed;
+}
+
+#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
+
+static ssize_t pn_input_read_header(pn_transport_t *transport, char *bytes, size_t available,
+ const char *header, size_t size, const char *protocol,
+ ssize_t (*next)(pn_transport_t *, char *, size_t))
+{
+ const char *point = header + transport->header_count;
+ int delta = pn_min(available, size - transport->header_count);
+ if (!available || memcmp(bytes, point, delta)) {
+ char quoted[1024];
+ pn_quote_data(quoted, 1024, bytes, available);
+ return pn_error_format(transport->error, PN_ERR,
+ "%s header missmatch: '%s'", protocol, quoted);
+ } else {
+ transport->header_count += delta;
+ if (transport->header_count == size) {
+ transport->header_count = 0;
+ transport->process_input = next;
+
+ if (transport->disp->trace & PN_TRACE_FRM)
+ fprintf(stderr, " <- %s\n", protocol);
+ }
+ return delta;
}
+}
+static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, char *bytes, size_t available)
+{
+ return pn_input_read_header(transport, bytes, available, SASL_HEADER, 8, "SASL", pn_input_read_sasl);
+}
+
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, char *bytes, size_t available)
+{
+ pn_sasl_t *sasl = transport->sasl;
+ ssize_t n = pn_sasl_input(sasl, bytes, available);
+ if (n == PN_EOS) {
+ transport->process_input = pn_input_read_amqp_header;
+ return transport->process_input(transport, bytes, available);
+ } else {
+ return n;
+ }
+}
+
+#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
+
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, char *bytes, size_t available)
+{
+ return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
+ "AMQP", pn_input_read_amqp);
+}
+
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, char *bytes, size_t available)
+{
if (transport->close_rcvd) {
- pn_do_error(transport, "amqp:connection:framing-error", "data after close");
- if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
- fprintf(stderr, " <- EOS\n");
+ if (available > 0) {
+ pn_do_error(transport, "amqp:connection:framing-error", "data after close");
+ return PN_ERR;
+ } else {
+ return PN_EOS;
+ }
+ }
+
+ if (!available) {
+ pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
return PN_ERR;
}
+
ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
- if (n >= 0 && transport->close_rcvd) {
- if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
- fprintf(stderr, " <- EOS\n");
+ if (n < 0) {
+ return pn_error_set(transport->error, n, "dispatch error");
+ } else if (transport->close_rcvd) {
return PN_EOS;
- } else if (n < 0) {
- transport->error = n;
+ } else {
+ return n;
}
- return n;
}
bool pn_delivery_buffered(pn_delivery_t *delivery)
@@ -1413,7 +1529,7 @@ bool pn_delivery_buffered(pn_delivery_t
if (pn_is_sender(delivery->link)) {
pn_delivery_state_t *state = delivery->context;
if (state) {
- return (delivery->done && !state->sent) || delivery->size > 0;
+ return (delivery->done && !state->sent) || pn_buffer_size(delivery->bytes) > 0;
} else {
return delivery->done;
}
@@ -1561,16 +1677,16 @@ int pn_process_tpwork_sender(pn_transpor
delivery->context = state;
}
- if (state && !state->sent && (delivery->done || delivery->size > 0) &&
+ if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
ssn_state->outgoing_window > 0 && link_state->link_credit > 0) {
- if (delivery->bytes) {
- pn_set_payload(transport->disp, delivery->bytes, delivery->size);
- delivery->size = 0;
- }
+ pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
+ pn_set_payload(transport->disp, bytes.start, bytes.size);
+ pn_buffer_clear(delivery->bytes);
+ pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[IIzIoo]", TRANSFER,
link_state->local_handle, state->id,
- delivery->tag.size, delivery->tag.start,
- 0, delivery->local_settled, !delivery->done);
+ tag.size, tag.start, 0, delivery->local_settled,
+ !delivery->done);
if (err) return err;
ssn_state->outgoing_transfer_count++;
ssn_state->outgoing_window--;
@@ -1801,18 +1917,60 @@ int pn_process(pn_transport_t *transport
return 0;
}
-ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+static ssize_t pn_output_write_header(pn_transport_t *transport,
+ char *bytes, size_t size,
+ const char *header, size_t hdrsize,
+ const char *protocol,
+ ssize_t (*next)(pn_transport_t *, char *, size_t))
+{
+ if (transport->disp->trace & PN_TRACE_FRM)
+ fprintf(stderr, " -> %s\n", protocol);
+ if (size >= hdrsize) {
+ memmove(bytes, header, hdrsize);
+ transport->process_output = next;
+ return hdrsize;
+ } else {
+ return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
+ }
+}
+
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t size)
{
- if (!transport) return PN_ARG_ERR;
+ return pn_output_write_header(transport, bytes, size, SASL_HEADER, 8, "SASL",
+ pn_output_write_sasl);
+}
+
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t size)
+{
+ pn_sasl_t *sasl = transport->sasl;
+ ssize_t n = pn_sasl_output(sasl, bytes, size);
+ if (n == PN_EOS) {
+ transport->process_output = pn_output_write_amqp_header;
+ return 0;
+ } else {
+ return n;
+ }
+}
- if (!transport->error)
- transport->error = pn_process(transport);
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t size)
+{
+ return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
+ pn_output_write_amqp);
+}
+
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t size)
+{
+ if (!transport->connection) {
+ return 0;
+ }
+
+ if (!pn_error_code(transport->error)) {
+ pn_error_set(transport->error, pn_process(transport), "process error");
+ }
- if (!transport->disp->available && (transport->close_sent || transport->error)) {
- if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
- fprintf(stderr, " -> EOS\n");
- if (transport->error)
- return transport->error;
+ if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) {
+ if (pn_error_code(transport->error))
+ return pn_error_code(transport->error);
else
return PN_EOS;
}
@@ -1820,8 +1978,40 @@ ssize_t pn_output(pn_transport_t *transp
return pn_dispatcher_output(transport->disp, bytes, size);
}
+ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+ if (!transport) return PN_ARG_ERR;
+
+ size_t total = 0;
+
+ while (size - total > 0) {
+ ssize_t n = transport->process_output(transport, bytes + total, size - total);
+ if (n > 0) {
+ total += n;
+ } else if (n == 0) {
+ break;
+ } else if (n == PN_EOS) {
+ if (total > 0) {
+ return total;
+ } else {
+ if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+ pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
+ return PN_EOS;
+ }
+ } else {
+ if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+ pn_dispatcher_trace(transport->disp, 0, "-> EOS (%zi) %s\n", n,
+ pn_error_text(transport->error));
+ return n;
+ }
+ }
+
+ return total;
+}
+
void pn_trace(pn_transport_t *transport, pn_trace_t trace)
{
+ if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
transport->disp->trace = trace;
}
@@ -1829,9 +2019,7 @@ ssize_t pn_send(pn_link_t *sender, const
{
pn_delivery_t *current = pn_current(sender);
if (!current) return PN_EOS;
- PN_ENSURE(current->bytes, current->capacity, current->size + n);
- memmove(current->bytes + current->size, bytes, n);
- current->size += n;
+ pn_buffer_append(current->bytes, bytes, n);
pn_add_tpwork(current);
return n;
}
@@ -1850,11 +2038,9 @@ ssize_t pn_recv(pn_link_t *receiver, cha
pn_delivery_t *delivery = receiver->current;
if (delivery) {
- if (delivery->size) {
- size_t size = n > delivery->size ? delivery->size : n;
- memmove(bytes, delivery->bytes, size);
- memmove(bytes, bytes + size, delivery->size - size);
- delivery->size -= size;
+ size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
+ pn_buffer_trim(delivery->bytes, size, 0);
+ if (size) {
return size;
} else {
return delivery->done ? PN_EOS : 0;
@@ -1945,5 +2131,5 @@ bool pn_readable(pn_delivery_t *delivery
size_t pn_pending(pn_delivery_t *delivery)
{
- return delivery->size;
+ return pn_buffer_size(delivery->bytes);
}
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/error.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/error.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/error.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/error.c Mon Sep 10 14:56:20 2012
@@ -19,8 +19,11 @@
*
*/
+#define _POSIX_C_SOURCE (200112)
+
#include <proton/error.h>
#include <stdlib.h>
+#include <string.h>
#include "util.h"
struct pn_error_t {
@@ -59,8 +62,10 @@ void pn_error_clear(pn_error_t *error)
int pn_error_set(pn_error_t *error, int code, const char *text)
{
pn_error_clear(error);
- error->code = code;
- error->text = pn_strdup(text);
+ if (code) {
+ error->code = code;
+ error->text = pn_strdup(text);
+ }
return code;
}
@@ -83,6 +88,13 @@ int pn_error_format(pn_error_t *error, i
return rcode;
}
+int pn_error_from_errno(pn_error_t *error, const char *msg)
+{
+ char err[1024];
+ strerror_r(errno, err, 1024);
+ return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
+}
+
int pn_error_code(pn_error_t *error)
{
return error ? error->code : PN_ARG_ERR;
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c Mon Sep 10 14:56:20 2012
@@ -113,6 +113,7 @@ void pn_message_free(pn_message_t *msg)
pn_buffer_free(msg->reply_to_group_id);
pn_data_free(msg->data);
pn_data_free(msg->body);
+ pn_parser_free(msg->parser);
free(msg);
}
}
@@ -676,12 +677,12 @@ int pn_message_save_text(pn_message_t *m
}
uint64_t desc;
- pn_bytes_t str;
- bool scanned;
- int err = pn_data_scan(msg->body, "DL?S", &desc, &scanned, &str);
+ pn_bytes_t str = {0,0};
+ bool scanned, dscanned;
+ int err = pn_data_scan(msg->body, "?DL?S", &dscanned, &desc, &scanned, &str);
if (err) return err;
- if (desc == AMQP_VALUE && scanned) {
- if (str.size >= *size) {
+ if (dscanned && desc == AMQP_VALUE) {
+ if (scanned && str.size >= *size) {
return PN_OVERFLOW;
} else {
memcpy(data, str.start, str.size);
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/messenger.c Mon Sep 10 14:56:20 2012
@@ -32,9 +32,6 @@ struct pn_messenger_t {
char *name;
int timeout;
pn_driver_t *driver;
- pn_connector_t *connectors[1024];
- size_t size;
- size_t listeners;
int credit;
uint64_t next_tag;
pn_error_t *error;
@@ -61,8 +58,6 @@ pn_messenger_t *pn_messenger(const char
m->name = build_name(name);
m->timeout = -1;
m->driver = pn_driver();
- m->size = 0;
- m->listeners = 0;
m->credit = 0;
m->next_tag = 0;
m->error = pn_error();
@@ -94,6 +89,7 @@ void pn_messenger_free(pn_messenger_t *m
free(messenger->name);
pn_driver_free(messenger->driver);
pn_error_free(messenger->error);
+ free(messenger);
}
}
@@ -119,8 +115,8 @@ void pn_messenger_flow(pn_messenger_t *m
{
while (messenger->credit > 0) {
int prev = messenger->credit;
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -131,6 +127,8 @@ void pn_messenger_flow(pn_messenger_t *m
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
+
+ ctor = pn_connector_next(ctor);
}
if (messenger->credit == prev) break;
}
@@ -193,8 +191,10 @@ long int millis(struct timeval tv)
int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
{
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_process(messenger->connectors[i]);
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
+ pn_connector_process(ctor);
+ ctor = pn_connector_next(ctor);
}
struct timeval now;
@@ -219,7 +219,6 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connection_t *conn = pn_connection();
pn_connection_set_container(conn, messenger->name);
pn_connector_set_connection(c, conn);
- messenger->connectors[messenger->size++] = c;
}
pn_connector_t *c;
@@ -228,17 +227,10 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connection_t *conn = pn_connector_connection(c);
pn_messenger_endpoints(messenger, conn);
if (pn_connector_closed(c)) {
- for (int i = 0; i < messenger->size; i++) {
- if (c == messenger->connectors[i]) {
- memmove(messenger->connectors + i, messenger->connectors + i + 1, messenger->size - i - 1);
- messenger->size--;
- pn_connector_free(c);
- pn_messenger_reclaim(messenger, conn);
- pn_connection_free(conn);
- pn_messenger_flow(messenger);
- break;
- }
- }
+ pn_connector_free(c);
+ pn_messenger_reclaim(messenger, conn);
+ pn_connection_free(conn);
+ pn_messenger_flow(messenger);
} else {
pn_connector_process(c);
}
@@ -266,15 +258,15 @@ int pn_messenger_start(pn_messenger_t *m
bool pn_messenger_stopped(pn_messenger_t *messenger)
{
- return messenger->size == 0;
+ return pn_connector_head(messenger->driver) == NULL;
}
int pn_messenger_stop(pn_messenger_t *messenger)
{
if (!messenger) return PN_ARG_ERR;
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (link) {
@@ -282,6 +274,15 @@ int pn_messenger_stop(pn_messenger_t *me
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
pn_connection_close(conn);
+ ctor = pn_connector_next(ctor);
+ }
+
+ pn_listener_t *l = pn_listener_head(messenger->driver);
+ while (l) {
+ pn_listener_close(l);
+ pn_listener_t *prev = l;
+ l = pn_listener_next(l);
+ pn_listener_free(prev);
}
return pn_messenger_sync(messenger, pn_messenger_stopped);
@@ -299,6 +300,7 @@ static void parse_address(char *address,
if (*c == '/') {
*c = '\0';
*name = c + 1;
+ break;
}
}
} else {
@@ -313,7 +315,7 @@ bool pn_streq(const char *a, const char
pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *domain)
{
- char buf[strlen(domain) + 1];
+ char buf[domain ? strlen(domain) + 1 : 1];
if (domain) {
strcpy(buf, domain);
} else {
@@ -325,17 +327,18 @@ pn_connection_t *pn_messenger_domain(pn_
char *port = "5672";
parse_url(buf, &user, &pass, &host, &port);
- for (int i = 0; i < messenger->size; i++) {
- pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
+ pn_connection_t *connection = pn_connector_connection(ctor);
const char *container = pn_connection_remote_container(connection);
const char *hostname = pn_connection_hostname(connection);
if (pn_streq(container, domain) || pn_streq(hostname, domain))
return connection;
+ ctor = pn_connector_next(ctor);
}
pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
if (!connector) return NULL;
- messenger->connectors[messenger->size++] = connector;
pn_sasl_t *sasl = pn_connector_sasl(connector);
if (user) {
pn_sasl_plain(sasl, user, pass);
@@ -414,11 +417,7 @@ pn_listener_t *pn_messenger_isource(pn_m
char *port = "5672";
parse_url(domain + 1, &user, &pass, &host, &port);
- pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
- if (listener) {
- messenger->listeners++;
- }
- return listener;
+ return pn_listener(messenger->driver, host, port, NULL);
}
int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
@@ -430,7 +429,8 @@ int pn_messenger_subscribe(pn_messenger_
return 0;
} else {
return pn_error_format(messenger->error, PN_ERR,
- "unable to subscribe to source: %s", source);
+ "unable to subscribe to source: %s (%s)", source,
+ pn_driver_error(messenger->driver));
}
} else if (len >= 2 && source[0] == '/' && source[1] == '/') {
pn_link_t *src = pn_messenger_source(messenger, source);
@@ -438,7 +438,8 @@ int pn_messenger_subscribe(pn_messenger_
return 0;
} else {
return pn_error_format(messenger->error, PN_ERR,
- "unable to subscribe to source: %s", source);
+ "unable to subscribe to source: %s (%s)", source,
+ pn_driver_error(messenger->driver));
}
} else {
return 0;
@@ -471,7 +472,8 @@ int pn_messenger_put(pn_messenger_t *mes
pn_link_t *sender = pn_messenger_target(messenger, address);
if (!sender)
return pn_error_format(messenger->error, PN_ERR,
- "unable to send to address: %s", address);
+ "unable to send to address: %s (%s)", address,
+ pn_driver_error(messenger->driver));
// XXX: proper tag
char tag[8];
void *ptr = &tag;
@@ -504,8 +506,8 @@ int pn_messenger_put(pn_messenger_t *mes
bool pn_messenger_sent(pn_messenger_t *messenger)
{
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -523,6 +525,8 @@ bool pn_messenger_sent(pn_messenger_t *m
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
+
+ ctor = pn_connector_next(ctor);
}
return true;
@@ -530,8 +534,8 @@ bool pn_messenger_sent(pn_messenger_t *m
bool pn_messenger_rcvd(pn_messenger_t *messenger)
{
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_delivery_t *d = pn_work_head(conn);
@@ -541,6 +545,7 @@ bool pn_messenger_rcvd(pn_messenger_t *m
}
d = pn_work_next(d);
}
+ ctor = pn_connector_next(ctor);
}
return false;
@@ -554,7 +559,7 @@ int pn_messenger_send(pn_messenger_t *me
int pn_messenger_recv(pn_messenger_t *messenger, int n)
{
if (!messenger) return PN_ARG_ERR;
- if (!messenger->listeners && !messenger->size)
+ if (!pn_listener_head(messenger->driver) && !pn_connector_head(messenger->driver))
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
messenger->credit += n;
pn_messenger_flow(messenger);
@@ -565,8 +570,8 @@ int pn_messenger_get(pn_messenger_t *mes
{
if (!messenger) return PN_ARG_ERR;
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_delivery_t *d = pn_work_head(conn);
@@ -592,6 +597,8 @@ int pn_messenger_get(pn_messenger_t *mes
}
d = pn_work_next(d);
}
+
+ ctor = pn_connector_next(ctor);
}
// XXX: need to drain credit before returning EOS
@@ -605,8 +612,8 @@ int pn_messenger_queued(pn_messenger_t *
int result = 0;
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -620,6 +627,7 @@ int pn_messenger_queued(pn_messenger_t *
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
+ ctor = pn_connector_next(ctor);
}
return result;
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c?rev=1382918&r1=1382917&r2=1382918&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/sasl/sasl.c Mon Sep 10 14:56:20 2012
@@ -27,6 +27,7 @@
#include <proton/sasl.h>
#include "protocol.h"
#include "../dispatcher/dispatcher.h"
+#include "../engine/engine-internal.h"
#include "../util.h"
#define SCRATCH (1024)
@@ -53,30 +54,36 @@ int pn_do_challenge(pn_dispatcher_t *dis
int pn_do_response(pn_dispatcher_t *disp);
int pn_do_outcome(pn_dispatcher_t *disp);
-pn_sasl_t *pn_sasl()
+pn_sasl_t *pn_sasl(pn_transport_t *transport)
{
- pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
- sasl->disp = pn_dispatcher(1, sasl);
+ if (!transport->sasl) {
+ pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
+ sasl->disp = pn_dispatcher(1, sasl);
+
+ pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
+ pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
+ pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
+ pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
+ pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
- pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
- pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
- pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
- pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
- pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
-
- sasl->client = false;
- sasl->configured = false;
- sasl->mechanisms = NULL;
- sasl->remote_mechanisms = NULL;
- sasl->send_data = (pn_bytes_t) {0, NULL};
- sasl->recv_data = (pn_bytes_t) {0, NULL};
- sasl->outcome = PN_SASL_NONE;
- sasl->sent_init = false;
- sasl->rcvd_init = false;
- sasl->sent_done = false;
- sasl->rcvd_done = false;
+ sasl->client = false;
+ sasl->configured = false;
+ sasl->mechanisms = NULL;
+ sasl->remote_mechanisms = NULL;
+ sasl->send_data = (pn_bytes_t) {0, NULL};
+ sasl->recv_data = (pn_bytes_t) {0, NULL};
+ sasl->outcome = PN_SASL_NONE;
+ sasl->sent_init = false;
+ sasl->rcvd_init = false;
+ sasl->sent_done = false;
+ sasl->rcvd_done = false;
+
+ transport->sasl = sasl;
+
+ pn_transport_sasl_init(transport);
+ }
- return sasl;
+ return transport->sasl;
}
pn_sasl_state_t pn_sasl_state(pn_sasl_t *sasl)
@@ -201,12 +208,14 @@ void pn_sasl_trace(pn_sasl_t *sasl, pn_t
void pn_sasl_free(pn_sasl_t *sasl)
{
- free(sasl->mechanisms);
- free(sasl->remote_mechanisms);
- free(sasl->send_data.start);
- free(sasl->recv_data.start);
- pn_dispatcher_free(sasl->disp);
- free(sasl);
+ if (sasl) {
+ free(sasl->mechanisms);
+ free(sasl->remote_mechanisms);
+ free(sasl->send_data.start);
+ free(sasl->recv_data.start);
+ pn_dispatcher_free(sasl->disp);
+ free(sasl);
+ }
}
void pn_client_init(pn_sasl_t *sasl)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org