You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/09/07 03:19:17 UTC
svn commit: r1381839 - in /qpid/proton/trunk: proton-c/bindings/php/
proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/
proton-c/src/dispatcher/ proton-c/src/engine/ proton-c/src/message/
proton-c/src/sasl/ tests/proton_tests/
Author: rhs
Date: Fri Sep 7 01:19:16 2012
New Revision: 1381839
URL: http://svn.apache.org/viewvc?rev=1381839&view=rev
Log:
decoupled transport from connection and moved sasl layer from driver to transport; fixed several memory leaks shown by valgrind; made messenger use driver's connection and listener iterators instead of keeping internal duplicate structures
Added:
qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h
qpid/proton/trunk/tests/proton_tests/transport.py
Modified:
qpid/proton/trunk/proton-c/bindings/php/php.i
qpid/proton/trunk/proton-c/bindings/python/python.i
qpid/proton/trunk/proton-c/include/proton/cproton.i
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/include/proton/parser.h
qpid/proton/trunk/proton-c/include/proton/sasl.h
qpid/proton/trunk/proton-c/src/buffer.c
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
qpid/proton/trunk/proton-c/src/driver.c
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/error.c
qpid/proton/trunk/proton-c/src/message/message.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/proton-c/src/sasl/sasl.c
qpid/proton/trunk/proton-c/src/scanner.c
qpid/proton/trunk/proton-c/src/util.h
qpid/proton/trunk/tests/proton_tests/__init__.py
qpid/proton/trunk/tests/proton_tests/engine.py
qpid/proton/trunk/tests/proton_tests/message.py
qpid/proton/trunk/tests/proton_tests/messenger.py
qpid/proton/trunk/tests/proton_tests/sasl.py
Modified: qpid/proton/trunk/proton-c/bindings/php/php.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/php.i?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/php.i (original)
+++ qpid/proton/trunk/proton-c/bindings/php/php.i Fri Sep 7 01:19:16 2012
@@ -67,8 +67,6 @@ ssize_t pn_input(pn_transport_t *transpo
ssize_t pn_sasl_send(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
%ignore pn_sasl_send;
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
-%ignore pn_sasl_input;
// Use the OUTPUT_BUFFER,OUTPUT_LEN typemap to allow these functions to return
@@ -110,18 +108,6 @@ ssize_t pn_sasl_input(pn_sasl_t *sasl, c
%}
%ignore pn_output;
-%rename(pn_sasl_output) wrap_pn_output;
-// in PHP: array = pn_sasl_output(sasl, MAXLEN);
-// array[0] = size || error code
-// array[1] = native string containing binary data
-%inline %{
- void wrap_pn_sasl_output(pn_sasl_t *sasl, size_t maxCount, char **OUTPUT_BUFFER, ssize_t *OUTPUT_LEN) {
- *OUTPUT_BUFFER = emalloc(sizeof(char) * maxCount);
- *OUTPUT_LEN = pn_sasl_output(sasl, *OUTPUT_BUFFER, maxCount);
- }
-%}
-%ignore pn_sasl_output;
-
%rename(pn_message_data) wrap_pn_message_data;
// in PHP: array = pn_message_data("binary message data", MAXLEN);
// array[0] = size || error code
Modified: qpid/proton/trunk/proton-c/bindings/python/python.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/python.i?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/python.i (original)
+++ qpid/proton/trunk/proton-c/bindings/python/python.i Fri Sep 7 01:19:16 2012
@@ -104,23 +104,6 @@ ssize_t pn_input(pn_transport_t *transpo
%}
%ignore pn_output;
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *STRING, size_t LENGTH);
-%ignore pn_sasl_input;
-
-%rename(pn_sasl_output) wrap_pn_sasl_output;
-%inline %{
- int wrap_pn_sasl_output(pn_sasl_t *sasl, char *OUTPUT, size_t *OUTPUT_SIZE) {
- ssize_t sz = pn_sasl_output(sasl, OUTPUT, *OUTPUT_SIZE);
- if (sz >= 0) {
- *OUTPUT_SIZE = sz;
- } else {
- *OUTPUT_SIZE = 0;
- }
- return sz;
- }
-%}
-%ignore pn_sasl_output;
-
%rename(pn_delivery) wrap_pn_delivery;
%inline %{
pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) {
Modified: qpid/proton/trunk/proton-c/include/proton/cproton.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/cproton.i?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/cproton.i (original)
+++ qpid/proton/trunk/proton-c/include/proton/cproton.i Fri Sep 7 01:19:16 2012
@@ -981,34 +981,6 @@
check_sasl_outcome(pn_sasl_outcome);
}
-%contract pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available)
-{
- require:
- sasl != NULL;
- bytes != NULL;
- available > 0;
-}
-
-%contract pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size)
-{
- require:
- sasl != NULL;
- bytes != NULL;
- size > 0;
-}
-
-%contract pn_sasl_trace(pn_sasl_t *sasl, pn_trace_t trace)
-{
- require:
- sasl != NULL;
-}
-
-%contract pn_sasl_free(pn_sasl_t *sasl)
-{
- require:
- sasl != NULL;
-}
-
%include "proton/sasl.h"
%contract pn_driver(void)
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Fri Sep 7 01:19:16 2012
@@ -148,15 +148,22 @@ pn_delivery_t *pn_work_next(pn_delivery_
*/
pn_session_t *pn_session(pn_connection_t *connection);
-/** Factory for creating the connection's transport.
+/** Factory for creating a transport.
*
- * The transport used by the connection to interface with the network.
- * There can only be one transport associated with a connection.
+ * A transport to be used by a connection to interface with the
+ * network. There can only be one connection associated with a
+ * transport. See pn_transport_bind().
*
- * @param[in] connection connection that will use the transport
- * @return pointer to new session
+ * @return pointer to new transport
*/
-pn_transport_t *pn_transport(pn_connection_t *connection);
+pn_transport_t *pn_transport(void);
+
+/** Binds the transport to an AMQP connection endpoint.
+ *
+ * @return an error code, or 0 on success
+ */
+
+int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection);
/** Retrieve the first Session that matches the given state mask.
*
Modified: qpid/proton/trunk/proton-c/include/proton/parser.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/parser.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/parser.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/parser.h Fri Sep 7 01:19:16 2012
@@ -30,7 +30,7 @@ extern "C" {
typedef struct pn_parser_t pn_parser_t;
-pn_parser_t *pn_parser();
+pn_parser_t *pn_parser(void);
int pn_parser_parse(pn_parser_t *parser, const char *str, pn_atoms_t *atoms);
int pn_parser_errno(pn_parser_t *parser);
const char *pn_parser_error(pn_parser_t *parser);
Modified: qpid/proton/trunk/proton-c/include/proton/sasl.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/sasl.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/sasl.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/sasl.h Fri Sep 7 01:19:16 2012
@@ -24,6 +24,7 @@
#include <sys/types.h>
#include <stdbool.h>
+#include <proton/engine.h>
#ifdef __cplusplus
extern "C" {
@@ -64,7 +65,7 @@ typedef enum {
*
* @return a new SASL object representing the layer.
*/
-pn_sasl_t *pn_sasl();
+pn_sasl_t *pn_sasl(pn_transport_t *transport);
/** Access the current state of the layer.
*
@@ -163,39 +164,6 @@ void pn_sasl_done(pn_sasl_t *sasl, pn_sa
*/
pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl);
-/** Decode input data bytes into SASL frames, and process them.
- *
- * This function is called by the driver layer to pass data received
- * from the remote peer into the SASL layer.
- *
- * @param[in] sasl the SASL layer.
- * @param[in] bytes buffer of frames to process
- * @param[in] available number of octets of data in 'bytes'
- * @return the number of bytes consumed, or error code if < 0
- */
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available);
-
-/** Gather output frames from the layer.
- *
- * This function is used by the driver to poll the SASL layer for data
- * that will be sent to the remote peer.
- *
- * @param[in] sasl The SASL layer.
- * @param[out] bytes to be filled with encoded frames.
- * @param[in] size space available in bytes array.
- * @return the number of octets written to bytes, or error code if < 0
- */
-ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size);
-
-void pn_sasl_trace(pn_sasl_t *sasl, pn_trace_t trace);
-
-/** Destructor for the given SASL layer.
- *
- * @param[in] sasl the SASL object to free. No longer valid on
- * return.
- */
-void pn_sasl_free(pn_sasl_t *sasl);
-
#ifdef __cplusplus
}
#endif
Modified: qpid/proton/trunk/proton-c/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/buffer.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/buffer.c (original)
+++ qpid/proton/trunk/proton-c/src/buffer.c Fri Sep 7 01:19:16 2012
@@ -139,9 +139,6 @@ int pn_buffer_ensure(pn_buffer_t *buf, s
return 0;
}
-#define min(X,Y) ((X) > (Y) ? (Y) : (X))
-#define max(X,Y) ((X) < (Y) ? (Y) : (X))
-
int pn_buffer_append(pn_buffer_t *buf, const char *bytes, size_t size)
{
int err = pn_buffer_ensure(buf, size);
@@ -149,7 +146,7 @@ int pn_buffer_append(pn_buffer_t *buf, c
size_t tail = pn_buffer_tail(buf);
size_t tail_space = pn_buffer_tail_space(buf);
- size_t n = min(tail_space, size);
+ size_t n = pn_min(tail_space, size);
memmove(buf->bytes + tail, bytes, n);
memmove(buf->bytes, bytes + n, size - n);
@@ -166,7 +163,7 @@ int pn_buffer_prepend(pn_buffer_t *buf,
size_t head = pn_buffer_head(buf);
size_t head_space = pn_buffer_head_space(buf);
- size_t n = min(head_space, size);
+ size_t n = pn_min(head_space, size);
memmove(buf->bytes + head - n, bytes + size - n, n);
memmove(buf->bytes + buf->capacity - (size - n), bytes, size - n);
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Fri Sep 7 01:19:16 2012
@@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t
uint8_t code = scanned ? code64 : 0;
size_t n = SCRATCH;
pn_data_format(args, disp->scratch, &n);
- fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
- dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
+ pn_dispatcher_trace(disp, ch, "%s %s %s", dir == OUT ? "->" : "<-",
+ disp->names[code], disp->scratch);
if (size) {
size_t capacity = 4*size + 1;
char buf[capacity];
@@ -95,6 +95,16 @@ static void pn_do_trace(pn_dispatcher_t
}
}
+void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...)
+{
+ va_list ap;
+ fprintf(stderr, "[%p:%u] ", (void *) disp, ch);
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+}
+
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available)
{
size_t read = 0;
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Fri Sep 7 01:19:16 2012
@@ -64,5 +64,6 @@ void pn_set_payload(pn_dispatcher_t *dis
int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...);
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available);
ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size);
+void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...);
#endif /* dispatcher.h */
Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Fri Sep 7 01:19:16 2012
@@ -94,11 +94,8 @@ struct pn_connector_t {
bool input_eos;
size_t output_size;
char output[IO_BUF_SIZE];
- pn_sasl_t *sasl;
pn_connection_t *connection;
pn_transport_t *transport;
- ssize_t (*process_input)(pn_connector_t *);
- ssize_t (*process_output)(pn_connector_t *);
bool input_done;
bool output_done;
pn_listener_t *listener;
@@ -332,15 +329,6 @@ static void pn_connector_read(pn_connect
static void pn_connector_write(pn_connector_t *ctor);
static time_t pn_connector_tick(pn_connector_t *ctor, time_t now);
-static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor);
-static ssize_t pn_connector_read_sasl(pn_connector_t *ctor);
-static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor);
-static ssize_t pn_connector_read_amqp(pn_connector_t *ctor);
-static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor);
-static ssize_t pn_connector_write_sasl(pn_connector_t *ctor);
-static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor);
-static ssize_t pn_connector_write_amqp(pn_connector_t *ctor);
-
pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
{
if (!driver) return NULL;
@@ -366,11 +354,8 @@ pn_connector_t *pn_connector_fd(pn_drive
c->input_size = 0;
c->input_eos = false;
c->output_size = 0;
- c->sasl = pn_sasl();
c->connection = NULL;
- c->transport = NULL;
- c->process_input = pn_connector_read_sasl_header;
- c->process_output = pn_connector_write_sasl_header;
+ c->transport = pn_transport();
c->input_done = false;
c->output_done = false;
c->context = context;
@@ -396,20 +381,19 @@ void pn_connector_trace(pn_connector_t *
{
if (!ctor) return;
ctor->trace = trace;
- if (ctor->sasl) pn_sasl_trace(ctor->sasl, trace);
if (ctor->transport) pn_trace(ctor->transport, trace);
}
pn_sasl_t *pn_connector_sasl(pn_connector_t *ctor)
{
- return ctor ? ctor->sasl : NULL;
+ return ctor ? pn_sasl(ctor->transport) : NULL;
}
void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection)
{
if (!ctor) return;
ctor->connection = connection;
- ctor->transport = pn_transport(connection);
+ pn_transport_bind(ctor->transport, connection);
if (ctor->transport) pn_trace(ctor->transport, ctor->trace);
}
@@ -457,8 +441,8 @@ void pn_connector_free(pn_connector_t *c
if (ctor->driver) pn_driver_remove_connector(ctor->driver, ctor);
ctor->connection = NULL;
+ pn_transport_free(ctor->transport);
ctor->transport = NULL;
- pn_sasl_free(ctor->sasl);
free(ctor);
}
@@ -482,93 +466,18 @@ static void pn_connector_consume(pn_conn
static void pn_connector_process_input(pn_connector_t *ctor)
{
- while (!ctor->input_done && (ctor->input_size > 0 || ctor->input_eos)) {
- ssize_t n = ctor->process_input(ctor);
- if (n > 0) {
- pn_connector_consume(ctor, n);
- } else if (n == 0) {
- break;
- } else {
- if (n == PN_EOS) {
- pn_connector_consume(ctor, ctor->input_size);
+ pn_transport_t *transport = ctor->transport;
+ if (!ctor->input_done) {
+ if (ctor->input_size > 0 || ctor->input_eos) {
+ ssize_t n = pn_input(transport, ctor->input, ctor->input_size);
+ if (n >= 0) {
+ pn_connector_consume(ctor, n);
} else {
- fprintf(stderr, "error in process_input: %s\n", pn_code(n));
+ pn_connector_consume(ctor, ctor->input_size);
+ ctor->input_done = true;
}
- ctor->input_done = true;
- break;
- }
- }
-}
-
-static ssize_t pn_connector_read_sasl_header(pn_connector_t *ctor)
-{
- if (ctor->input_size >= 8) {
- if (memcmp(ctor->input, "AMQP\x03\x01\x00\x00", 8)) {
- fprintf(stderr, "sasl header missmatch: ");
- pn_fprint_data(stderr, ctor->input, ctor->input_size);
- fprintf(stderr, "\n");
- ctor->output_done = true;
- return PN_ERR;
- } else {
- if (ctor->trace & PN_TRACE_FRM)
- fprintf(stderr, " <- AMQP SASL 1.0\n");
- ctor->process_input = pn_connector_read_sasl;
- return 8;
- }
- } else if (ctor->input_eos) {
- fprintf(stderr, "sasl header missmatch: ");
- pn_fprint_data(stderr, ctor->input, ctor->input_size);
- fprintf(stderr, "\n");
- ctor->output_done = true;
- return PN_ERR;
- }
-
- return 0;
-}
-
-static ssize_t pn_connector_read_sasl(pn_connector_t *ctor)
-{
- pn_sasl_t *sasl = ctor->sasl;
- ssize_t n = pn_sasl_input(sasl, ctor->input, ctor->input_size);
- if (n == PN_EOS) {
- ctor->process_input = pn_connector_read_amqp_header;
- return ctor->process_input(ctor);
- } else {
- return n;
- }
-}
-
-static ssize_t pn_connector_read_amqp_header(pn_connector_t *ctor)
-{
- if (ctor->input_size >= 8) {
- if (memcmp(ctor->input, "AMQP\x00\x01\x00\x00", 8)) {
- fprintf(stderr, "amqp header missmatch: ");
- pn_fprint_data(stderr, ctor->input, ctor->input_size);
- fprintf(stderr, "\n");
- ctor->output_done = true;
- return PN_ERR;
- } else {
- if (ctor->trace & PN_TRACE_FRM)
- fprintf(stderr, " <- AMQP 1.0\n");
- ctor->process_input = pn_connector_read_amqp;
- return 8;
}
- } else if (ctor->input_eos) {
- fprintf(stderr, "amqp header missmatch: ");
- pn_fprint_data(stderr, ctor->input, ctor->input_size);
- fprintf(stderr, "\n");
- ctor->output_done = true;
- return PN_ERR;
}
-
- return 0;
-}
-
-static ssize_t pn_connector_read_amqp(pn_connector_t *ctor)
-{
- if (!ctor->transport) return 0;
- pn_transport_t *transport = ctor->transport;
- return pn_input(transport, ctor->input, ctor->input_size);
}
static char *pn_connector_output(pn_connector_t *ctor)
@@ -583,18 +492,13 @@ static size_t pn_connector_available(pn_
static void pn_connector_process_output(pn_connector_t *ctor)
{
- while (!ctor->output_done && pn_connector_available(ctor) > 0) {
- ssize_t n = ctor->process_output(ctor);
- if (n > 0) {
+ pn_transport_t *transport = ctor->transport;
+ if (!ctor->output_done) {
+ ssize_t n = pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
+ if (n >= 0) {
ctor->output_size += n;
- } else if (n == 0) {
- break;
} else {
- if (n != PN_EOS) {
- fprintf(stderr, "error in process_output: %s\n", pn_code(n));
- }
ctor->output_done = true;
- break;
}
}
@@ -622,43 +526,6 @@ static void pn_connector_write(pn_connec
ctor->status &= ~PN_SEL_WR;
}
-static ssize_t pn_connector_write_sasl_header(pn_connector_t *ctor)
-{
- if (ctor->trace & PN_TRACE_FRM)
- fprintf(stderr, " -> AMQP SASL 1.0\n");
- memmove(pn_connector_output(ctor), "AMQP\x03\x01\x00\x00", 8);
- ctor->process_output = pn_connector_write_sasl;
- return 8;
-}
-
-static ssize_t pn_connector_write_sasl(pn_connector_t *ctor)
-{
- pn_sasl_t *sasl = ctor->sasl;
- ssize_t n = pn_sasl_output(sasl, pn_connector_output(ctor), pn_connector_available(ctor));
- if (n == PN_EOS) {
- ctor->process_output = pn_connector_write_amqp_header;
- return ctor->process_output(ctor);
- } else {
- return n;
- }
-}
-
-static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor)
-{
- if (ctor->trace & PN_TRACE_FRM)
- fprintf(stderr, " -> AMQP 1.0\n");
- memmove(pn_connector_output(ctor), "AMQP\x00\x01\x00\x00", 8);
- ctor->process_output = pn_connector_write_amqp;
- return 8;
-}
-
-static ssize_t pn_connector_write_amqp(pn_connector_t *ctor)
-{
- if (!ctor->transport) return 0;
- pn_transport_t *transport = ctor->transport;
- return pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
-}
-
static time_t pn_connector_tick(pn_connector_t *ctor, time_t now)
{
if (!ctor->transport) return 0;
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Sep 7 01:19:16 2012
@@ -85,15 +85,22 @@ typedef struct {
#define SCRATCH (1024)
+#include <proton/sasl.h>
+
struct pn_transport_t {
- pn_endpoint_t endpoint;
+ ssize_t (*process_input)(pn_transport_t *, char *, size_t);
+ ssize_t (*process_output)(pn_transport_t *, char *, size_t);
+ size_t header_count;
+ pn_sasl_t *sasl;
pn_connection_t *connection;
pn_dispatcher_t *disp;
bool open_sent;
bool open_rcvd;
bool close_sent;
bool close_rcvd;
- int error;
+ char *remote_container;
+ char *remote_hostname;
+ pn_error_t *error;
pn_session_state_t *sessions;
size_t session_capacity;
pn_session_state_t **channels;
@@ -118,8 +125,6 @@ struct pn_connection_t {
pn_delivery_t *tpwork_tail;
char *container;
char *hostname;
- char *remote_container;
- char *remote_hostname;
};
struct pn_session_t {
@@ -201,5 +206,6 @@ void pn_link_dump(pn_link_t *link);
}
void pn_dump(pn_connection_t *conn);
+void pn_transport_sasl_init(pn_transport_t *transport);
#endif /* engine-internal.h */
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Sep 7 01:19:16 2012
@@ -29,6 +29,8 @@
#include <stdarg.h>
#include <stdio.h>
+#include "../sasl/sasl-internal.h"
+
// delivery buffers
void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, size_t capacity)
@@ -187,18 +189,18 @@ void pn_connection_close(pn_connection_t
if (connection) pn_close((pn_endpoint_t *) connection);
}
+void pn_endpoint_tini(pn_endpoint_t *endpoint);
+
void pn_connection_free(pn_connection_t *connection)
{
if (!connection) return;
- pn_transport_free(connection->transport);
while (connection->session_count)
pn_session_free(connection->sessions[connection->session_count - 1]);
free(connection->sessions);
free(connection->container);
free(connection->hostname);
- free(connection->remote_container);
- free(connection->remote_hostname);
+ pn_endpoint_tini(&connection->endpoint);
free(connection);
}
@@ -216,6 +218,7 @@ void pn_transport_free(pn_transport_t *t
{
if (!transport) return;
+ pn_sasl_free(transport->sasl);
pn_dispatcher_free(transport->disp);
for (int i = 0; i < transport->session_capacity; i++) {
pn_delivery_buffer_free(&transport->sessions[i].incoming);
@@ -223,6 +226,9 @@ void pn_transport_free(pn_transport_t *t
free(transport->sessions[i].links);
free(transport->sessions[i].handles);
}
+ free(transport->remote_container);
+ free(transport->remote_hostname);
+ pn_error_free(transport->error);
free(transport->sessions);
free(transport->channels);
free(transport);
@@ -273,6 +279,7 @@ void pn_session_free(pn_session_t *sessi
pn_link_free(session->links[session->link_count - 1]);
pn_remove_session(session->connection, session);
free(session->links);
+ pn_endpoint_tini(&session->endpoint);
free(session);
}
@@ -354,6 +361,7 @@ void pn_link_free(pn_link_t *link)
pn_free_delivery(d);
}
free(link->name);
+ pn_endpoint_tini(&link->endpoint);
free(link);
}
@@ -371,6 +379,11 @@ void pn_endpoint_init(pn_endpoint_t *end
LL_ADD(conn, endpoint, endpoint);
}
+void pn_endpoint_tini(pn_endpoint_t *endpoint)
+{
+ pn_error_free(endpoint->error);
+}
+
pn_connection_t *pn_connection()
{
pn_connection_t *conn = malloc(sizeof(pn_connection_t));
@@ -391,8 +404,6 @@ pn_connection_t *pn_connection()
conn->tpwork_tail = NULL;
conn->container = NULL;
conn->hostname = NULL;
- conn->remote_container = NULL;
- conn->remote_hostname = NULL;
return conn;
}
@@ -433,12 +444,14 @@ void pn_connection_set_hostname(pn_conne
const char *pn_connection_remote_container(pn_connection_t *connection)
{
- return connection ? connection->remote_container : NULL;
+ if (!connection) return NULL;
+ return connection->transport ? connection->transport->remote_container : NULL;
}
const char *pn_connection_remote_hostname(pn_connection_t *connection)
{
- return connection ? connection->remote_hostname : NULL;
+ if (!connection) return NULL;
+ return connection->transport ? connection->transport->remote_hostname : NULL;
}
pn_delivery_t *pn_work_head(pn_connection_t *connection)
@@ -655,10 +668,21 @@ int pn_do_detach(pn_dispatcher_t *disp);
int pn_do_end(pn_dispatcher_t *disp);
int pn_do_close(pn_dispatcher_t *disp);
+static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
+
void pn_transport_init(pn_transport_t *transport)
{
- pn_endpoint_init(&transport->endpoint, TRANSPORT, transport->connection);
-
+ transport->process_input = pn_input_read_amqp_header;
+ transport->process_output = pn_output_write_amqp_header;
+ transport->header_count = 0;
+ transport->sasl = NULL;
transport->disp = pn_dispatcher(0, transport);
pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
@@ -675,7 +699,9 @@ void pn_transport_init(pn_transport_t *t
transport->open_rcvd = false;
transport->close_sent = false;
transport->close_rcvd = false;
- transport->error = 0;
+ transport->remote_container = NULL;
+ transport->remote_hostname = NULL;
+ transport->error = pn_error();
transport->sessions = NULL;
transport->session_capacity = 0;
@@ -716,25 +742,41 @@ void pn_map_channel(pn_transport_t *tran
transport->channels[channel] = state;
}
-pn_transport_t *pn_transport(pn_connection_t *conn)
+pn_transport_t *pn_transport()
{
- if (!conn) return NULL;
+ pn_transport_t *transport = malloc(sizeof(pn_transport_t));
+ if (!transport) return NULL;
- if (conn->transport) {
- return NULL;
- } else {
- conn->transport = malloc(sizeof(pn_transport_t));
- if (!conn->transport) return NULL;
+ transport->connection = NULL;
+ pn_transport_init(transport);
+ return transport;
+}
+
+void pn_transport_sasl_init(pn_transport_t *transport)
+{
+ transport->process_input = pn_input_read_sasl_header;
+ transport->process_output = pn_output_write_sasl_header;
+}
- conn->transport->connection = conn;
- pn_transport_init(conn->transport);
- return conn->transport;
+int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
+{
+ if (!transport) return PN_ARG_ERR;
+ if (transport->connection) return PN_STATE_ERR;
+ if (connection->transport) return PN_STATE_ERR;
+ transport->connection = connection;
+ connection->transport = transport;
+ if (transport->open_rcvd) {
+ PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
+ if (!pn_error_code(transport->error)) {
+ transport->disp->halt = false;
+ }
}
+ return 0;
}
pn_error_t *pn_transport_error(pn_transport_t *transport)
{
- return transport->endpoint.error;
+ return transport->error;
}
void pn_link_init(pn_link_t *link, int type, pn_session_t *session, const char *name)
@@ -1047,13 +1089,13 @@ int pn_do_error(pn_transport_t *transpor
// XXX: result
vsnprintf(buf, 1024, fmt, ap);
va_end(ap);
- pn_error_set(transport->endpoint.error, PN_ERR, buf);
+ pn_error_set(transport->error, PN_ERR, buf);
if (!transport->close_sent) {
pn_post_close(transport);
transport->close_sent = true;
}
transport->disp->halt = true;
- fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->endpoint.error));
+ fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->error));
return PN_ERR;
}
@@ -1072,16 +1114,20 @@ int pn_do_open(pn_dispatcher_t *disp)
&hostname_q, &remote_hostname);
if (err) return err;
if (container_q) {
- conn->remote_container = pn_bytes_strdup(remote_container);
+ transport->remote_container = pn_bytes_strdup(remote_container);
} else {
- conn->remote_container = NULL;
+ transport->remote_container = NULL;
}
if (hostname_q) {
- conn->remote_hostname = pn_bytes_strdup(remote_hostname);
+ transport->remote_hostname = pn_bytes_strdup(remote_hostname);
+ } else {
+ transport->remote_hostname = NULL;
+ }
+ if (conn) {
+ PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
} else {
- conn->remote_hostname = NULL;
+ transport->disp->halt = true;
}
- PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
transport->open_rcvd = true;
return 0;
}
@@ -1382,29 +1428,103 @@ ssize_t pn_input(pn_transport_t *transpo
{
if (!transport) return PN_ARG_ERR;
- if (!available) {
- pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
- if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
- fprintf(stderr, " <- EOS\n");
- return PN_ERR;
+ size_t consumed = 0;
+
+ while (true) {
+ ssize_t n = transport->process_input(transport, bytes + consumed, available - consumed);
+ if (n > 0) {
+ consumed += n;
+ if (consumed >= available) {
+ break;
+ }
+ } else if (n == 0) {
+ break;
+ } else {
+ if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+ pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+ return n;
+ }
+ }
+
+ return consumed;
+}
+
+#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
+
+static ssize_t pn_input_read_header(pn_transport_t *transport, char *bytes, size_t available,
+ const char *header, size_t size, const char *protocol,
+ ssize_t (*next)(pn_transport_t *, char *, size_t))
+{
+ const char *point = header + transport->header_count;
+ int delta = pn_min(available, size - transport->header_count);
+ if (!available || memcmp(bytes, point, delta)) {
+ char quoted[1024];
+ pn_quote_data(quoted, 1024, bytes, available);
+ return pn_error_format(transport->error, PN_ERR,
+ "%s header missmatch: '%s'", protocol, quoted);
+ } else {
+ transport->header_count += delta;
+ if (transport->header_count == size) {
+ transport->header_count = 0;
+ transport->process_input = next;
+
+ if (transport->disp->trace & PN_TRACE_FRM)
+ fprintf(stderr, " <- %s\n", protocol);
+ }
+ return delta;
+ }
+}
+
+static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, char *bytes, size_t available)
+{
+ return pn_input_read_header(transport, bytes, available, SASL_HEADER, 8, "SASL", pn_input_read_sasl);
+}
+
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, char *bytes, size_t available)
+{
+ pn_sasl_t *sasl = transport->sasl;
+ ssize_t n = pn_sasl_input(sasl, bytes, available);
+ if (n == PN_EOS) {
+ transport->process_input = pn_input_read_amqp_header;
+ return transport->process_input(transport, bytes, available);
+ } else {
+ return n;
}
+}
+#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
+
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, char *bytes, size_t available)
+{
+ return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
+ "AMQP", pn_input_read_amqp);
+}
+
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, char *bytes, size_t available)
+{
if (transport->close_rcvd) {
- pn_do_error(transport, "amqp:connection:framing-error", "data after close");
- if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
- fprintf(stderr, " <- EOS\n");
+ if (available > 0) {
+ pn_do_error(transport, "amqp:connection:framing-error", "data after close");
+ return PN_ERR;
+ } else {
+ return PN_EOS;
+ }
+ }
+
+ if (!available) {
+ pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
return PN_ERR;
}
+
ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
- if (n >= 0 && transport->close_rcvd) {
- if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
- fprintf(stderr, " <- EOS\n");
+ if (n < 0) {
+ return pn_error_set(transport->error, n, "dispatch error");
+ } else if (transport->close_rcvd) {
return PN_EOS;
- } else if (n < 0) {
- transport->error = n;
+ } else {
+ return n;
}
- return n;
}
bool pn_delivery_buffered(pn_delivery_t *delivery)
@@ -1801,18 +1921,60 @@ int pn_process(pn_transport_t *transport
return 0;
}
-ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+static ssize_t pn_output_write_header(pn_transport_t *transport,
+ char *bytes, size_t size,
+ const char *header, size_t hdrsize,
+ const char *protocol,
+ ssize_t (*next)(pn_transport_t *, char *, size_t))
+{
+ if (transport->disp->trace & PN_TRACE_FRM)
+ fprintf(stderr, " -> %s\n", protocol);
+ if (size >= hdrsize) {
+ memmove(bytes, header, hdrsize);
+ transport->process_output = next;
+ return hdrsize;
+ } else {
+ return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
+ }
+}
+
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t size)
{
- if (!transport) return PN_ARG_ERR;
+ return pn_output_write_header(transport, bytes, size, SASL_HEADER, 8, "SASL",
+ pn_output_write_sasl);
+}
+
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t size)
+{
+ pn_sasl_t *sasl = transport->sasl;
+ ssize_t n = pn_sasl_output(sasl, bytes, size);
+ if (n == PN_EOS) {
+ transport->process_output = pn_output_write_amqp_header;
+ return 0;
+ } else {
+ return n;
+ }
+}
+
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t size)
+{
+ return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
+ pn_output_write_amqp);
+}
- if (!transport->error)
- transport->error = pn_process(transport);
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t size)
+{
+ if (!transport->connection) {
+ return 0;
+ }
+
+ if (!pn_error_code(transport->error)) {
+ pn_error_set(transport->error, pn_process(transport), "process error");
+ }
- if (!transport->disp->available && (transport->close_sent || transport->error)) {
- if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
- fprintf(stderr, " -> EOS\n");
- if (transport->error)
- return transport->error;
+ if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) {
+ if (pn_error_code(transport->error))
+ return pn_error_code(transport->error);
else
return PN_EOS;
}
@@ -1820,8 +1982,40 @@ ssize_t pn_output(pn_transport_t *transp
return pn_dispatcher_output(transport->disp, bytes, size);
}
+ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+ if (!transport) return PN_ARG_ERR;
+
+ size_t total = 0;
+
+ while (size - total > 0) {
+ ssize_t n = transport->process_output(transport, bytes + total, size - total);
+ if (n > 0) {
+ total += n;
+ } else if (n == 0) {
+ break;
+ } else if (n == PN_EOS) {
+ if (total > 0) {
+ return total;
+ } else {
+ if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+ pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
+ return PN_EOS;
+ }
+ } else {
+ if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
+ pn_dispatcher_trace(transport->disp, 0, "-> EOS (%zi) %s\n", n,
+ pn_error_text(transport->error));
+ return n;
+ }
+ }
+
+ return total;
+}
+
void pn_trace(pn_transport_t *transport, pn_trace_t trace)
{
+ if (transport->sasl) pn_sasl_trace(transport->sasl, trace);
transport->disp->trace = trace;
}
Modified: qpid/proton/trunk/proton-c/src/error.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/error.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/error.c (original)
+++ qpid/proton/trunk/proton-c/src/error.c Fri Sep 7 01:19:16 2012
@@ -62,8 +62,10 @@ void pn_error_clear(pn_error_t *error)
int pn_error_set(pn_error_t *error, int code, const char *text)
{
pn_error_clear(error);
- error->code = code;
- error->text = pn_strdup(text);
+ if (code) {
+ error->code = code;
+ error->text = pn_strdup(text);
+ }
return code;
}
Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Fri Sep 7 01:19:16 2012
@@ -113,6 +113,7 @@ void pn_message_free(pn_message_t *msg)
pn_buffer_free(msg->reply_to_group_id);
pn_data_free(msg->data);
pn_data_free(msg->body);
+ pn_parser_free(msg->parser);
free(msg);
}
}
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Fri Sep 7 01:19:16 2012
@@ -32,9 +32,6 @@ struct pn_messenger_t {
char *name;
int timeout;
pn_driver_t *driver;
- pn_connector_t *connectors[1024];
- size_t size;
- size_t listeners;
int credit;
uint64_t next_tag;
pn_error_t *error;
@@ -61,8 +58,6 @@ pn_messenger_t *pn_messenger(const char
m->name = build_name(name);
m->timeout = -1;
m->driver = pn_driver();
- m->size = 0;
- m->listeners = 0;
m->credit = 0;
m->next_tag = 0;
m->error = pn_error();
@@ -94,6 +89,7 @@ void pn_messenger_free(pn_messenger_t *m
free(messenger->name);
pn_driver_free(messenger->driver);
pn_error_free(messenger->error);
+ free(messenger);
}
}
@@ -119,8 +115,8 @@ void pn_messenger_flow(pn_messenger_t *m
{
while (messenger->credit > 0) {
int prev = messenger->credit;
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -131,6 +127,8 @@ void pn_messenger_flow(pn_messenger_t *m
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
+
+ ctor = pn_connector_next(ctor);
}
if (messenger->credit == prev) break;
}
@@ -193,8 +191,10 @@ long int millis(struct timeval tv)
int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
{
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_process(messenger->connectors[i]);
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
+ pn_connector_process(ctor);
+ ctor = pn_connector_next(ctor);
}
struct timeval now;
@@ -219,7 +219,6 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connection_t *conn = pn_connection();
pn_connection_set_container(conn, messenger->name);
pn_connector_set_connection(c, conn);
- messenger->connectors[messenger->size++] = c;
}
pn_connector_t *c;
@@ -228,17 +227,10 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connection_t *conn = pn_connector_connection(c);
pn_messenger_endpoints(messenger, conn);
if (pn_connector_closed(c)) {
- for (int i = 0; i < messenger->size; i++) {
- if (c == messenger->connectors[i]) {
- memmove(messenger->connectors + i, messenger->connectors + i + 1, messenger->size - i - 1);
- messenger->size--;
- pn_connector_free(c);
- pn_messenger_reclaim(messenger, conn);
- pn_connection_free(conn);
- pn_messenger_flow(messenger);
- break;
- }
- }
+ pn_connector_free(c);
+ pn_messenger_reclaim(messenger, conn);
+ pn_connection_free(conn);
+ pn_messenger_flow(messenger);
} else {
pn_connector_process(c);
}
@@ -266,15 +258,15 @@ int pn_messenger_start(pn_messenger_t *m
bool pn_messenger_stopped(pn_messenger_t *messenger)
{
- return messenger->size == 0;
+ return pn_connector_head(messenger->driver) == NULL;
}
int pn_messenger_stop(pn_messenger_t *messenger)
{
if (!messenger) return PN_ARG_ERR;
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (link) {
@@ -282,6 +274,7 @@ int pn_messenger_stop(pn_messenger_t *me
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
pn_connection_close(conn);
+ ctor = pn_connector_next(ctor);
}
pn_listener_t *l = pn_listener_head(messenger->driver);
@@ -334,17 +327,18 @@ pn_connection_t *pn_messenger_domain(pn_
char *port = "5672";
parse_url(buf, &user, &pass, &host, &port);
- for (int i = 0; i < messenger->size; i++) {
- pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
+ pn_connection_t *connection = pn_connector_connection(ctor);
const char *container = pn_connection_remote_container(connection);
const char *hostname = pn_connection_hostname(connection);
if (pn_streq(container, domain) || pn_streq(hostname, domain))
return connection;
+ ctor = pn_connector_next(ctor);
}
pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
if (!connector) return NULL;
- messenger->connectors[messenger->size++] = connector;
pn_sasl_t *sasl = pn_connector_sasl(connector);
if (user) {
pn_sasl_plain(sasl, user, pass);
@@ -423,11 +417,7 @@ pn_listener_t *pn_messenger_isource(pn_m
char *port = "5672";
parse_url(domain + 1, &user, &pass, &host, &port);
- pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
- if (listener) {
- messenger->listeners++;
- }
- return listener;
+ return pn_listener(messenger->driver, host, port, NULL);
}
int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
@@ -516,8 +506,8 @@ int pn_messenger_put(pn_messenger_t *mes
bool pn_messenger_sent(pn_messenger_t *messenger)
{
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -535,6 +525,8 @@ bool pn_messenger_sent(pn_messenger_t *m
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
+
+ ctor = pn_connector_next(ctor);
}
return true;
@@ -542,8 +534,8 @@ bool pn_messenger_sent(pn_messenger_t *m
bool pn_messenger_rcvd(pn_messenger_t *messenger)
{
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_delivery_t *d = pn_work_head(conn);
@@ -553,6 +545,7 @@ bool pn_messenger_rcvd(pn_messenger_t *m
}
d = pn_work_next(d);
}
+ ctor = pn_connector_next(ctor);
}
return false;
@@ -566,7 +559,7 @@ int pn_messenger_send(pn_messenger_t *me
int pn_messenger_recv(pn_messenger_t *messenger, int n)
{
if (!messenger) return PN_ARG_ERR;
- if (!messenger->listeners && !messenger->size)
+ if (!pn_listener_head(messenger->driver) && !pn_connector_head(messenger->driver))
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
messenger->credit += n;
pn_messenger_flow(messenger);
@@ -577,8 +570,8 @@ int pn_messenger_get(pn_messenger_t *mes
{
if (!messenger) return PN_ARG_ERR;
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_delivery_t *d = pn_work_head(conn);
@@ -604,6 +597,8 @@ int pn_messenger_get(pn_messenger_t *mes
}
d = pn_work_next(d);
}
+
+ ctor = pn_connector_next(ctor);
}
// XXX: need to drain credit before returning EOS
@@ -617,8 +612,8 @@ int pn_messenger_queued(pn_messenger_t *
int result = 0;
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
@@ -632,6 +627,7 @@ int pn_messenger_queued(pn_messenger_t *
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
+ ctor = pn_connector_next(ctor);
}
return result;
Added: qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h?rev=1381839&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h (added)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl-internal.h Fri Sep 7 01:19:16 2012
@@ -0,0 +1,39 @@
+#ifndef PROTON_SASL_INTERNAL_H
+#define PROTON_SASL_INTERNAL_H 1
+
+#include <proton/sasl.h>
+
+/** Decode input data bytes into SASL frames, and process them.
+ *
+ * This function is called by the driver layer to pass data received
+ * from the remote peer into the SASL layer.
+ *
+ * @param[in] sasl the SASL layer.
+ * @param[in] bytes buffer of frames to process
+ * @param[in] available number of octets of data in 'bytes'
+ * @return the number of bytes consumed, or error code if < 0
+ */
+ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available);
+
+/** Gather output frames from the layer.
+ *
+ * This function is used by the driver to poll the SASL layer for data
+ * that will be sent to the remote peer.
+ *
+ * @param[in] sasl The SASL layer.
+ * @param[out] bytes to be filled with encoded frames.
+ * @param[in] size space available in bytes array.
+ * @return the number of octets written to bytes, or error code if < 0
+ */
+ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size);
+
+void pn_sasl_trace(pn_sasl_t *sasl, pn_trace_t trace);
+
+/** Destructor for the given SASL layer.
+ *
+ * @param[in] sasl the SASL object to free. No longer valid on
+ * return.
+ */
+void pn_sasl_free(pn_sasl_t *sasl);
+
+#endif /* sasl-internal.h */
Modified: qpid/proton/trunk/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl.c Fri Sep 7 01:19:16 2012
@@ -27,6 +27,7 @@
#include <proton/sasl.h>
#include "protocol.h"
#include "../dispatcher/dispatcher.h"
+#include "../engine/engine-internal.h"
#include "../util.h"
#define SCRATCH (1024)
@@ -53,30 +54,36 @@ int pn_do_challenge(pn_dispatcher_t *dis
int pn_do_response(pn_dispatcher_t *disp);
int pn_do_outcome(pn_dispatcher_t *disp);
-pn_sasl_t *pn_sasl()
+pn_sasl_t *pn_sasl(pn_transport_t *transport)
{
- pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
- sasl->disp = pn_dispatcher(1, sasl);
+ if (!transport->sasl) {
+ pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
+ sasl->disp = pn_dispatcher(1, sasl);
+
+ pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
+ pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
+ pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
+ pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
+ pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
- pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
- pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
- pn_dispatcher_action(sasl->disp, SASL_CHALLENGE, "SASL-CHALLENGE", pn_do_challenge);
- pn_dispatcher_action(sasl->disp, SASL_RESPONSE, "SASL-RESPONSE", pn_do_response);
- pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
-
- sasl->client = false;
- sasl->configured = false;
- sasl->mechanisms = NULL;
- sasl->remote_mechanisms = NULL;
- sasl->send_data = (pn_bytes_t) {0, NULL};
- sasl->recv_data = (pn_bytes_t) {0, NULL};
- sasl->outcome = PN_SASL_NONE;
- sasl->sent_init = false;
- sasl->rcvd_init = false;
- sasl->sent_done = false;
- sasl->rcvd_done = false;
+ sasl->client = false;
+ sasl->configured = false;
+ sasl->mechanisms = NULL;
+ sasl->remote_mechanisms = NULL;
+ sasl->send_data = (pn_bytes_t) {0, NULL};
+ sasl->recv_data = (pn_bytes_t) {0, NULL};
+ sasl->outcome = PN_SASL_NONE;
+ sasl->sent_init = false;
+ sasl->rcvd_init = false;
+ sasl->sent_done = false;
+ sasl->rcvd_done = false;
+
+ transport->sasl = sasl;
+
+ pn_transport_sasl_init(transport);
+ }
- return sasl;
+ return transport->sasl;
}
pn_sasl_state_t pn_sasl_state(pn_sasl_t *sasl)
@@ -201,12 +208,14 @@ void pn_sasl_trace(pn_sasl_t *sasl, pn_t
void pn_sasl_free(pn_sasl_t *sasl)
{
- free(sasl->mechanisms);
- free(sasl->remote_mechanisms);
- free(sasl->send_data.start);
- free(sasl->recv_data.start);
- pn_dispatcher_free(sasl->disp);
- free(sasl);
+ if (sasl) {
+ free(sasl->mechanisms);
+ free(sasl->remote_mechanisms);
+ free(sasl->send_data.start);
+ free(sasl->recv_data.start);
+ pn_dispatcher_free(sasl->disp);
+ free(sasl);
+ }
}
void pn_client_init(pn_sasl_t *sasl)
Modified: qpid/proton/trunk/proton-c/src/scanner.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/scanner.c?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/scanner.c (original)
+++ qpid/proton/trunk/proton-c/src/scanner.c Fri Sep 7 01:19:16 2012
@@ -81,7 +81,10 @@ pn_scanner_t *pn_scanner()
void pn_scanner_free(pn_scanner_t *scanner)
{
- free(scanner);
+ if (scanner) {
+ pn_error_free(scanner->error);
+ free(scanner);
+ }
}
pn_token_t pn_scanner_token(pn_scanner_t *scanner)
Modified: qpid/proton/trunk/proton-c/src/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.h?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.h (original)
+++ qpid/proton/trunk/proton-c/src/util.h Fri Sep 7 01:19:16 2012
@@ -92,4 +92,7 @@ bool pn_env_bool(const char *name);
char *pn_strdup(const char *src);
char *pn_strndup(const char *src, size_t n);
+#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X))
+#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X))
+
#endif /* util.h */
Modified: qpid/proton/trunk/tests/proton_tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/__init__.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/__init__.py (original)
+++ qpid/proton/trunk/tests/proton_tests/__init__.py Fri Sep 7 01:19:16 2012
@@ -21,3 +21,4 @@ import proton_tests.engine
import proton_tests.message
import proton_tests.messenger
import proton_tests.sasl
+import proton_tests.transport
Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Fri Sep 7 01:19:16 2012
@@ -55,8 +55,10 @@ class Test(common.Test):
def connection(self):
c1 = pn_connection()
c2 = pn_connection()
- t1 = pn_transport(c1)
- t2 = pn_transport(c2)
+ t1 = pn_transport()
+ pn_transport_bind(t1, c1)
+ t2 = pn_transport()
+ pn_transport_bind(t2, c2)
self._wires.append((c1, t1, c2, t2))
trc = os.environ.get("PN_TRACE_FRM")
if trc and trc.lower() in ("1", "2", "yes", "true"):
@@ -82,7 +84,9 @@ class Test(common.Test):
def cleanup(self):
for c1, t1, c2, t2 in self._wires:
pn_connection_free(c1)
+ pn_transport_free(t1)
pn_connection_free(c2)
+ pn_transport_free(t2)
def pump(self):
for c1, t1, c2, t2 in self._wires:
Modified: qpid/proton/trunk/tests/proton_tests/message.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/message.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/message.py (original)
+++ qpid/proton/trunk/tests/proton_tests/message.py Fri Sep 7 01:19:16 2012
@@ -127,6 +127,8 @@ class CodecTest(Test):
assert not cd, cd
assert saved == body, (body, saved)
+ pn_message_free(msg2)
+
class LoadSaveTest(Test):
Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Fri Sep 7 01:19:16 2012
@@ -48,6 +48,7 @@ class Test(common.Test):
pn_messenger_free(self.server)
self.client = None
self.server = None
+ pn_message_free(msg)
class MessengerTest(Test):
@@ -64,6 +65,7 @@ class MessengerTest(Test):
pn_message_set_address(msg, reply_to)
pn_messenger_put(self.server, msg)
pn_messenger_stop(self.server)
+ pn_message_free(msg)
def testSendReceive(self):
msg = pn_message()
@@ -91,4 +93,6 @@ class MessengerTest(Test):
msg = pn_message()
pn_message_set_address(msg, "totally-bogus-address")
assert pn_messenger_put(self.client, msg) == PN_ERR
- assert "unable to send to address: totally-bogus-address (getaddrinfo:" in pn_messenger_error(self.client)
+ err = pn_messenger_error(self.client)
+ assert "unable to send to address: totally-bogus-address (getaddrinfo:" in err, err
+ pn_message_free(msg)
Modified: qpid/proton/trunk/tests/proton_tests/sasl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/sasl.py?rev=1381839&r1=1381838&r2=1381839&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/sasl.py (original)
+++ qpid/proton/trunk/tests/proton_tests/sasl.py Fri Sep 7 01:19:16 2012
@@ -26,36 +26,34 @@ class Test(common.Test):
class SaslTest(Test):
def testPipelined(self):
- cli = pn_sasl()
- pn_sasl_mechanisms(cli, "ANONYMOUS")
- pn_sasl_client(cli)
-
- srv = pn_sasl()
- pn_sasl_mechanisms(srv, "ANONYMOUS")
- pn_sasl_server(srv)
- pn_sasl_done(srv, PN_SASL_OK)
+ cli = pn_transport()
+ cli_auth = pn_sasl(cli)
+ pn_sasl_mechanisms(cli_auth, "ANONYMOUS")
+ pn_sasl_client(cli_auth)
+
+ assert pn_sasl_outcome(cli_auth) == PN_SASL_NONE
+
+ srv = pn_transport()
+ srv_auth = pn_sasl(srv)
+ pn_sasl_mechanisms(srv_auth, "ANONYMOUS")
+ pn_sasl_server(srv_auth)
+ pn_sasl_done(srv_auth, PN_SASL_OK)
- cli_code, cli_out = pn_sasl_output(cli, 1024)
- srv_code, srv_out = pn_sasl_output(srv, 1024)
+ cli_code, cli_out = pn_output(cli, 1024)
+ srv_code, srv_out = pn_output(srv, 1024)
assert cli_code > 0, cli_code
assert srv_code > 0, srv_code
- dummy_header = "AMQPxxxx"
+ n = pn_input(srv, cli_out)
+ assert n == len(cli_out), "(%s) %s" % (n, pn_error_text(pn_transport_error(srv)))
- srv_out += dummy_header
- cli_out += dummy_header
+ assert pn_sasl_outcome(cli_auth) == PN_SASL_NONE
- n = pn_sasl_input(srv, cli_out)
- assert n > 0, n
- cli_out = cli_out[n:]
- assert cli_out == dummy_header
- n = pn_sasl_input(srv, cli_out)
- assert n == PN_EOS, n
-
- n = pn_sasl_input(cli, srv_out)
- assert n > 0, n
- srv_out = srv_out[n:]
- assert srv_out == dummy_header
- n = pn_sasl_input(cli, srv_out)
- assert n == PN_EOS, n
+ n = pn_input(cli, srv_out)
+ assert n == len(srv_out), n
+
+ assert pn_sasl_outcome(cli_auth) == PN_SASL_OK
+
+ pn_transport_free(cli)
+ pn_transport_free(srv)
Added: qpid/proton/trunk/tests/proton_tests/transport.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/transport.py?rev=1381839&view=auto
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/transport.py (added)
+++ qpid/proton/trunk/tests/proton_tests/transport.py Fri Sep 7 01:19:16 2012
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, common, xproton
+from xproton import *
+
+class Test(common.Test):
+ pass
+
+class TransportTest(Test):
+
+ def setup(self):
+ self.transport = pn_transport()
+
+ def teardown(self):
+ pn_transport_free(self.transport)
+ self.transport = None
+
+ def testEOS(self):
+ n = pn_input(self.transport, "")
+ assert n == PN_ERR, pn_error_text(pn_transport_error(self.transport))
+
+ def testPartial(self):
+ n = pn_input(self.transport, "AMQ")
+ assert n == 3, n
+ n = pn_input(self.transport, "")
+ assert n == PN_ERR, n
+
+ def testGarbage(self):
+ n = pn_input(self.transport, "GARBAGE_")
+ assert n == PN_ERR, pn_error_text(pn_transport_error(self.transport))
+ n = pn_input(self.transport, "")
+ assert n == PN_ERR, n
+
+ def testSmallGarbage(self):
+ n = pn_input(self.transport, "XXX")
+ assert n == PN_ERR, pn_error_text(pn_transport_error(self.transport))
+ n = pn_input(self.transport, "")
+ assert n == PN_ERR, n
+
+ def testBigGarbage(self):
+ n = pn_input(self.transport, "GARBAGE_XXX")
+ assert n == PN_ERR, pn_error_text(pn_transport_error(self.transport))
+ n = pn_input(self.transport, "")
+ assert n == PN_ERR, n
+
+ def testHeader(self):
+ n = pn_input(self.transport, "AMQP\x00\x01\x00\x00")
+ assert n == 8, n
+ n = pn_input(self.transport, "")
+ assert n == PN_ERR, n
+
+ def testOutput(self):
+ n, out = pn_output(self.transport, 1024)
+ assert n == len(out)
+
+ def testBindAfterOpen(self):
+ conn = pn_connection()
+ ssn = pn_session(conn)
+ pn_connection_open(conn)
+ pn_session_open(ssn)
+ pn_connection_set_container(conn, "test-container")
+ pn_connection_set_hostname(conn, "test-hostname")
+ trn = pn_transport()
+ pn_transport_bind(trn, conn)
+ n, out = pn_output(trn, 1024)
+ assert n == len(out), n
+ assert "test-container" in out, repr(out)
+ n = pn_input(self.transport, out)
+ assert n > 0 and n < len(out)
+ out = out[n:]
+
+ n = pn_input(self.transport, out)
+ assert n == 0
+
+ c = pn_connection()
+ assert pn_connection_remote_container(c) == None
+ assert pn_connection_remote_hostname(c) == None
+ pn_transport_bind(self.transport, c)
+ assert pn_connection_remote_container(c) == "test-container"
+ assert pn_connection_remote_hostname(c) == "test-hostname"
+ assert pn_session_head(c, 0) == None
+ n = pn_input(self.transport, out)
+ assert n == len(out), (n, out)
+ assert pn_session_head(c, 0) != None
+
+ pn_transport_free(trn)
+ pn_connection_free(conn)
+ pn_connection_free(c)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org