You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/11/14 12:03:48 UTC
[39/50] [abbrv] qpid-proton git commit: PROTON-740: fixed shutdown
and event related issues with idle timeout during sasl
PROTON-740: fixed shutdown and event related issues with idle timeout during sasl
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a5d65452
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a5d65452
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a5d65452
Branch: refs/heads/examples
Commit: a5d654521d47498355089cf93281028876244b3e
Parents: 8a042a2
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Sat Nov 8 08:22:39 2014 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sat Nov 8 08:23:14 2014 -0500
----------------------------------------------------------------------
proton-c/include/proton/transport.h | 13 +++
proton-c/src/engine/engine-internal.h | 6 +-
proton-c/src/sasl/sasl.c | 9 +-
proton-c/src/ssl/openssl.c | 133 ++++++++++++++---------------
proton-c/src/transport/transport.c | 94 +++++++++++---------
proton-c/src/windows/schannel.c | 1 -
tests/python/proton_tests/engine.py | 27 ++++++
7 files changed, 170 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/include/proton/transport.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h
index 33bb3a5..2262e7c 100644
--- a/proton-c/include/proton/transport.h
+++ b/proton-c/include/proton/transport.h
@@ -209,6 +209,19 @@ PN_EXTERN void pn_transport_log(pn_transport_t *transport, const char *message);
*
* @param[in] transport a transport object
* @param[in] fmt the printf formatted message to be logged
+ * @param[in] ap a va_list holding the arguments for the format string
+ */
+PN_EXTERN void pn_transport_vlogf(pn_transport_t *transport, const char *fmt, va_list ap);
+
+/**
+ * Log a printf formatted message using a transport's logging
+ * mechanism.
+ *
+ * This can be useful in a debugging context as the log message will
+ * be prefixed with the transport's identifier.
+ *
+ * @param[in] transport a transport object
+ * @param[in] fmt the printf formatted message to be logged
*/
PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 97f7ead..dd4c44e 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -176,8 +176,7 @@ struct pn_transport_t {
bool tail_closed; // input stream closed by driver
bool head_closed;
bool done_processing; // if true, don't call pn_process again
- bool posted_head_closed;
- bool posted_tail_closed;
+ bool posted_idle_timeout;
};
struct pn_connection_t {
@@ -319,7 +318,4 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm
void pn_session_unbound(pn_session_t* ssn);
void pn_link_unbound(pn_link_t* link);
-void pni_close_tail(pn_transport_t *transport);
-
-
#endif /* engine-internal.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index f926b1b..2cc77c2 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -467,7 +467,14 @@ static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes,
static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)
{
pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
- ssize_t n = pn_sasl_output(sasl, bytes, size);
+ // this accounts for the case where pn_do_error has been invoked, e.g. by an idle timeout
+ ssize_t n;
+ if (sasl->transport->close_sent) {
+ n = PN_EOS;
+ } else {
+ n = pn_sasl_output(sasl, bytes, size);
+ }
+
if (n == PN_EOS) {
sasl->io_layer->process_output = pn_io_layer_output_passthru;
pn_io_layer_t *io_next = sasl->io_layer->next;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/ssl/openssl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/ssl/openssl.c b/proton-c/src/ssl/openssl.c
index c9536e2..ea2bd5b 100644
--- a/proton-c/src/ssl/openssl.c
+++ b/proton-c/src/ssl/openssl.c
@@ -149,11 +149,11 @@ static size_t buffered_output( pn_io_layer_t *io_layer );
static size_t buffered_input( pn_io_layer_t *io_layer );
// @todo: used to avoid littering the code with calls to printf...
-static void _log_error(const char *fmt, ...)
+static void _log_error(pn_ssl_t *ssl, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
+ pn_transport_vlogf(ssl ? ssl->transport : NULL, fmt, ap);
va_end(ap);
}
@@ -163,27 +163,27 @@ static void _log(pn_ssl_t *ssl, const char *fmt, ...)
if (PN_TRACE_DRV & ssl->trace) {
va_list ap;
va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
+ pn_transport_vlogf(ssl->transport, fmt, ap);
va_end(ap);
}
}
// log an error and dump the SSL error stack
-static void _log_ssl_error( const char *fmt, ...)
+static void _log_ssl_error(pn_ssl_t *ssl, const char *fmt, ...)
{
char buf[128]; // see "man ERR_error_string_n()"
va_list ap;
if (fmt) {
va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
+ pn_transport_vlogf(ssl ? ssl->transport : NULL, fmt, ap);
va_end(ap);
}
unsigned long err = ERR_get_error();
while (err) {
ERR_error_string_n(err, buf, sizeof(buf));
- _log_error("%s\n", buf);
+ _log_error(ssl, "%s", buf);
err = ERR_get_error();
}
}
@@ -211,8 +211,7 @@ static int ssl_failed(pn_ssl_t *ssl)
if (ssl_err) {
ERR_error_string_n( ssl_err, buf, sizeof(buf) );
}
- _log_ssl_error(NULL); // spit out any remaining errors to the log file
- pni_close_tail(ssl->transport);
+ _log_ssl_error(ssl, NULL); // spit out any remaining errors to the log file
pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf);
return PN_EOS;
}
@@ -284,23 +283,23 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
X509 *cert = X509_STORE_CTX_get_current_cert(ctx);
SSL *ssn = (SSL *) X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
if (!ssn) {
- _log_error("Error: unexpected error - SSL session info not available for peer verify!\n");
+ _log_error(NULL, "Error: unexpected error - SSL session info not available for peer verify!");
return 0; // fail connection
}
pn_ssl_t *ssl = (pn_ssl_t *)SSL_get_ex_data(ssn, ssl_ex_data_index);
if (!ssl) {
- _log_error("Error: unexpected error - SSL context info not available for peer verify!\n");
+ _log_error(NULL, "Error: unexpected error - SSL context info not available for peer verify!");
return 0; // fail connection
}
if (ssl->domain->verify_mode != PN_SSL_VERIFY_PEER_NAME) return preverify_ok;
if (!ssl->peer_hostname) {
- _log_error("Error: configuration error: PN_SSL_VERIFY_PEER_NAME configured, but no peer hostname set!\n");
+ _log_error(ssl, "Error: configuration error: PN_SSL_VERIFY_PEER_NAME configured, but no peer hostname set!");
return 0; // fail connection
}
- _log( ssl, "Checking identifying name in peer cert against '%s'\n", ssl->peer_hostname);
+ _log( ssl, "Checking identifying name in peer cert against '%s'", ssl->peer_hostname);
bool matched = false;
@@ -317,7 +316,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
unsigned char *str;
int len = ASN1_STRING_to_UTF8( &str, asn1 );
if (len >= 0) {
- _log( ssl, "SubjectAltName (dns) from peer cert = '%.*s'\n", len, str );
+ _log( ssl, "SubjectAltName (dns) from peer cert = '%.*s'", len, str );
matched = match_dns_pattern( ssl->peer_hostname, (const char *)str, len );
OPENSSL_free( str );
}
@@ -337,7 +336,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
unsigned char *str;
int len = ASN1_STRING_to_UTF8( &str, name_asn1);
if (len >= 0) {
- _log( ssl, "commonName from peer cert = '%.*s'\n", len, str );
+ _log( ssl, "commonName from peer cert = '%.*s'", len, str );
matched = match_dns_pattern( ssl->peer_hostname, (const char *)str, len );
OPENSSL_free(str);
}
@@ -345,14 +344,14 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
}
if (!matched) {
- _log( ssl, "Error: no name matching %s found in peer cert - rejecting handshake.\n",
+ _log( ssl, "Error: no name matching %s found in peer cert - rejecting handshake.",
ssl->peer_hostname);
preverify_ok = 0;
#ifdef X509_V_ERR_APPLICATION_VERIFICATION
X509_STORE_CTX_set_error( ctx, X509_V_ERR_APPLICATION_VERIFICATION );
#endif
} else {
- _log( ssl, "Name from peer cert matched - peer is valid.\n" );
+ _log( ssl, "Name from peer cert matched - peer is valid." );
}
return preverify_ok;
}
@@ -459,7 +458,7 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode )
case PN_SSL_MODE_CLIENT:
domain->ctx = SSL_CTX_new(SSLv23_client_method()); // and TLSv1+
if (!domain->ctx) {
- _log_ssl_error( "Unable to initialize OpenSSL context.\n");
+ _log_ssl_error(NULL, "Unable to initialize OpenSSL context.");
free(domain);
return NULL;
}
@@ -468,14 +467,14 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode )
case PN_SSL_MODE_SERVER:
domain->ctx = SSL_CTX_new(SSLv23_server_method()); // and TLSv1+
if (!domain->ctx) {
- _log_ssl_error("Unable to initialize OpenSSL context.\n");
+ _log_ssl_error(NULL, "Unable to initialize OpenSSL context.");
free(domain);
return NULL;
}
break;
default:
- _log_error("Invalid value for pn_ssl_mode_t: %d\n", mode);
+ _log_error(NULL, "Invalid value for pn_ssl_mode_t: %d", mode);
free(domain);
return NULL;
}
@@ -488,7 +487,7 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode )
// by default, allow anonymous ciphers so certificates are not required 'out of the box'
if (!SSL_CTX_set_cipher_list( domain->ctx, CIPHERS_ANONYMOUS )) {
- _log_ssl_error("Failed to set cipher list to %s\n", CIPHERS_ANONYMOUS);
+ _log_ssl_error(NULL, "Failed to set cipher list to %s", CIPHERS_ANONYMOUS);
pn_ssl_domain_free(domain);
return NULL;
}
@@ -537,7 +536,7 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain,
if (!domain || !domain->ctx) return -1;
if (SSL_CTX_use_certificate_chain_file(domain->ctx, certificate_file) != 1) {
- _log_ssl_error( "SSL_CTX_use_certificate_chain_file( %s ) failed\n", certificate_file);
+ _log_ssl_error(NULL, "SSL_CTX_use_certificate_chain_file( %s ) failed", certificate_file);
return -3;
}
@@ -548,12 +547,12 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain,
}
if (SSL_CTX_use_PrivateKey_file(domain->ctx, private_key_file, SSL_FILETYPE_PEM) != 1) {
- _log_ssl_error( "SSL_CTX_use_PrivateKey_file( %s ) failed\n", private_key_file);
+ _log_ssl_error(NULL, "SSL_CTX_use_PrivateKey_file( %s ) failed", private_key_file);
return -4;
}
if (SSL_CTX_check_private_key(domain->ctx) != 1) {
- _log_ssl_error( "The key file %s is not consistent with the certificate %s\n",
+ _log_ssl_error(NULL, "The key file %s is not consistent with the certificate %s",
private_key_file, certificate_file);
return -5;
}
@@ -564,7 +563,7 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain,
// cipher was negotiated. TLSv1 will reject such a request. Hack: once a cert is
// configured, allow only authenticated ciphers.
if (!SSL_CTX_set_cipher_list( domain->ctx, CIPHERS_AUTHENTICATE )) {
- _log_ssl_error( "Failed to set cipher list to %s\n", CIPHERS_AUTHENTICATE);
+ _log_ssl_error(NULL, "Failed to set cipher list to %s", CIPHERS_AUTHENTICATE);
return -6;
}
@@ -581,7 +580,7 @@ int pn_ssl_domain_set_trusted_ca_db(pn_ssl_domain_t *domain,
// to SSL_CTX_load_verify_locations()
struct stat sbuf;
if (stat( certificate_db, &sbuf ) != 0) {
- _log_error("stat(%s) failed: %s\n", certificate_db, strerror(errno));
+ _log_error(NULL, "stat(%s) failed: %s", certificate_db, strerror(errno));
return -1;
}
@@ -596,7 +595,7 @@ int pn_ssl_domain_set_trusted_ca_db(pn_ssl_domain_t *domain,
}
if (SSL_CTX_load_verify_locations( domain->ctx, file, dir ) != 1) {
- _log_ssl_error( "SSL_CTX_load_verify_locations( %s ) failed\n", certificate_db);
+ _log_ssl_error(NULL, "SSL_CTX_load_verify_locations( %s ) failed", certificate_db);
return -1;
}
@@ -617,8 +616,8 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
case PN_SSL_VERIFY_PEER_NAME:
if (!domain->has_ca_db) {
- _log_error("Error: cannot verify peer without a trusted CA configured.\n"
- " Use pn_ssl_domain_set_trusted_ca_db()\n");
+ _log_error(NULL, "Error: cannot verify peer without a trusted CA configured.\n"
+ " Use pn_ssl_domain_set_trusted_ca_db()");
return -1;
}
@@ -626,12 +625,12 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
// openssl requires that server connections supply a list of trusted CAs which is
// sent to the client
if (!trusted_CAs) {
- _log_error("Error: a list of trusted CAs must be provided.\n");
+ _log_error(NULL, "Error: a list of trusted CAs must be provided.");
return -1;
}
if (!domain->has_certificate) {
- _log_error("Error: Server cannot verify peer without configuring a certificate.\n"
- " Use pn_ssl_domain_set_credentials()\n");
+ _log_error(NULL, "Error: Server cannot verify peer without configuring a certificate.\n"
+ " Use pn_ssl_domain_set_credentials()");
}
if (domain->trusted_CAs) free(domain->trusted_CAs);
@@ -641,7 +640,7 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
if (cert_names != NULL)
SSL_CTX_set_client_CA_list(domain->ctx, cert_names);
else {
- _log_error("Error: Unable to process file of trusted CAs: %s\n", trusted_CAs);
+ _log_error(NULL, "Error: Unable to process file of trusted CAs: %s", trusted_CAs);
return -1;
}
}
@@ -658,7 +657,7 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
break;
default:
- _log_error( "Invalid peer authentication mode given.\n" );
+ _log_error(NULL, "Invalid peer authentication mode given." );
return -1;
}
@@ -692,7 +691,7 @@ int pn_ssl_domain_allow_unsecured_client(pn_ssl_domain_t *domain)
{
if (!domain) return -1;
if (domain->mode != PN_SSL_MODE_SERVER) {
- _log_error("Cannot permit unsecured clients - not a server.\n");
+ _log_error(NULL, "Cannot permit unsecured clients - not a server.");
return -1;
}
domain->allow_unsecured = true;
@@ -734,7 +733,7 @@ bool pn_ssl_get_protocol_name(pn_ssl_t *ssl, char *buffer, size_t size )
void pn_ssl_free( pn_ssl_t *ssl)
{
if (!ssl) return;
- _log( ssl, "SSL socket freed.\n" );
+ _log( ssl, "SSL socket freed." );
release_ssl_socket( ssl );
if (ssl->domain) pn_ssl_domain_free(ssl->domain);
if (ssl->session_id) free((void *)ssl->session_id);
@@ -796,7 +795,7 @@ static int keyfile_pw_cb(char *buf, int size, int rwflag, void *userdata)
static int start_ssl_shutdown( pn_ssl_t *ssl )
{
if (!ssl->ssl_shutdown) {
- _log(ssl, "Shutting down SSL connection...\n");
+ _log(ssl, "Shutting down SSL connection...");
if (ssl->session_id) {
// save the negotiated credentials before we close the connection
pn_ssl_session_t *ssn = (pn_ssl_session_t *)calloc( 1, sizeof(pn_ssl_session_t));
@@ -804,7 +803,7 @@ static int start_ssl_shutdown( pn_ssl_t *ssl )
ssn->id = pn_strdup( ssl->session_id );
ssn->session = SSL_get1_session( ssl->ssl );
if (ssn->session) {
- _log( ssl, "Saving SSL session as %s\n", ssl->session_id );
+ _log( ssl, "Saving SSL session as %s", ssl->session_id );
LL_ADD( ssl->domain, ssn_cache, ssn );
} else {
ssl_session_free( ssn );
@@ -821,7 +820,7 @@ static int start_ssl_shutdown( pn_ssl_t *ssl )
static int setup_ssl_connection( pn_ssl_t *ssl )
{
- _log( ssl, "SSL connection detected.\n");
+ _log( ssl, "SSL connection detected.");
ssl->io_layer->process_input = process_input_ssl;
ssl->io_layer->process_output = process_output_ssl;
return 0;
@@ -837,7 +836,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
- _log( ssl, "process_input_ssl( data size=%d )\n",available );
+ _log( ssl, "process_input_ssl( data size=%d )",available );
ssize_t consumed = 0;
bool work_pending;
@@ -856,12 +855,12 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
consumed += written;
ssl->read_blocked = false;
work_pending = (available > 0);
- _log( ssl, "Wrote %d bytes to BIO Layer, %d left over\n", written, available );
+ _log( ssl, "Wrote %d bytes to BIO Layer, %d left over", written, available );
}
} else if (shutdown_input) {
// lower layer (caller) has closed. Close the WRITE side of the BIO. This will cause
// an EOF to be passed to SSL once all pending inbound data has been consumed.
- _log( ssl, "Lower layer closed - shutting down BIO write side\n");
+ _log( ssl, "Lower layer closed - shutting down BIO write side");
(void)BIO_shutdown_wr( ssl->bio_net_io );
shutdown_input = false;
}
@@ -871,7 +870,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
if (!ssl->ssl_closed && ssl->in_count < ssl->in_size) {
int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], ssl->in_size - ssl->in_count );
if (read > 0) {
- _log( ssl, "Read %d bytes from SSL socket for app\n", read );
+ _log( ssl, "Read %d bytes from SSL socket for app", read );
_log_clear_data( ssl, &ssl->inbuf[ssl->in_count], read );
ssl->in_count += read;
work_pending = true;
@@ -881,7 +880,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
switch (reason) {
case SSL_ERROR_ZERO_RETURN:
// SSL closed cleanly
- _log(ssl, "SSL connection has closed\n");
+ _log(ssl, "SSL connection has closed");
start_ssl_shutdown(ssl); // KAG: not sure - this may not be necessary
ssl->ssl_closed = true;
break;
@@ -892,11 +891,11 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
} else {
if (BIO_should_write( ssl->bio_ssl )) {
ssl->write_blocked = true;
- _log(ssl, "Detected write-blocked\n");
+ _log(ssl, "Detected write-blocked");
}
if (BIO_should_read( ssl->bio_ssl )) {
ssl->read_blocked = true;
- _log(ssl, "Detected read-blocked\n");
+ _log(ssl, "Detected read-blocked");
}
}
}
@@ -913,9 +912,9 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
if (ssl->in_count)
memmove( ssl->inbuf, ssl->inbuf + consumed, ssl->in_count );
work_pending = true;
- _log( ssl, "Application consumed %d bytes from peer\n", (int) consumed );
+ _log( ssl, "Application consumed %d bytes from peer", (int) consumed );
} else if (consumed < 0) {
- _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
+ _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)",
(int) consumed, (int)ssl->in_count);
ssl->in_count = 0; // discard any pending input
ssl->app_input_closed = consumed;
@@ -945,7 +944,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
// the application _must_ have enough data to process. If
// this is an oversized frame, the app _must_ handle it
// by returning an error code to SSL.
- _log_error("Error: application unable to consume input.\n");
+ _log_error(ssl, "Error: application unable to consume input.");
}
}
}
@@ -954,7 +953,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
} while (work_pending);
- //_log(ssl, "ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d\n",
+ //_log(ssl, "ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d",
// ssl->ssl_closed, ssl->in_count, ssl->app_input_closed, ssl->app_output_closed );
// PROTON-82: Instead, close the input side as soon as we've completed enough of the SSL
@@ -971,7 +970,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
consumed = ssl->app_input_closed;
ssl->io_layer->process_input = process_input_done;
}
- _log(ssl, "process_input_ssl() returning %d\n", (int) consumed);
+ _log(ssl, "process_input_ssl() returning %d", (int) consumed);
return consumed;
}
@@ -994,10 +993,10 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
if (app_bytes > 0) {
ssl->out_count += app_bytes;
work_pending = true;
- _log( ssl, "Gathered %d bytes from app to send to peer\n", app_bytes );
+ _log( ssl, "Gathered %d bytes from app to send to peer", app_bytes );
} else {
if (app_bytes < 0) {
- _log(ssl, "Application layer closed its output, error=%d (%d bytes pending send)\n",
+ _log(ssl, "Application layer closed its output, error=%d (%d bytes pending send)",
(int) app_bytes, (int) ssl->out_count);
ssl->app_output_closed = app_bytes;
}
@@ -1014,14 +1013,14 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
data += wrote;
ssl->out_count -= wrote;
work_pending = true;
- _log( ssl, "Wrote %d bytes from app to socket\n", wrote );
+ _log( ssl, "Wrote %d bytes from app to socket", wrote );
} else {
if (!BIO_should_retry(ssl->bio_ssl)) {
int reason = SSL_get_error( ssl->ssl, wrote );
switch (reason) {
case SSL_ERROR_ZERO_RETURN:
// SSL closed cleanly
- _log(ssl, "SSL connection has closed\n");
+ _log(ssl, "SSL connection has closed");
start_ssl_shutdown(ssl); // KAG: not sure - this may not be necessary
ssl->out_count = 0; // can no longer write to socket, so erase app output data
ssl->ssl_closed = true;
@@ -1033,11 +1032,11 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
} else {
if (BIO_should_read( ssl->bio_ssl )) {
ssl->read_blocked = true;
- _log(ssl, "Detected read-blocked\n");
+ _log(ssl, "Detected read-blocked");
}
if (BIO_should_write( ssl->bio_ssl )) {
ssl->write_blocked = true;
- _log(ssl, "Detected write-blocked\n");
+ _log(ssl, "Detected write-blocked");
}
}
}
@@ -1063,13 +1062,13 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
written += available;
ssl->write_blocked = false;
work_pending = work_pending || max_len > 0;
- _log( ssl, "Read %d bytes from BIO Layer\n", available );
+ _log( ssl, "Read %d bytes from BIO Layer", available );
}
}
} while (work_pending);
- //_log(ssl, "written=%d ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d bio_pend=%d\n",
+ //_log(ssl, "written=%d ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d bio_pend=%d",
// written, ssl->ssl_closed, ssl->in_count, ssl->app_input_closed, ssl->app_output_closed, BIO_pending(ssl->bio_net_io) );
// PROTON-82: close the output side as soon as we've sent the SSL close_notify.
@@ -1084,7 +1083,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS;
ssl->io_layer->process_output = process_output_done;
}
- _log(ssl, "process_output_ssl() returning %d\n", (int) written);
+ _log(ssl, "process_output_ssl() returning %d", (int) written);
return written;
}
@@ -1095,7 +1094,7 @@ static int init_ssl_socket( pn_ssl_t *ssl )
ssl->ssl = SSL_new(ssl->domain->ctx);
if (!ssl->ssl) {
- _log_error( "SSL socket setup failure.\n" );
+ _log_error(ssl, "SSL socket setup failure." );
return -1;
}
@@ -1112,10 +1111,10 @@ static int init_ssl_socket( pn_ssl_t *ssl )
if (ssl->session_id) {
pn_ssl_session_t *ssn = ssn_cache_find( ssl->domain, ssl->session_id );
if (ssn) {
- _log( ssl, "Restoring previous session id=%s\n", ssn->id );
+ _log( ssl, "Restoring previous session id=%s", ssn->id );
int rc = SSL_set_session( ssl->ssl, ssn->session );
if (rc != 1) {
- _log( ssl, "Session restore failed, id=%s\n", ssn->id );
+ _log( ssl, "Session restore failed, id=%s", ssn->id );
}
LL_REMOVE( ssl->domain, ssn_cache, ssn );
ssl_session_free( ssn );
@@ -1125,14 +1124,14 @@ static int init_ssl_socket( pn_ssl_t *ssl )
// now layer a BIO over the SSL socket
ssl->bio_ssl = BIO_new(BIO_f_ssl());
if (!ssl->bio_ssl) {
- _log_error( "BIO setup failure.\n" );
+ _log_error(ssl, "BIO setup failure." );
return -1;
}
(void)BIO_set_ssl(ssl->bio_ssl, ssl->ssl, BIO_NOCLOSE);
// create the "lower" BIO "pipe", and attach it below the SSL layer
if (!BIO_new_bio_pair(&ssl->bio_ssl_io, 0, &ssl->bio_net_io, 0)) {
- _log_error( "BIO setup failure.\n" );
+ _log_error(ssl, "BIO setup failure." );
return -1;
}
SSL_set_bio(ssl->ssl, ssl->bio_ssl_io, ssl->bio_ssl_io);
@@ -1140,11 +1139,11 @@ static int init_ssl_socket( pn_ssl_t *ssl )
if (ssl->domain->mode == PN_SSL_MODE_SERVER) {
SSL_set_accept_state(ssl->ssl);
BIO_set_ssl_mode(ssl->bio_ssl, 0); // server mode
- _log( ssl, "Server SSL socket created.\n" );
+ _log( ssl, "Server SSL socket created." );
} else { // client mode
SSL_set_connect_state(ssl->ssl);
BIO_set_ssl_mode(ssl->bio_ssl, 1); // client mode
- _log( ssl, "Client SSL socket created.\n" );
+ _log( ssl, "Client SSL socket created." );
}
return 0;
}
@@ -1167,7 +1166,7 @@ static void release_ssl_socket( pn_ssl_t *ssl )
static int setup_cleartext_connection( pn_ssl_t *ssl )
{
- _log( ssl, "Cleartext connection detected.\n");
+ _log( ssl, "Cleartext connection detected.");
ssl->io_layer->process_input = pn_io_layer_input_passthru;
ssl->io_layer->process_output = pn_io_layer_output_passthru;
return 0;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index d91b55a..601d6a2 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -177,8 +177,7 @@ static void pn_transport_initialize(void *object)
transport->done_processing = false;
- transport->posted_head_closed = false;
- transport->posted_tail_closed = false;
+ transport->posted_idle_timeout = false;
}
pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
@@ -457,6 +456,24 @@ static pn_collector_t *pni_transport_collector(pn_transport_t *transport)
}
}
+static void pni_maybe_post_closed(pn_transport_t *transport)
+{
+ pn_collector_t *collector = pni_transport_collector(transport);
+ if (transport->head_closed && transport->tail_closed) {
+ pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED);
+ }
+}
+
+static void pni_close_tail(pn_transport_t *transport)
+{
+ if (!transport->tail_closed) {
+ transport->tail_closed = true;
+ pn_collector_t *collector = pni_transport_collector(transport);
+ pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED);
+ pni_maybe_post_closed(transport);
+ }
+}
+
int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
{
va_list ap;
@@ -479,6 +496,8 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm
pn_collector_t *collector = pni_transport_collector(transport);
pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_ERROR);
pn_transport_logf(transport, "ERROR %s %s", condition, buf);
+ transport->done_processing = true;
+ pni_close_tail(transport);
return PN_ERR;
}
@@ -1050,14 +1069,6 @@ ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t
return original - available;
}
-static void pni_maybe_post_closed(pn_transport_t *transport)
-{
- pn_collector_t *collector = pni_transport_collector(transport);
- if (transport->posted_head_closed && transport->posted_tail_closed) {
- pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED);
- }
-}
-
// process pending input until none remaining or EOS
static ssize_t transport_consume(pn_transport_t *transport)
{
@@ -1079,12 +1090,6 @@ static ssize_t transport_consume(pn_transport_t *transport)
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_transport_log(transport, " <- EOS");
transport->input_pending = 0; // XXX ???
- if (!transport->posted_tail_closed) {
- pn_collector_t *collector = pni_transport_collector(transport);
- pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED);
- transport->posted_tail_closed = true;
- pni_maybe_post_closed(transport);
- }
return n;
}
}
@@ -1171,8 +1176,11 @@ static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now)
transport->last_bytes_input = transport->bytes_input;
} else if (transport->dead_remote_deadline <= now) {
transport->dead_remote_deadline = now + transport->local_idle_timeout;
- // Note: AMQP-1.0 really should define a generic "timeout" error, but does not.
- pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired");
+ if (!transport->posted_idle_timeout) {
+ transport->posted_idle_timeout = true;
+ // Note: AMQP-1.0 really should define a generic "timeout" error, but does not.
+ pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired");
+ }
}
timeout = transport->dead_remote_deadline;
}
@@ -1861,9 +1869,21 @@ static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t
return pn_dispatcher_output(transport->disp, bytes, size);
}
+static void pni_close_head(pn_transport_t *transport)
+{
+ if (!transport->head_closed) {
+ transport->head_closed = true;
+ pn_collector_t *collector = pni_transport_collector(transport);
+ pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED);
+ pni_maybe_post_closed(transport);
+ }
+}
+
// generate outbound data, return amount of pending output else error
static ssize_t transport_produce(pn_transport_t *transport)
{
+ if (transport->head_closed) return PN_EOS;
+
pn_io_layer_t *io_layer = transport->io_layers;
ssize_t space = transport->output_size - transport->output_pending;
@@ -1900,13 +1920,12 @@ static ssize_t transport_produce(pn_transport_t *transport)
if (n < 0) {
pn_transport_log(transport, " -> EOS");
}
- /*else
- pn_transport_logf(transport, " -> EOS (%" PN_ZI ") %s", n,
- pn_error_text(transport->error));*/
}
+ pni_close_head(transport);
return n;
}
}
+
return transport->output_pending;
}
@@ -1963,15 +1982,24 @@ void pn_transport_log(pn_transport_t *transport, const char *message)
transport->tracer(transport, message);
}
+void pn_transport_vlogf(pn_transport_t *transport, const char *fmt, va_list ap)
+{
+ if (transport) {
+ pn_string_vformat(transport->scratch, fmt, ap);
+ pn_transport_log(transport, pn_string_get(transport->scratch));
+ } else {
+ vfprintf(stderr, fmt, ap);
+ fprintf(stderr, "\n");
+ }
+}
+
void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
- pn_string_vformat(transport->scratch, fmt, ap);
+ pn_transport_vlogf(transport, fmt, ap);
va_end(ap);
-
- pn_transport_log(transport, pn_string_get(transport->scratch));
}
uint16_t pn_transport_get_channel_max(pn_transport_t *transport)
@@ -2132,13 +2160,6 @@ ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t siz
}
}
-void pni_close_tail(pn_transport_t *transport)
-{
- if (!transport->tail_closed) {
- transport->tail_closed = true;
- }
-}
-
int pn_transport_process(pn_transport_t *transport, size_t size)
{
assert(transport);
@@ -2168,7 +2189,6 @@ int pn_transport_close_tail(pn_transport_t *transport)
ssize_t pn_transport_pending(pn_transport_t *transport) /* <0 == done */
{
assert(transport);
- if (transport->head_closed) return PN_EOS;
return transport_produce( transport );
}
@@ -2211,12 +2231,8 @@ void pn_transport_pop(pn_transport_t *transport, size_t size)
transport->output_pending );
}
- if (!transport->output_pending && pn_transport_pending(transport) < 0 &&
- !transport->posted_head_closed) {
- pn_collector_t *collector = pni_transport_collector(transport);
- pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED);
- transport->posted_head_closed = true;
- pni_maybe_post_closed(transport);
+ if (!transport->output_pending && pn_transport_pending(transport) < 0) {
+ pni_close_head(transport);
}
}
}
@@ -2224,7 +2240,7 @@ void pn_transport_pop(pn_transport_t *transport, size_t size)
int pn_transport_close_head(pn_transport_t *transport)
{
size_t pending = pn_transport_pending(transport);
- transport->head_closed = true;
+ pni_close_head(transport);
pn_transport_pop(transport, pending);
return 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/windows/schannel.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/schannel.c b/proton-c/src/windows/schannel.c
index 373dc51..abf4b85 100644
--- a/proton-c/src/windows/schannel.c
+++ b/proton-c/src/windows/schannel.c
@@ -222,7 +222,6 @@ static int ssl_failed(pn_ssl_t *ssl, const char *reason)
ssl->ssl_closed = true;
ssl->app_input_closed = ssl->app_output_closed = PN_EOS;
ssl->state = SSL_CLOSED;
- pni_close_tail(ssl->transport);
pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", reason);
return PN_EOS;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index eec73d0..d17a57c 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -2413,3 +2413,30 @@ class TeardownLeakTest(PeerTest):
def testLeak(self):
self.doLeak(False, False)
+
+class IdleTimeoutEventTest(PeerTest):
+
+ def half_pump(self):
+ p = self.transport.pending()
+ self.transport.pop(p)
+
+ def testTimeoutWithZombieServer(self):
+ self.transport.idle_timeout = self.delay
+ self.connection.open()
+ self.half_pump()
+ self.transport.tick(time())
+ sleep(self.delay*2)
+ self.transport.tick(time())
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED)
+ assert self.transport.capacity() < 0
+ assert self.transport.pending() > 0
+ self.half_pump()
+ self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+ assert self.transport.pending() < 0
+
+ def testTimeoutWithZombieServerAndSASL(self):
+ sasl = self.transport.sasl()
+ sasl.client()
+ self.testTimeoutWithZombieServer()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org