You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/11/26 21:05:51 UTC
[04/35] qpid-proton git commit: PROTON-749: Refactoring of io layers:
- Eliminate some unnecessary stuff. - Make pn_io_layer_t a pure interface. -
Simplify amqp header code; remove header_count member from pn_transport_t
PROTON-749: Refactoring of io layers:
- Eliminate some unnecessary stuff.
- Make pn_io_layer_t a pure interface.
- Simplify amqp header code; remove header_count member from pn_transport_t
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c814d5c3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c814d5c3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c814d5c3
Branch: refs/heads/examples
Commit: c814d5c39147afa642e95e1b1ad51650b04d9739
Parents: 2794da5
Author: Andrew Stitcher <as...@apache.org>
Authored: Wed Aug 6 17:57:56 2014 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Mon Nov 17 14:55:03 2014 -0500
----------------------------------------------------------------------
proton-c/src/engine/engine-internal.h | 20 ++-
proton-c/src/sasl/sasl.c | 128 +++++++++++-------
proton-c/src/ssl/openssl.c | 143 +++++++++++---------
proton-c/src/transport/transport.c | 204 ++++++++++++++---------------
proton-c/src/windows/schannel.c | 155 ++++++++++++----------
5 files changed, 354 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index dd4c44e..86f5161 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -100,18 +100,16 @@ typedef struct {
#include <proton/ssl.h>
typedef struct pn_io_layer_t {
- void *context;
- struct pn_io_layer_t *next;
- ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t);
- ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t);
- pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t);
- size_t (*buffered_output)(struct pn_io_layer_t *); // how much output is held
- size_t (*buffered_input)(struct pn_io_layer_t *); // how much input is held
+ ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t);
+ ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t);
+ pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t);
+ size_t (*buffered_output)(struct pn_transport_t *); // how much output is held
} pn_io_layer_t;
+extern const pn_io_layer_t pni_passthru_layer;
+
struct pn_transport_t {
pn_tracer_t tracer;
- size_t header_count;
pn_sasl_t *sasl;
pn_ssl_t *ssl;
pn_connection_t *connection; // reference counted
@@ -134,7 +132,7 @@ struct pn_transport_t {
#define PN_IO_SASL 1
#define PN_IO_AMQP 2
#define PN_IO_LAYER_CT (PN_IO_AMQP+1)
- pn_io_layer_t io_layers[PN_IO_LAYER_CT];
+ const pn_io_layer_t *io_layers[PN_IO_LAYER_CT];
/* dead remote detection */
pn_millis_t local_idle_timeout;
@@ -302,9 +300,7 @@ void pn_link_dump(pn_link_t *link);
void pn_dump(pn_connection_t *conn);
void pn_transport_sasl_init(pn_transport_t *transport);
-ssize_t pn_io_layer_input_passthru(pn_io_layer_t *, const char *, size_t );
-ssize_t pn_io_layer_output_passthru(pn_io_layer_t *, char *, size_t );
-pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *, pn_timestamp_t);
+pn_timestamp_t pn_io_layer_tick_passthru(pn_transport_t *, unsigned int, pn_timestamp_t);
void pn_condition_init(pn_condition_t *condition);
void pn_condition_tini(pn_condition_t *condition);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index a82ec02..97bead4 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -36,7 +36,6 @@
struct pn_sasl_t {
pn_transport_t *transport;
- pn_io_layer_t *io_layer;
pn_dispatcher_t *disp;
char *mechanisms;
char *remote_mechanisms;
@@ -50,12 +49,42 @@ struct pn_sasl_t {
bool rcvd_init;
bool sent_done;
bool rcvd_done;
+ bool input_bypass;
+ bool output_bypass;
};
-static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
-static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available);
-static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
-static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t available);
+static ssize_t pn_input_read_sasl_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available);
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available);
+static ssize_t pn_output_write_sasl_header(pn_transport_t* transport, unsigned int layer, char* bytes, size_t size);
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
+
+const pn_io_layer_t sasl_headers_layer = {
+ pn_input_read_sasl_header,
+ pn_output_write_sasl_header,
+ pn_io_layer_tick_passthru,
+ NULL
+};
+
+const pn_io_layer_t sasl_write_header_layer = {
+ pn_input_read_sasl,
+ pn_output_write_sasl_header,
+ pn_io_layer_tick_passthru,
+ NULL
+};
+
+const pn_io_layer_t sasl_read_header_layer = {
+ pn_input_read_sasl_header,
+ pn_output_write_sasl,
+ pn_io_layer_tick_passthru,
+ NULL
+};
+
+const pn_io_layer_t sasl_layer = {
+ pn_input_read_sasl,
+ pn_output_write_sasl,
+ pn_io_layer_tick_passthru,
+ NULL
+};
pn_sasl_t *pn_sasl(pn_transport_t *transport)
{
@@ -76,14 +105,12 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport)
sasl->rcvd_init = false;
sasl->sent_done = false;
sasl->rcvd_done = false;
+ sasl->input_bypass = false;
+ sasl->output_bypass = false;
transport->sasl = sasl;
sasl->transport = transport;
- sasl->io_layer = &transport->io_layers[PN_IO_SASL];
- sasl->io_layer->context = sasl;
- sasl->io_layer->process_input = pn_input_read_sasl_header;
- sasl->io_layer->process_output = pn_output_write_sasl_header;
- sasl->io_layer->process_tick = pn_io_layer_tick_passthru;
+ transport->io_layers[PN_IO_SASL] = &sasl_headers_layer;
}
return transport->sasl;
@@ -404,9 +431,9 @@ int pn_do_outcome(pn_dispatcher_t *disp)
#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
#define SASL_HEADER_LEN 8
-static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+static ssize_t pn_input_read_sasl_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available)
{
- pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+ pn_sasl_t *sasl = transport->sasl;
if (available > 0) {
if (available < SASL_HEADER_LEN) {
if (memcmp(bytes, SASL_HEADER, available) == 0 ||
@@ -414,20 +441,22 @@ static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *by
return 0;
} else {
if (memcmp(bytes, SASL_HEADER, SASL_HEADER_LEN) == 0) {
- sasl->io_layer->process_input = pn_input_read_sasl;
+ if (transport->io_layers[layer] == &sasl_read_header_layer) {
+ transport->io_layers[layer] = &sasl_layer;
+ } else {
+ transport->io_layers[layer] = &sasl_write_header_layer;
+ }
if (sasl->disp->trace & PN_TRACE_FRM)
- pn_transport_logf(sasl->transport, " <- %s", "SASL");
+ pn_transport_logf(transport, " <- %s", "SASL");
return SASL_HEADER_LEN;
}
if (memcmp(bytes, AMQP_HEADER, SASL_HEADER_LEN) == 0) {
if (sasl->allow_skip) {
sasl->outcome = PN_SASL_SKIPPED;
- sasl->io_layer->process_input = pn_io_layer_input_passthru;
- sasl->io_layer->process_output = pn_io_layer_output_passthru;
- pn_io_layer_t *io_next = sasl->io_layer->next;
- return io_next->process_input( io_next, bytes, available );
+ transport->io_layers[layer] = &pni_passthru_layer;
+ return pni_passthru_layer.process_input(transport, layer, bytes, available);
} else {
- pn_do_error(sasl->transport, "amqp:connection:policy-error",
+ pn_do_error(transport, "amqp:connection:policy-error",
"Client skipped SASL exchange - forbidden");
return PN_EOS;
}
@@ -436,50 +465,57 @@ static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *by
}
char quoted[1024];
pn_quote_data(quoted, 1024, bytes, available);
- pn_do_error(sasl->transport, "amqp:connection:framing-error",
+ pn_do_error(transport, "amqp:connection:framing-error",
"%s header mismatch: '%s'", "SASL", quoted);
return PN_EOS;
}
-static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+static ssize_t pn_input_read_sasl(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available)
{
- pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
- ssize_t n = pn_sasl_input(sasl, bytes, available);
- if (n == PN_EOS) {
- sasl->io_layer->process_input = pn_io_layer_input_passthru;
- pn_io_layer_t *io_next = sasl->io_layer->next;
- return io_next->process_input( io_next, bytes, available );
+ pn_sasl_t *sasl = transport->sasl;
+ if (!sasl->input_bypass) {
+ ssize_t n = pn_sasl_input(sasl, bytes, available);
+ if (n != PN_EOS) return n;
+
+ sasl->input_bypass = true;
+ if (sasl->output_bypass)
+ transport->io_layers[layer] = &pni_passthru_layer;
}
- return n;
+ return pni_passthru_layer.process_input(transport, layer, bytes, available );
}
-static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t size)
{
- pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+ pn_sasl_t *sasl = transport->sasl;
if (sasl->disp->trace & PN_TRACE_FRM)
- pn_transport_logf(sasl->transport, " -> %s", "SASL");
+ pn_transport_logf(transport, " -> %s", "SASL");
assert(size >= SASL_HEADER_LEN);
memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
- sasl->io_layer->process_output = pn_output_write_sasl;
+ if (transport->io_layers[layer]==&sasl_write_header_layer) {
+ transport->io_layers[layer] = &sasl_layer;
+ } else {
+ transport->io_layers[layer] = &sasl_read_header_layer;
+ }
return SASL_HEADER_LEN;
}
-static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)
+static ssize_t pn_output_write_sasl(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available)
{
- pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
- // this accounts for when pn_do_error is invoked, e.g. by idle timeout
- ssize_t n;
- if (sasl->transport->close_sent) {
- n = PN_EOS;
- } else {
- n = pn_sasl_output(sasl, bytes, size);
- }
+ pn_sasl_t *sasl = transport->sasl;
+ if (!sasl->output_bypass) {
+ // this accounts for when pn_do_error is invoked, e.g. by idle timeout
+ ssize_t n;
+ if (transport->close_sent) {
+ n = PN_EOS;
+ } else {
+ n = pn_sasl_output(sasl, bytes, available);
+ }
+ if (n != PN_EOS) return n;
- if (n == PN_EOS) {
- sasl->io_layer->process_output = pn_io_layer_output_passthru;
- pn_io_layer_t *io_next = sasl->io_layer->next;
- return io_next->process_output( io_next, bytes, size );
+ sasl->output_bypass = true;
+ if (sasl->input_bypass)
+ transport->io_layers[layer] = &pni_passthru_layer;
}
- return n;
+ return pni_passthru_layer.process_output(transport, layer, bytes, available );
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/ssl/openssl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/ssl/openssl.c b/proton-c/src/ssl/openssl.c
index dd1b88b..a763cfb 100644
--- a/proton-c/src/ssl/openssl.c
+++ b/proton-c/src/ssl/openssl.c
@@ -87,9 +87,7 @@ struct pn_ssl_domain_t {
struct pn_ssl_t {
-
pn_transport_t *transport;
- pn_io_layer_t *io_layer;
pn_ssl_domain_t *domain;
const char *session_id;
const char *peer_hostname;
@@ -134,19 +132,18 @@ struct pn_ssl_session_t {
/* */
static int keyfile_pw_cb(char *buf, int size, int rwflag, void *userdata);
-static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *input_data, size_t len);
-static ssize_t process_input_unknown( pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_unknown( pn_io_layer_t *io_layer, char *input_data, size_t len);
-static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len);
+static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
+static ssize_t process_input_unknown( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_unknown( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
+static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
static connection_mode_t check_for_ssl_connection( const char *data, size_t len );
static int init_ssl_socket( pn_ssl_t * );
static void release_ssl_socket( pn_ssl_t * );
static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * );
static void ssl_session_free( pn_ssl_session_t *);
-static size_t buffered_output( pn_io_layer_t *io_layer );
-static size_t buffered_input( pn_io_layer_t *io_layer );
+static size_t buffered_output( pn_transport_t *transport );
// @todo: used to avoid littering the code with calls to printf...
static void _log_error(pn_ssl_t *ssl, const char *fmt, ...)
@@ -670,6 +667,40 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
return 0;
}
+const pn_io_layer_t unknown_layer = {
+ process_input_unknown,
+ process_output_unknown,
+ pn_io_layer_tick_passthru,
+ NULL
+};
+
+const pn_io_layer_t ssl_layer = {
+ process_input_ssl,
+ process_output_ssl,
+ pn_io_layer_tick_passthru,
+ buffered_output
+};
+
+const pn_io_layer_t ssl_input_closed_layer = {
+ process_input_done,
+ process_output_ssl,
+ pn_io_layer_tick_passthru,
+ buffered_output
+};
+
+const pn_io_layer_t ssl_output_closed_layer = {
+ process_input_ssl,
+ process_output_done,
+ pn_io_layer_tick_passthru,
+ buffered_output
+};
+
+const pn_io_layer_t ssl_closed_layer = {
+ process_input_done,
+ process_output_done,
+ pn_io_layer_tick_passthru,
+ buffered_output
+};
int pn_ssl_init( pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
{
@@ -678,13 +709,10 @@ int pn_ssl_init( pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
ssl->domain = domain;
domain->ref_count++;
if (domain->allow_unsecured) {
- ssl->io_layer->process_input = process_input_unknown;
- ssl->io_layer->process_output = process_output_unknown;
+ ssl->transport->io_layers[PN_IO_SSL] = &unknown_layer;
} else {
- ssl->io_layer->process_input = process_input_ssl;
- ssl->io_layer->process_output = process_output_ssl;
+ ssl->transport->io_layers[PN_IO_SSL] = &ssl_layer;
}
-
if (session_id && domain->mode == PN_SSL_MODE_CLIENT)
ssl->session_id = pn_strdup(session_id);
@@ -773,13 +801,7 @@ pn_ssl_t *pn_ssl(pn_transport_t *transport)
ssl->transport = transport;
transport->ssl = ssl;
- ssl->io_layer = &transport->io_layers[PN_IO_SSL];
- ssl->io_layer->context = ssl;
- ssl->io_layer->process_input = pn_io_layer_input_passthru;
- ssl->io_layer->process_output = pn_io_layer_output_passthru;
- ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
- ssl->io_layer->buffered_output = buffered_output;
- ssl->io_layer->buffered_input = buffered_input;
+ transport->io_layers[PN_IO_SSL] = &pni_passthru_layer;
ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
@@ -823,11 +845,9 @@ static int start_ssl_shutdown( pn_ssl_t *ssl )
-static int setup_ssl_connection( pn_ssl_t *ssl )
+static int setup_ssl_connection(pn_transport_t *transport, unsigned int layer)
{
- _log( ssl, "SSL connection detected.");
- ssl->io_layer->process_input = process_input_ssl;
- ssl->io_layer->process_output = process_output_ssl;
+ transport->io_layers[layer] = &ssl_layer;
return 0;
}
@@ -836,9 +856,9 @@ static int setup_ssl_connection( pn_ssl_t *ssl )
// take data from the network, and pass it into SSL. Attempt to read decrypted data from
// SSL socket and pass it to the application.
-static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available)
+static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t available)
{
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ pn_ssl_t *ssl = transport->ssl;
if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
_log( ssl, "process_input_ssl( data size=%d )",available );
@@ -910,8 +930,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
if (!ssl->app_input_closed) {
if (ssl->in_count > 0 || ssl->ssl_closed) { /* if ssl_closed, send 0 count */
- pn_io_layer_t *io_next = ssl->io_layer->next;
- ssize_t consumed = io_next->process_input( io_next, ssl->inbuf, ssl->in_count);
+ ssize_t consumed = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->inbuf, ssl->in_count);
if (consumed > 0) {
ssl->in_count -= consumed;
if (ssl->in_count)
@@ -973,15 +992,19 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
//}
if (ssl->app_input_closed && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) ) {
consumed = ssl->app_input_closed;
- ssl->io_layer->process_input = process_input_done;
+ if (transport->io_layers[layer]==&ssl_output_closed_layer) {
+ transport->io_layers[layer] = &ssl_closed_layer;
+ } else {
+ transport->io_layers[layer] = &ssl_input_closed_layer;
+ }
}
_log(ssl, "process_input_ssl() returning %d", (int) consumed);
return consumed;
}
-static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
+static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *buffer, size_t max_len)
{
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ pn_ssl_t *ssl = transport->ssl;
if (!ssl) return PN_EOS;
if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
@@ -993,8 +1016,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
// first, get any pending application output, if possible
if (!ssl->app_output_closed && ssl->out_count < ssl->out_size) {
- pn_io_layer_t *io_next = ssl->io_layer->next;
- ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count);
+ ssize_t app_bytes = transport->io_layers[layer+1]->process_output(transport, layer+1, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count);
if (app_bytes > 0) {
ssl->out_count += app_bytes;
work_pending = true;
@@ -1086,7 +1108,11 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
//}
if (written == 0 && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) && BIO_pending(ssl->bio_net_io) == 0) {
written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS;
- ssl->io_layer->process_output = process_output_done;
+ if (transport->io_layers[layer]==&ssl_input_closed_layer) {
+ transport->io_layers[layer] = &ssl_closed_layer;
+ } else {
+ transport->io_layers[layer] = &ssl_output_closed_layer;
+ }
}
_log(ssl, "process_output_ssl() returning %d", (int) written);
return written;
@@ -1169,33 +1195,34 @@ static void release_ssl_socket( pn_ssl_t *ssl )
}
-static int setup_cleartext_connection( pn_ssl_t *ssl )
+static int setup_cleartext_connection(pn_transport_t *transport, unsigned int layer)
{
- _log( ssl, "Cleartext connection detected.");
- ssl->io_layer->process_input = pn_io_layer_input_passthru;
- ssl->io_layer->process_output = pn_io_layer_output_passthru;
+ transport->io_layers[layer] = &pni_passthru_layer;
return 0;
}
// until we determine if the client is using SSL or not:
-static ssize_t process_input_unknown(pn_io_layer_t *io_layer, const char *input_data, size_t len)
+static ssize_t process_input_unknown(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len)
{
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ pn_ssl_t *ssl = transport->ssl;
switch (check_for_ssl_connection( input_data, len )) {
case SSL_CONNECTION:
- setup_ssl_connection( ssl );
- return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
+ _log( ssl, "SSL connection detected.\n");
+ setup_ssl_connection(transport, layer);
+ break;
case CLEAR_CONNECTION:
- setup_cleartext_connection( ssl );
- return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
+ _log( ssl, "Cleartext connection detected.\n");
+ setup_cleartext_connection(transport, layer);
+ break;
default:
return 0;
}
+ return transport->io_layers[layer]->process_input(transport, layer, input_data, len );
}
-static ssize_t process_output_unknown(pn_io_layer_t *io_layer, char *input_data, size_t len)
+static ssize_t process_output_unknown(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len)
{
// do not do output until we know if SSL is used or not
return 0;
@@ -1307,20 +1334,20 @@ int pn_ssl_get_peer_hostname( pn_ssl_t *ssl, char *hostname, size_t *bufsize )
return 0;
}
-static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len)
+static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len)
{
return PN_EOS;
}
-static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len)
+static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len)
{
return PN_EOS;
}
// return # output bytes sitting in this layer
-static size_t buffered_output(pn_io_layer_t *io_layer)
+static size_t buffered_output(pn_transport_t *transport)
{
size_t count = 0;
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ pn_ssl_t *ssl = transport->ssl;
if (ssl) {
count += ssl->out_count;
if (ssl->bio_net_io) { // pick up any bytes waiting for network io
@@ -1329,17 +1356,3 @@ static size_t buffered_output(pn_io_layer_t *io_layer)
}
return count;
}
-
-// return # input bytes sitting in this layer
-static size_t buffered_input( pn_io_layer_t *io_layer )
-{
- size_t count = 0;
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
- if (ssl) {
- count += ssl->in_count;
- if (ssl->bio_ssl) { // pick up any bytes waiting to be read
- count += BIO_ctrl_pending(ssl->bio_ssl);
- }
- }
- return count;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 601d6a2..d93e16f 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -92,17 +92,55 @@ void pn_delivery_map_clear(pn_delivery_map_t *dm)
dm->next = 0;
}
-static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
-static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available);
-static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
-static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t available);
-static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now);
+static ssize_t pn_io_layer_input_passthru(pn_transport_t *, unsigned int, const char *, size_t );
+static ssize_t pn_io_layer_output_passthru(pn_transport_t *, unsigned int, char *, size_t );
+
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available);
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available);
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
+static pn_timestamp_t pn_tick_amqp(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now);
static void pni_default_tracer(pn_transport_t *transport, const char *message)
{
fprintf(stderr, "[%p]:%s\n", (void *) transport, message);
}
+const pn_io_layer_t pni_passthru_layer = {
+ pn_io_layer_input_passthru,
+ pn_io_layer_output_passthru,
+ pn_io_layer_tick_passthru,
+ NULL
+};
+
+const pn_io_layer_t amqp_header_layer = {
+ pn_input_read_amqp_header,
+ pn_output_write_amqp_header,
+ pn_tick_amqp,
+ NULL
+};
+
+const pn_io_layer_t amqp_write_header_layer = {
+ pn_input_read_amqp,
+ pn_output_write_amqp_header,
+ pn_tick_amqp,
+ NULL
+};
+
+const pn_io_layer_t amqp_read_header_layer = {
+ pn_input_read_amqp_header,
+ pn_output_write_amqp,
+ pn_tick_amqp,
+ NULL
+};
+
+const pn_io_layer_t amqp_layer = {
+ pn_input_read_amqp,
+ pn_output_write_amqp,
+ pn_tick_amqp,
+ NULL
+};
+
static void pn_transport_initialize(void *object)
{
pn_transport_t *transport = (pn_transport_t *)object;
@@ -112,33 +150,16 @@ static void pn_transport_initialize(void *object)
transport->input_buf = NULL;
transport->input_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
transport->tracer = pni_default_tracer;
- transport->header_count = 0;
transport->sasl = NULL;
transport->ssl = NULL;
transport->scratch = pn_string(NULL);
transport->disp = pn_dispatcher(0, transport);
transport->connection = NULL;
- pn_io_layer_t *io_layer = transport->io_layers;
- while (io_layer != &transport->io_layers[PN_IO_AMQP]) {
- io_layer->context = NULL;
- io_layer->next = io_layer + 1;
- io_layer->process_input = pn_io_layer_input_passthru;
- io_layer->process_output = pn_io_layer_output_passthru;
- io_layer->process_tick = pn_io_layer_tick_passthru;
- io_layer->buffered_output = NULL;
- io_layer->buffered_input = NULL;
- ++io_layer;
- }
-
- pn_io_layer_t *amqp = &transport->io_layers[PN_IO_AMQP];
- amqp->context = transport;
- amqp->process_input = pn_input_read_amqp_header;
- amqp->process_output = pn_output_write_amqp_header;
- amqp->process_tick = pn_io_layer_tick_passthru;
- amqp->buffered_output = NULL;
- amqp->buffered_input = NULL;
- amqp->next = NULL;
+ for (int layer=0; layer<PN_IO_LAYER_CT; ++layer) {
+ transport->io_layers[layer] = &pni_passthru_layer;
+ }
+ transport->io_layers[PN_IO_AMQP] = &amqp_header_layer;
transport->open_sent = false;
transport->open_rcvd = false;
@@ -550,8 +571,6 @@ int pn_do_open(pn_dispatcher_t *disp)
} else {
transport->disp->halt = true;
}
- if (transport->remote_idle_timeout)
- transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp; // enable timeouts
transport->open_rcvd = true;
return 0;
}
@@ -1072,14 +1091,14 @@ ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t
// process pending input until none remaining or EOS
static ssize_t transport_consume(pn_transport_t *transport)
{
- pn_io_layer_t *io_layer = transport->io_layers;
size_t consumed = 0;
while (transport->input_pending || transport->tail_closed) {
ssize_t n;
- n = io_layer->process_input( io_layer,
- transport->input_buf + consumed,
- transport->input_pending );
+ n = transport->io_layers[PN_IO_SSL]->
+ process_input( transport, PN_IO_SSL,
+ transport->input_buf + consumed,
+ transport->input_pending );
if (n > 0) {
consumed += n;
transport->input_pending -= n;
@@ -1101,44 +1120,34 @@ static ssize_t transport_consume(pn_transport_t *transport)
return consumed;
}
-static ssize_t pn_input_read_header(pn_transport_t *transport, const char *bytes, size_t available,
- const char *header, size_t size, const char *protocol,
- ssize_t (*next)(pn_io_layer_t *, const char *, size_t))
+#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
+
+static ssize_t pn_input_read_amqp_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available)
{
- const char *point = header + transport->header_count;
- int delta = pn_min(available, size - transport->header_count);
- if (!available || memcmp(bytes, point, delta)) {
+ unsigned readable = pn_min(8, available);
+ bool eos = pn_transport_capacity(transport)==PN_EOS;
+ if (memcmp(bytes, AMQP_HEADER, readable) || (readable<8 && eos) ) {
char quoted[1024];
pn_quote_data(quoted, 1024, bytes, available);
pn_do_error(transport, "amqp:connection:framing-error",
- "%s header mismatch: '%s'%s", protocol, quoted,
- available ? "" : " (connection aborted)");
+ "%s header mismatch: '%s'%s", "AMQP", quoted,
+ !eos ? "" : " (connection aborted)");
return PN_EOS;
- } else {
- transport->header_count += delta;
- if (transport->header_count == size) {
- transport->header_count = 0;
- transport->io_layers[PN_IO_AMQP].process_input = next;
-
- if (transport->disp->trace & PN_TRACE_FRM)
- pn_transport_logf(transport, " <- %s", protocol);
+ } else if (readable==8) {
+ if (transport->io_layers[layer] == &amqp_read_header_layer) {
+ transport->io_layers[layer] = &amqp_layer;
+ } else {
+ transport->io_layers[layer] = &amqp_write_header_layer;
}
- return delta;
+ if (transport->disp->trace & PN_TRACE_FRM)
+ pn_transport_logf(transport, " <- %s", "AMQP");
+ return 8;
}
+ return 0;
}
-#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
-
-static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
-{
- pn_transport_t *transport = (pn_transport_t *)io_layer->context;
- return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
- "AMQP", pn_input_read_amqp);
-}
-
-static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+static ssize_t pn_input_read_amqp(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available)
{
- pn_transport_t *transport = (pn_transport_t *)io_layer->context;
if (transport->close_rcvd) {
if (available > 0) {
pn_do_error(transport, "amqp:connection:framing-error", "data after close");
@@ -1164,10 +1173,9 @@ static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, si
}
/* process AMQP related timer events */
-static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now)
+static pn_timestamp_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, pn_timestamp_t now)
{
pn_timestamp_t timeout = 0;
- pn_transport_t *transport = (pn_transport_t *)io_layer->context;
if (transport->local_idle_timeout) {
if (transport->dead_remote_deadline == 0 ||
@@ -1827,30 +1835,22 @@ int pn_process(pn_transport_t *transport)
return 0;
}
-static ssize_t pn_output_write_header(pn_transport_t *transport,
- char *bytes, size_t size,
- const char *header, size_t hdrsize,
- const char *protocol,
- ssize_t (*next)(pn_io_layer_t *, char *, size_t))
+static ssize_t pn_output_write_amqp_header(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available)
{
if (transport->disp->trace & PN_TRACE_FRM)
- pn_transport_logf(transport, " -> %s", protocol);
- assert(size >= hdrsize);
- memmove(bytes, header, hdrsize);
- transport->io_layers[PN_IO_AMQP].process_output = next;
- return hdrsize;
-}
-
-static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
-{
- pn_transport_t *transport = (pn_transport_t *)io_layer->context;
- return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
- pn_output_write_amqp);
+ pn_transport_logf(transport, " -> %s", "AMQP");
+ assert(available >= 8);
+ memmove(bytes, AMQP_HEADER, 8);
+ if (transport->io_layers[layer] == &amqp_write_header_layer) {
+ transport->io_layers[layer] = &amqp_layer;
+ } else {
+ transport->io_layers[layer] = &amqp_read_header_layer;
+ }
+ return 8;
}
-static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size)
+static ssize_t pn_output_write_amqp(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available)
{
- pn_transport_t *transport = (pn_transport_t *)io_layer->context;
if (transport->connection && !transport->done_processing) {
int err = pn_process(transport);
if (err) {
@@ -1866,7 +1866,7 @@ static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t
return PN_EOS;
}
- return pn_dispatcher_output(transport->disp, bytes, size);
+ return pn_dispatcher_output(transport->disp, bytes, available);
}
static void pni_close_head(pn_transport_t *transport)
@@ -1884,7 +1884,6 @@ static ssize_t transport_produce(pn_transport_t *transport)
{
if (transport->head_closed) return PN_EOS;
- pn_io_layer_t *io_layer = transport->io_layers;
ssize_t space = transport->output_size - transport->output_pending;
if (space <= 0) { // can we expand the buffer?
@@ -1905,9 +1904,10 @@ static ssize_t transport_produce(pn_transport_t *transport)
while (space > 0) {
ssize_t n;
- n = io_layer->process_output( io_layer,
- &transport->output_buf[transport->output_pending],
- space );
+ n = transport->io_layers[PN_IO_SSL]->
+ process_output( transport, PN_IO_SSL,
+ &transport->output_buf[transport->output_pending],
+ space );
if (n > 0) {
space -= n;
transport->output_pending += n;
@@ -2043,7 +2043,6 @@ pn_millis_t pn_transport_get_idle_timeout(pn_transport_t *transport)
void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout)
{
transport->local_idle_timeout = timeout;
- transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp;
}
pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
@@ -2053,8 +2052,7 @@ pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now)
{
- pn_io_layer_t *io_layer = transport->io_layers;
- return io_layer->process_tick( io_layer, now );
+ return transport->io_layers[PN_IO_SSL]->process_tick(transport, PN_IO_SSL, now);
}
uint64_t pn_transport_get_frames_output(const pn_transport_t *transport)
@@ -2072,29 +2070,26 @@ uint64_t pn_transport_get_frames_input(const pn_transport_t *transport)
}
/** Pass through input handler */
-ssize_t pn_io_layer_input_passthru(pn_io_layer_t *io_layer, const char *data, size_t available)
+ssize_t pn_io_layer_input_passthru(pn_transport_t *transport, unsigned int layer, const char *data, size_t available)
{
- pn_io_layer_t *next = io_layer->next;
- if (next)
- return next->process_input( next, data, available );
+ if (layer+1<PN_IO_LAYER_CT)
+ return transport->io_layers[layer+1]->process_input(transport, layer+1, data, available);
return PN_EOS;
}
/** Pass through output handler */
-ssize_t pn_io_layer_output_passthru(pn_io_layer_t *io_layer, char *bytes, size_t size)
+ssize_t pn_io_layer_output_passthru(pn_transport_t *transport, unsigned int layer, char *data, size_t available)
{
- pn_io_layer_t *next = io_layer->next;
- if (next)
- return next->process_output( next, bytes, size );
+ if (layer+1<PN_IO_LAYER_CT)
+ return transport->io_layers[layer+1]->process_output(transport, layer+1, data, available);
return PN_EOS;
}
/** Pass through tick handler */
-pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *io_layer, pn_timestamp_t now)
+pn_timestamp_t pn_io_layer_tick_passthru(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now)
{
- pn_io_layer_t *next = io_layer->next;
- if (next)
- return next->process_tick( next, now );
+ if (layer+1<PN_IO_LAYER_CT)
+ return transport->io_layers[layer+1]->process_tick(transport, layer+1, now);
return 0;
}
@@ -2253,11 +2248,10 @@ bool pn_transport_quiesced(pn_transport_t *transport)
if (pending < 0) return true; // output done
else if (pending > 0) return false;
// no pending at transport, but check if data is buffered in I/O layers
- pn_io_layer_t *io_layer = transport->io_layers;
- while (io_layer != &transport->io_layers[PN_IO_LAYER_CT]) {
- if (io_layer->buffered_output && io_layer->buffered_output( io_layer ))
+ for (int layer = 0; layer<PN_IO_LAYER_CT; ++layer) {
+ if (transport->io_layers[layer]->buffered_output &&
+ transport->io_layers[layer]->buffered_output( transport ))
return false;
- ++io_layer;
}
return true;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/windows/schannel.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/schannel.c b/proton-c/src/windows/schannel.c
index de6e117..397fa21 100644
--- a/proton-c/src/windows/schannel.c
+++ b/proton-c/src/windows/schannel.c
@@ -82,7 +82,6 @@ typedef enum { CREATED, CLIENT_HELLO, NEGOTIATING,
struct pn_ssl_t {
pn_transport_t *transport;
- pn_io_layer_t *io_layer;
pn_ssl_domain_t *domain;
const char *session_id;
const char *peer_hostname;
@@ -136,17 +135,16 @@ struct pn_ssl_session_t {
};
-static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *input_data, size_t len);
-static ssize_t process_input_unknown( pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_unknown( pn_io_layer_t *io_layer, char *input_data, size_t len);
-static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len);
+static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
+static ssize_t process_input_unknown( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_unknown( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
+static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
static connection_mode_t check_for_ssl_connection( const char *data, size_t len );
static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * );
static void ssl_session_free( pn_ssl_session_t *);
-static size_t buffered_output( pn_io_layer_t *io_layer );
-static size_t buffered_input( pn_io_layer_t *io_layer );
+static size_t buffered_output( pn_transport_t *transport );
static void start_ssl_shutdown(pn_ssl_t *ssl);
static void rewind_sc_inbuf(pn_ssl_t *ssl);
static bool grow_inbuf2(pn_ssl_t *ssl, size_t minimum_size);
@@ -350,6 +348,41 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
return 0;
}
+const pn_io_layer_t unknown_layer = {
+ process_input_unknown,
+ process_output_unknown,
+ pn_io_layer_tick_passthru,
+ NULL
+};
+
+const pn_io_layer_t ssl_layer = {
+ process_input_ssl,
+ process_output_ssl,
+ pn_io_layer_tick_passthru,
+ buffered_output
+};
+
+const pn_io_layer_t ssl_input_closed_layer = {
+ process_input_done,
+ process_output_ssl,
+ pn_io_layer_tick_passthru,
+ buffered_output
+};
+
+const pn_io_layer_t ssl_output_closed_layer = {
+ process_input_ssl,
+ process_output_done,
+ pn_io_layer_tick_passthru,
+ buffered_output
+};
+
+const pn_io_layer_t ssl_closed_layer = {
+ process_input_done,
+ process_output_done,
+ pn_io_layer_tick_passthru,
+ buffered_output
+};
+
int pn_ssl_init(pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
{
if (!ssl || !domain || ssl->domain) return -1;
@@ -358,13 +391,11 @@ int pn_ssl_init(pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
ssl->domain = domain;
domain->ref_count++;
if (domain->allow_unsecured) {
- ssl->io_layer->process_input = process_input_unknown;
- ssl->io_layer->process_output = process_output_unknown;
- } else {
- ssl->io_layer->process_input = process_input_ssl;
- ssl->io_layer->process_output = process_output_ssl;
+ ssl->transport->io_layers[PN_IO_SSL] = &unknown_layer;
+ }
+ else {
+ ssl->transport->io_layers[PN_IO_SSL] = &ssl_layer;
}
-
if (session_id && domain->mode == PN_SSL_MODE_CLIENT)
ssl->session_id = pn_strdup(session_id);
@@ -460,13 +491,7 @@ pn_ssl_t *pn_ssl(pn_transport_t *transport)
ssl->transport = transport;
transport->ssl = ssl;
- ssl->io_layer = &transport->io_layers[PN_IO_SSL];
- ssl->io_layer->context = ssl;
- ssl->io_layer->process_input = pn_io_layer_input_passthru;
- ssl->io_layer->process_output = pn_io_layer_output_passthru;
- ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
- ssl->io_layer->buffered_output = buffered_output;
- ssl->io_layer->buffered_input = buffered_input;
+ transport->io_layers[PN_IO_SSL] = &pni_passthru_layer;
ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
SecInvalidateHandle(&ssl->cred_handle);
@@ -849,11 +874,9 @@ static void start_ssl_shutdown(pn_ssl_t *ssl)
ssl_handshake(ssl);
}
-static int setup_ssl_connection(pn_ssl_t *ssl)
+static int setup_ssl_connection(pn_transport_t *transport, unsigned int layer)
{
- ssl_log( ssl, "SSL connection detected.\n");
- ssl->io_layer->process_input = process_input_ssl;
- ssl->io_layer->process_output = process_output_ssl;
+ transport->io_layers[layer] = &ssl_layer;
return 0;
}
@@ -976,14 +999,14 @@ static void app_inbytes_advance(pn_ssl_t *ssl, size_t consumed)
app_inbytes_progress(ssl, 0);
}
-static void read_closed(pn_ssl_t *ssl, ssize_t error)
+static void read_closed(pn_transport_t *transport, unsigned int layer, ssize_t error)
{
+ pn_ssl_t *ssl = transport->ssl;
if (ssl->app_input_closed)
return;
if (ssl->state == RUNNING && !error) {
- pn_io_layer_t *io_next = ssl->io_layer->next;
// Signal end of stream
- ssl->app_input_closed = io_next->process_input(io_next, ssl->app_inbytes.start, 0);
+ ssl->app_input_closed = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->app_inbytes.start, 0);
}
if (!ssl->app_input_closed)
ssl->app_input_closed = error ? error : PN_ERR;
@@ -1000,9 +1023,9 @@ static void read_closed(pn_ssl_t *ssl, ssize_t error)
// Read up to "available" bytes from the network, decrypt it and pass plaintext to application.
-static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data, size_t available)
+static ssize_t process_input_ssl(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t available)
{
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ pn_ssl_t *ssl = transport->ssl;
ssl_log( ssl, "process_input_ssl( data size=%d )\n",available );
ssize_t consumed = 0;
ssize_t forwarded = 0;
@@ -1010,14 +1033,14 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data
if (available == 0) {
// No more inbound network data
- read_closed(ssl,0);
+ read_closed(transport, layer, 0);
return 0;
}
do {
if (ssl->sc_input_shutdown) {
// TLS protocol shutdown detected on input
- read_closed(ssl,0);
+ read_closed(transport, layer, 0);
return consumed;
}
@@ -1097,8 +1120,7 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data
// present app_inbytes to io_next only if it has new content
while (ssl->app_inbytes.size > 0) {
if (!ssl->app_input_closed) {
- pn_io_layer_t *io_next = ssl->io_layer->next;
- ssize_t count = io_next->process_input(io_next, ssl->app_inbytes.start, ssl->app_inbytes.size);
+ ssize_t count = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->app_inbytes.start, ssl->app_inbytes.size);
if (count > 0) {
forwarded += count;
// advance() can increase app_inbytes.size if double buffered
@@ -1115,7 +1137,7 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data
ssl_log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
(int) count, (int)ssl->app_inbytes.size);
app_inbytes_advance(ssl, ssl->app_inbytes.size); // discard
- read_closed(ssl, count);
+ read_closed(transport, layer, count);
}
} else {
ssl_log(ssl, "Input closed discard %d bytes\n",
@@ -1128,15 +1150,19 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data
if (ssl->app_input_closed && ssl->state >= SHUTTING_DOWN) {
consumed = ssl->app_input_closed;
- ssl->io_layer->process_input = process_input_done;
+ if (transport->io_layers[layer]==&ssl_output_closed_layer) {
+ transport->io_layers[layer] = &ssl_closed_layer;
+ } else {
+ transport->io_layers[layer] = &ssl_input_closed_layer;
+ }
}
ssl_log(ssl, "process_input_ssl() returning %d, forwarded %d\n", (int) consumed, (int) forwarded);
return consumed;
}
-static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
+static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *buffer, size_t max_len)
{
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ pn_ssl_t *ssl = transport->ssl;
if (!ssl) return PN_EOS;
ssl_log( ssl, "process_output_ssl( max_len=%d )\n",max_len );
@@ -1173,8 +1199,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
size_t remaining = ssl->max_data_size;
ssize_t app_bytes;
do {
- pn_io_layer_t *io_next = ssl->io_layer->next;
- app_bytes = io_next->process_output(io_next, app_outp, remaining);
+ app_bytes = transport->io_layers[layer+1]->process_output(transport, layer+1, app_outp, remaining);
if (app_bytes > 0) {
app_outp += app_bytes;
remaining -= app_bytes;
@@ -1212,40 +1237,45 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
if (written == 0 && ssl->state == SSL_CLOSED) {
written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS;
- ssl->io_layer->process_output = process_output_done;
+ if (transport->io_layers[layer]==&ssl_input_closed_layer) {
+ transport->io_layers[layer] = &ssl_closed_layer;
+ } else {
+ transport->io_layers[layer] = &ssl_output_closed_layer;
+ }
}
ssl_log(ssl, "process_output_ssl() returning %d\n", (int) written);
return written;
}
-static int setup_cleartext_connection( pn_ssl_t *ssl )
+static int setup_cleartext_connection(pn_transport_t *transport, unsigned int layer)
{
- ssl_log( ssl, "Cleartext connection detected.\n");
- ssl->io_layer->process_input = pn_io_layer_input_passthru;
- ssl->io_layer->process_output = pn_io_layer_output_passthru;
+ transport->io_layers[layer] = &pni_passthru_layer;
return 0;
}
// until we determine if the client is using SSL or not:
-static ssize_t process_input_unknown(pn_io_layer_t *io_layer, const char *input_data, size_t len)
+static ssize_t process_input_unknown(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len)
{
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ pn_ssl_t *ssl = transport->ssl;
switch (check_for_ssl_connection( input_data, len )) {
case SSL_CONNECTION:
- setup_ssl_connection( ssl );
- return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
+ ssl_log(ssl, "SSL connection detected.\n");
+ setup_ssl_connection(transport, layer);
+ break;
case CLEAR_CONNECTION:
- setup_cleartext_connection( ssl );
- return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
+ ssl_log(ssl, "Cleartext connection detected.\n");
+ setup_cleartext_connection(transport, layer);
+ break;
default:
return 0;
}
+ return transport->io_layers[layer]->process_input(transport, layer, input_data, len);
}
-static ssize_t process_output_unknown(pn_io_layer_t *io_layer, char *input_data, size_t len)
+static ssize_t process_output_unknown(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len)
{
// do not do output until we know if SSL is used or not
return 0;
@@ -1304,21 +1334,21 @@ static connection_mode_t check_for_ssl_connection( const char *data, size_t len
return UNKNOWN_CONNECTION;
}
-static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len)
+static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len)
{
return PN_EOS;
}
-static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len)
+static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len)
{
return PN_EOS;
}
// return # output bytes sitting in this layer
-static size_t buffered_output(pn_io_layer_t *io_layer)
+static size_t buffered_output(pn_transport_t *transport)
{
size_t count = 0;
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ pn_ssl_t *ssl = transport->ssl;
if (ssl) {
count += ssl->network_out_pending;
if (count == 0 && ssl->state == SHUTTING_DOWN && ssl->queued_shutdown)
@@ -1326,14 +1356,3 @@ static size_t buffered_output(pn_io_layer_t *io_layer)
}
return count;
}
-
-// return # input bytes sitting in this layer
-static size_t buffered_input( pn_io_layer_t *io_layer )
-{
- size_t count = 0;
- pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
- if (ssl) {
- count += ssl->in_data_count;
- }
- return count;
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org