You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/01/29 15:38:13 UTC
svn commit: r1439912 - in /qpid/proton/trunk/proton-c/src:
engine/engine-internal.h engine/engine.c sasl/sasl.c ssl/openssl.c
ssl/ssl-internal.h
Author: kgiusti
Date: Tue Jan 29 14:38:13 2013
New Revision: 1439912
URL: http://svn.apache.org/viewvc?rev=1439912&view=rev
Log:
PROTON-152: explicitly separate the I/O layers
Modified:
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/sasl/sasl.c
qpid/proton/trunk/proton-c/src/ssl/openssl.c
qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Jan 29 14:38:13 2013
@@ -107,10 +107,15 @@ typedef struct {
#include <proton/sasl.h>
#include <proton/ssl.h>
+typedef struct pn_io_layer_t {
+ void *context;
+ struct pn_io_layer_t *next;
+ ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t);
+ ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t);
+ pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t);
+} pn_io_layer_t;
+
struct pn_transport_t {
- ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
- ssize_t (*process_output)(pn_transport_t *, char *, size_t);
- pn_timestamp_t (*process_tick)(pn_transport_t *, pn_timestamp_t);
size_t header_count;
pn_sasl_t *sasl;
pn_ssl_t *ssl;
@@ -128,6 +133,12 @@ struct pn_transport_t {
uint32_t remote_max_frame;
pn_condition_t remote_condition;
+#define PN_IO_SSL 0
+#define PN_IO_SASL 1
+#define PN_IO_AMQP 2
+#define PN_IO_LAYER_CT (PN_IO_AMQP+1)
+ pn_io_layer_t io_layers[PN_IO_LAYER_CT];
+
/* dead remote detection */
pn_millis_t local_idle_timeout;
pn_timestamp_t dead_remote_deadline;
@@ -253,4 +264,8 @@ void pn_link_dump(pn_link_t *link);
void pn_dump(pn_connection_t *conn);
void pn_transport_sasl_init(pn_transport_t *transport);
+ssize_t pn_io_layer_input_passthru(pn_io_layer_t *, const char *, size_t );
+ssize_t pn_io_layer_output_passthru(pn_io_layer_t *, char *, size_t );
+pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *, pn_timestamp_t);
+
#endif /* engine-internal.h */
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Jan 29 14:38:13 2013
@@ -728,26 +728,36 @@ int pn_do_detach(pn_dispatcher_t *disp);
int pn_do_end(pn_dispatcher_t *disp);
int pn_do_close(pn_dispatcher_t *disp);
-static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, const char *bytes, size_t available);
-static ssize_t pn_input_read_sasl(pn_transport_t *transport, const char *bytes, size_t available);
-static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, const char *bytes, size_t available);
-static ssize_t pn_input_read_amqp(pn_transport_t *transport, const char *bytes, size_t available);
-static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
-static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
-static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
-static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
-static pn_timestamp_t pn_process_tick(pn_transport_t *transport, pn_timestamp_t now);
+static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
+static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available);
+static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t available);
+static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now);
void pn_transport_init(pn_transport_t *transport)
{
- transport->process_input = pn_input_read_amqp_header;
- transport->process_output = pn_output_write_amqp_header;
- transport->process_tick = NULL;
transport->header_count = 0;
transport->sasl = NULL;
transport->ssl = NULL;
transport->disp = pn_dispatcher(0, transport);
+ pn_io_layer_t *io_layer = transport->io_layers;
+ while (io_layer != &transport->io_layers[PN_IO_AMQP]) {
+ io_layer->context = NULL;
+ io_layer->next = io_layer + 1;
+ io_layer->process_input = pn_io_layer_input_passthru;
+ io_layer->process_output = pn_io_layer_output_passthru;
+ io_layer->process_tick = pn_io_layer_tick_passthru;
+ ++io_layer;
+ }
+
+ pn_io_layer_t *amqp = &transport->io_layers[PN_IO_AMQP];
+ amqp->context = transport;
+ amqp->process_input = pn_input_read_amqp_header;
+ amqp->process_output = pn_output_write_amqp_header;
+ amqp->process_tick = pn_io_layer_tick_passthru;
+ amqp->next = NULL;
+
pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
pn_dispatcher_action(transport->disp, BEGIN, "BEGIN", pn_do_begin);
pn_dispatcher_action(transport->disp, ATTACH, "ATTACH", pn_do_attach);
@@ -827,12 +837,6 @@ pn_transport_t *pn_transport()
return transport;
}
-void pn_transport_sasl_init(pn_transport_t *transport)
-{
- transport->process_input = pn_input_read_sasl_header;
- transport->process_output = pn_output_write_sasl_header;
-}
-
int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
{
if (!transport) return PN_ARG_ERR;
@@ -1422,7 +1426,7 @@ int pn_do_open(pn_dispatcher_t *disp)
transport->disp->halt = true;
}
if (transport->remote_idle_timeout)
- transport->process_tick = pn_process_tick; // enable timeouts
+ transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp; // enable timeouts
transport->open_rcvd = true;
return 0;
}
@@ -1812,15 +1816,12 @@ ssize_t pn_transport_input(pn_transport_
{
if (!transport) return PN_ARG_ERR;
+ pn_io_layer_t *io_layer = transport->io_layers;
size_t consumed = 0;
- const bool use_ssl = transport->ssl != NULL;
while (true) {
ssize_t n;
- if (use_ssl)
- n = pn_ssl_input( transport->ssl, bytes + consumed, available - consumed);
- else
- n = transport->process_input(transport, bytes + consumed, available - consumed);
+ n = io_layer->process_input( io_layer, bytes + consumed, available - consumed);
if (n > 0) {
consumed += n;
if (consumed >= available) {
@@ -1844,11 +1845,9 @@ ssize_t pn_transport_input(pn_transport_
return consumed;
}
-#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
-
static ssize_t pn_input_read_header(pn_transport_t *transport, const char *bytes, size_t available,
const char *header, size_t size, const char *protocol,
- ssize_t (*next)(pn_transport_t *, const char *, size_t))
+ ssize_t (*next)(pn_io_layer_t *, const char *, size_t))
{
const char *point = header + transport->header_count;
int delta = pn_min(available, size - transport->header_count);
@@ -1861,7 +1860,7 @@ static ssize_t pn_input_read_header(pn_t
transport->header_count += delta;
if (transport->header_count == size) {
transport->header_count = 0;
- transport->process_input = next;
+ transport->io_layers[PN_IO_AMQP].process_input = next;
if (transport->disp->trace & PN_TRACE_FRM)
fprintf(stderr, " <- %s\n", protocol);
@@ -1870,33 +1869,18 @@ static ssize_t pn_input_read_header(pn_t
}
}
-static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, const char *bytes, size_t available)
-{
- return pn_input_read_header(transport, bytes, available, SASL_HEADER, 8, "SASL", pn_input_read_sasl);
-}
-
-static ssize_t pn_input_read_sasl(pn_transport_t *transport, const char *bytes, size_t available)
-{
- pn_sasl_t *sasl = transport->sasl;
- ssize_t n = pn_sasl_input(sasl, bytes, available);
- if (n == PN_EOS) {
- transport->process_input = pn_input_read_amqp_header;
- return transport->process_input(transport, bytes, available);
- } else {
- return n;
- }
-}
-
#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
-static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, const char *bytes, size_t available)
+static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
{
+ pn_transport_t *transport = (pn_transport_t *)io_layer->context;
return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
"AMQP", pn_input_read_amqp);
}
-static ssize_t pn_input_read_amqp(pn_transport_t *transport, const char *bytes, size_t available)
+static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available)
{
+ pn_transport_t *transport = (pn_transport_t *)io_layer->context;
if (transport->close_rcvd) {
if (available > 0) {
pn_do_error(transport, "amqp:connection:framing-error", "data after close");
@@ -1923,9 +1907,10 @@ static ssize_t pn_input_read_amqp(pn_tra
}
/* process AMQP related timer events */
-static pn_timestamp_t pn_process_tick(pn_transport_t *transport, pn_timestamp_t now)
+static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now)
{
pn_timestamp_t timeout = 0;
+ pn_transport_t *transport = (pn_transport_t *)io_layer->context;
if (transport->local_idle_timeout) {
if (transport->dead_remote_deadline == 0 ||
@@ -2492,45 +2477,29 @@ static ssize_t pn_output_write_header(pn
char *bytes, size_t size,
const char *header, size_t hdrsize,
const char *protocol,
- ssize_t (*next)(pn_transport_t *, char *, size_t))
+ ssize_t (*next)(pn_io_layer_t *, char *, size_t))
{
if (transport->disp->trace & PN_TRACE_FRM)
fprintf(stderr, " -> %s\n", protocol);
if (size >= hdrsize) {
memmove(bytes, header, hdrsize);
- transport->process_output = next;
+ transport->io_layers[PN_IO_AMQP].process_output = next;
return hdrsize;
} else {
return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
}
}
-static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t size)
-{
- return pn_output_write_header(transport, bytes, size, SASL_HEADER, 8, "SASL",
- pn_output_write_sasl);
-}
-
-static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t size)
-{
- pn_sasl_t *sasl = transport->sasl;
- ssize_t n = pn_sasl_output(sasl, bytes, size);
- if (n == PN_EOS) {
- transport->process_output = pn_output_write_amqp_header;
- return transport->process_output(transport, bytes, size);
- } else {
- return n;
- }
-}
-
-static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t size)
+static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
{
+ pn_transport_t *transport = (pn_transport_t *)io_layer->context;
return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
pn_output_write_amqp);
}
-static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t size)
+static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size)
{
+ pn_transport_t *transport = (pn_transport_t *)io_layer->context;
if (!transport->connection) {
return 0;
}
@@ -2553,16 +2522,12 @@ ssize_t pn_transport_output(pn_transport
{
if (!transport) return PN_ARG_ERR;
+ pn_io_layer_t *io_layer = transport->io_layers;
size_t total = 0;
- const bool use_ssl = transport->ssl != NULL;
-
while (size - total > 0) {
ssize_t n;
- if (use_ssl)
- n = pn_ssl_output( transport->ssl, bytes + total, size - total);
- else
- n = transport->process_output(transport, bytes + total, size - total);
+ n = io_layer->process_output( io_layer, bytes + total, size - total);
if (n > 0) {
total += n;
} else if (n == 0) {
@@ -2617,7 +2582,7 @@ pn_millis_t pn_transport_get_idle_timeou
void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout)
{
transport->local_idle_timeout = timeout;
- transport->process_tick = pn_process_tick;
+ transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp;
}
pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
@@ -2685,9 +2650,8 @@ void pn_link_drain(pn_link_t *receiver,
pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now)
{
- if (transport && transport->process_tick)
- return transport->process_tick( transport, now );
- return 0;
+ pn_io_layer_t *io_layer = transport->io_layers;
+ return io_layer->process_tick( io_layer, now );
}
uint64_t pn_transport_get_frames_output(const pn_transport_t *transport)
@@ -2904,3 +2868,30 @@ int pn_condition_redirect_port(pn_condit
pn_data_rewind(data);
return port;
}
+
+/** Pass through input handler */
+ssize_t pn_io_layer_input_passthru(pn_io_layer_t *io_layer, const char *data, size_t available)
+{
+ pn_io_layer_t *next = io_layer->next;
+ if (next)
+ return next->process_input( next, data, available );
+ return PN_EOS;
+}
+
+/** Pass through output handler */
+ssize_t pn_io_layer_output_passthru(pn_io_layer_t *io_layer, char *bytes, size_t size)
+{
+ pn_io_layer_t *next = io_layer->next;
+ if (next)
+ return next->process_output( next, bytes, size );
+ return PN_EOS;
+}
+
+/** Pass through tick handler */
+pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *io_layer, pn_timestamp_t now)
+{
+ pn_io_layer_t *next = io_layer->next;
+ if (next)
+ return next->process_tick( next, now );
+ return 0;
+}
Modified: qpid/proton/trunk/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl.c?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl.c Tue Jan 29 14:38:13 2013
@@ -34,6 +34,9 @@
#define SCRATCH (1024)
struct pn_sasl_t {
+ pn_transport_t *transport;
+ pn_io_layer_t *io_layer;
+ size_t header_count;
pn_dispatcher_t *disp;
bool client;
bool configured;
@@ -49,6 +52,11 @@ struct pn_sasl_t {
char scratch[SCRATCH];
};
+static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
+static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available);
+static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t available);
+
int pn_do_init(pn_dispatcher_t *disp);
int pn_do_mechanisms(pn_dispatcher_t *disp);
int pn_do_challenge(pn_dispatcher_t *disp);
@@ -81,8 +89,14 @@ pn_sasl_t *pn_sasl(pn_transport_t *trans
sasl->rcvd_done = false;
transport->sasl = sasl;
+ sasl->transport = transport;
+ sasl->io_layer = &transport->io_layers[PN_IO_SASL];
+ sasl->io_layer->context = sasl;
+ sasl->io_layer->process_input = pn_input_read_sasl_header;
+ sasl->io_layer->process_output = pn_output_write_sasl_header;
+ sasl->io_layer->process_tick = pn_io_layer_tick_passthru;
- pn_transport_sasl_init(transport);
+ sasl->header_count = 0;
}
return transport->sasl;
@@ -300,7 +314,7 @@ void pn_sasl_process(pn_sasl_t *sasl)
}
}
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available)
+ssize_t pn_sasl_input(pn_sasl_t *sasl, const char *bytes, size_t available)
{
ssize_t n = pn_dispatcher_input(sasl->disp, bytes, available);
if (n < 0) return n;
@@ -391,3 +405,66 @@ int pn_do_outcome(pn_dispatcher_t *disp)
disp->halt = true;
return 0;
}
+
+#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
+#define SASL_HEADER_LEN 8
+
+static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+{
+ pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+ const char *point = SASL_HEADER + sasl->header_count;
+ int delta = pn_min(available, SASL_HEADER_LEN - sasl->header_count);
+ if (!available || memcmp(bytes, point, delta)) {
+ char quoted[1024];
+ pn_quote_data(quoted, 1024, bytes, available);
+ return pn_error_format(sasl->transport->error, PN_ERR,
+ "%s header mismatch: '%s'", "SASL", quoted);
+ } else {
+ sasl->header_count += delta;
+ if (sasl->header_count == SASL_HEADER_LEN) {
+ sasl->io_layer->process_input = pn_input_read_sasl;
+ if (sasl->disp->trace & PN_TRACE_FRM)
+ fprintf(stderr, " <- %s\n", "SASL");
+ }
+ return delta;
+ }
+}
+
+static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+{
+ pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+ ssize_t n = pn_sasl_input(sasl, bytes, available);
+ if (n == PN_EOS) {
+ sasl->io_layer->process_input = pn_io_layer_input_passthru;
+ pn_io_layer_t *io_next = sasl->io_layer->next;
+ return io_next->process_input( io_next, bytes, available );
+ }
+ return n;
+}
+
+static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
+{
+ pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+ if (sasl->disp->trace & PN_TRACE_FRM)
+ fprintf(stderr, " -> %s\n", "SASL");
+ if (size >= SASL_HEADER_LEN) {
+ memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
+ sasl->io_layer->process_output = pn_output_write_sasl;
+ return SASL_HEADER_LEN;
+ } else {
+ return pn_error_format(sasl->transport->error, PN_UNDERFLOW, "underflow writing %s header", "SASL");
+ }
+}
+
+static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)
+{
+ pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+ ssize_t n = pn_sasl_output(sasl, bytes, size);
+ if (n == PN_EOS) {
+ sasl->io_layer->process_output = pn_io_layer_output_passthru;
+ pn_io_layer_t *io_next = sasl->io_layer->next;
+ return io_next->process_output( io_next, bytes, size );
+ }
+ return n;
+}
+
Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Tue Jan 29 14:38:13 2013
@@ -72,6 +72,7 @@ struct pn_ssl_domain_t {
struct pn_ssl_t {
pn_transport_t *transport;
+ pn_io_layer_t *io_layer;
pn_ssl_domain_t *domain;
const char *session_id;
const char *peer_hostname;
@@ -95,10 +96,6 @@ struct pn_ssl_t {
char inbuf[APP_BUF_SIZE];
size_t in_count;
- // process cleartext i/o "above" the SSL layer
- ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
- ssize_t (*process_output)(pn_transport_t *, char *, size_t);
-
pn_trace_t trace;
};
@@ -117,14 +114,12 @@ struct pn_ssl_session_t {
/* */
static int keyfile_pw_cb(char *buf, int size, int rwflag, void *userdata);
-static ssize_t process_input_ssl( pn_transport_t *transport, const char *input_data, size_t len);
-static ssize_t process_output_ssl( pn_transport_t *transport, char *input_data, size_t len);
-static ssize_t process_input_cleartext(pn_transport_t *transport, const char *input_data, size_t len);
-static ssize_t process_output_cleartext(pn_transport_t *transport, char *buffer, size_t max_len);
-static ssize_t process_input_unknown(pn_transport_t *transport, const char *input_data, size_t len);
-static ssize_t process_output_unknown(pn_transport_t *transport, char *input_data, size_t len);
-static ssize_t process_input_done(pn_transport_t *transport, const char *input_data, size_t len);
-static ssize_t process_output_done(pn_transport_t *transport, char *input_data, size_t len);
+static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t len);
+static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *input_data, size_t len);
+static ssize_t process_input_unknown( pn_io_layer_t *io_layer, const char *input_data, size_t len);
+static ssize_t process_output_unknown( pn_io_layer_t *io_layer, char *input_data, size_t len);
+static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len);
+static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len);
static connection_mode_t check_for_ssl_connection( const char *data, size_t len );
static int init_ssl_socket( pn_ssl_t * );
static void release_ssl_socket( pn_ssl_t * );
@@ -643,11 +638,11 @@ int pn_ssl_init( pn_ssl_t *ssl, pn_ssl_d
ssl->domain = domain;
domain->ref_count++;
if (domain->allow_unsecured) {
- ssl->process_input = process_input_unknown;
- ssl->process_output = process_output_unknown;
+ ssl->io_layer->process_input = process_input_unknown;
+ ssl->io_layer->process_output = process_output_unknown;
} else {
- ssl->process_input = process_input_ssl;
- ssl->process_output = process_output_ssl;
+ ssl->io_layer->process_input = process_input_ssl;
+ ssl->io_layer->process_output = process_output_ssl;
}
if (session_id && domain->mode == PN_SSL_MODE_CLIENT)
@@ -712,19 +707,6 @@ void pn_ssl_free( pn_ssl_t *ssl)
free(ssl);
}
-// move data received from the network into the SSL layer
-ssize_t pn_ssl_input(pn_ssl_t *ssl, const char *bytes, size_t available)
-{
- return ssl->process_input( ssl->transport, bytes, available );
-}
-
-// pull output from the SSL layer and move into network output buffers
-ssize_t pn_ssl_output(pn_ssl_t *ssl, char *buffer, size_t max_size)
-{
- return ssl->process_output( ssl->transport, buffer, max_size );
-}
-
-
pn_ssl_t *pn_ssl(pn_transport_t *transport)
{
if (!transport) return NULL;
@@ -735,6 +717,12 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
ssl->transport = transport;
transport->ssl = ssl;
+ ssl->io_layer = &transport->io_layers[PN_IO_SSL];
+ ssl->io_layer->context = ssl;
+ ssl->io_layer->process_input = pn_io_layer_input_passthru;
+ ssl->io_layer->process_output = pn_io_layer_output_passthru;
+ ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
+
ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
return ssl;
@@ -780,8 +768,8 @@ static int start_ssl_shutdown( pn_ssl_t
static int setup_ssl_connection( pn_ssl_t *ssl )
{
_log( ssl, "SSL connection detected.\n");
- ssl->process_input = process_input_ssl;
- ssl->process_output = process_output_ssl;
+ ssl->io_layer->process_input = process_input_ssl;
+ ssl->io_layer->process_output = process_output_ssl;
return 0;
}
@@ -790,10 +778,9 @@ static int setup_ssl_connection( pn_ssl_
// take data from the network, and pass it into SSL. Attempt to read decrypted data from
// SSL socket and pass it to the application.
-static ssize_t process_input_ssl( pn_transport_t *transport, const char *input_data, size_t available)
+static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available)
{
- pn_ssl_t *ssl = transport->ssl;
- if (!ssl) return PN_ERR;
+ pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
_log( ssl, "process_input_ssl( data size=%d )\n",available );
@@ -866,7 +853,8 @@ static ssize_t process_input_ssl( pn_tra
if (!ssl->app_input_closed) {
char *data = ssl->inbuf;
if (ssl->in_count > 0 || ssl->ssl_closed) { /* if ssl_closed, send 0 count */
- ssize_t consumed = transport->process_input(transport, data, ssl->in_count);
+ pn_io_layer_t *io_next = ssl->io_layer->next;
+ ssize_t consumed = io_next->process_input( io_next, data, ssl->in_count);
if (consumed > 0) {
ssl->in_count -= consumed;
data += consumed;
@@ -906,15 +894,15 @@ static ssize_t process_input_ssl( pn_tra
//}
if (ssl->app_input_closed && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) ) {
consumed = ssl->app_input_closed;
- ssl->process_input = process_input_done;
+ ssl->io_layer->process_input = process_input_done;
}
_log(ssl, "process_input_ssl() returning %d\n", (int) consumed);
return consumed;
}
-static ssize_t process_output_ssl( pn_transport_t *transport, char *buffer, size_t max_len)
+static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
{
- pn_ssl_t *ssl = transport->ssl;
+ pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
if (!ssl) return PN_ERR;
if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
@@ -926,7 +914,8 @@ static ssize_t process_output_ssl( pn_tr
// first, get any pending application output, if possible
if (!ssl->app_output_closed && ssl->out_count < APP_BUF_SIZE) {
- ssize_t app_bytes = transport->process_output(transport, &ssl->outbuf[ssl->out_count], APP_BUF_SIZE - ssl->out_count);
+ pn_io_layer_t *io_next = ssl->io_layer->next;
+ ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], APP_BUF_SIZE - ssl->out_count);
if (app_bytes > 0) {
ssl->out_count += app_bytes;
work_pending = true;
@@ -1018,7 +1007,7 @@ static ssize_t process_output_ssl( pn_tr
//}
if (written == 0 && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) && BIO_pending(ssl->bio_net_io) == 0) {
written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS;
- ssl->process_output = process_output_done;
+ ssl->io_layer->process_output = process_output_done;
}
_log(ssl, "process_output_ssl() returning %d\n", (int) written);
return written;
@@ -1100,48 +1089,33 @@ static void release_ssl_socket( pn_ssl_t
}
-//////// CLEARTEXT CONNECTIONS
-
-static ssize_t process_input_cleartext(pn_transport_t *transport, const char *input_data, size_t len)
-{
- // just write directly to layer "above" SSL
- return transport->process_input( transport, input_data, len );
-}
-
-static ssize_t process_output_cleartext(pn_transport_t *transport, char *buffer, size_t max_len)
-{
- // just read directly from the layer "above" SSL
- return transport->process_output( transport, buffer, max_len );
-}
-
-
-
static int setup_cleartext_connection( pn_ssl_t *ssl )
{
_log( ssl, "Cleartext connection detected.\n");
- ssl->process_input = process_input_cleartext;
- ssl->process_output = process_output_cleartext;
+ ssl->io_layer->process_input = pn_io_layer_input_passthru;
+ ssl->io_layer->process_output = pn_io_layer_output_passthru;
return 0;
}
// until we determine if the client is using SSL or not:
-static ssize_t process_input_unknown(pn_transport_t *transport, const char *input_data, size_t len)
+static ssize_t process_input_unknown(pn_io_layer_t *io_layer, const char *input_data, size_t len)
{
+ pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
switch (check_for_ssl_connection( input_data, len )) {
case SSL_CONNECTION:
- setup_ssl_connection( transport->ssl );
- return transport->ssl->process_input( transport, input_data, len );
+ setup_ssl_connection( ssl );
+ return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
case CLEAR_CONNECTION:
- setup_cleartext_connection( transport->ssl );
- return transport->ssl->process_input( transport, input_data, len );
+ setup_cleartext_connection( ssl );
+ return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
default:
return 0;
}
}
-static ssize_t process_output_unknown(pn_transport_t *transport, char *input_data, size_t len)
+static ssize_t process_output_unknown(pn_io_layer_t *io_layer, char *input_data, size_t len)
{
// do not do output until we know if SSL is used or not
return 0;
@@ -1253,11 +1227,11 @@ int pn_ssl_get_peer_hostname( pn_ssl_t *
return 0;
}
-static ssize_t process_input_done(pn_transport_t *transport, const char *input_data, size_t len)
+static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len)
{
return PN_EOS;
}
-static ssize_t process_output_done(pn_transport_t *transport, char *input_data, size_t len)
+static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len)
{
return PN_EOS;
}
Modified: qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h Tue Jan 29 14:38:13 2013
@@ -33,13 +33,6 @@
// release the SSL context
void pn_ssl_free( pn_ssl_t *ssl);
-// move data received from the network into the SSL layer
-ssize_t pn_ssl_input(pn_ssl_t *ssl, const char *bytes, size_t available);
-
-// copy data generated by SSL into the network's output buffers
-ssize_t pn_ssl_output(pn_ssl_t *ssl, char *buffer, size_t max_size);
-
-
void pn_ssl_trace(pn_ssl_t *ssl, pn_trace_t trace);
#endif /* ssl-internal.h */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org