You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/15 18:01:04 UTC
svn commit: r1446688 - in /qpid/proton/branches/kgiusti-proton-225:
proton-c/src/dispatcher/dispatcher.c proton-c/src/ssl/openssl.c
tests/python/proton_tests/sasl.py
Author: kgiusti
Date: Fri Feb 15 17:01:04 2013
New Revision: 1446688
URL: http://svn.apache.org/r1446688
Log:
PROTON-235: fix SASL buffering problem
Modified:
qpid/proton/branches/kgiusti-proton-225/proton-c/src/dispatcher/dispatcher.c
qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c
qpid/proton/branches/kgiusti-proton-225/tests/python/proton_tests/sasl.py
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/dispatcher/dispatcher.c?rev=1446688&r1=1446687&r2=1446688&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/dispatcher/dispatcher.c Fri Feb 15 17:01:04 2013
@@ -166,63 +166,26 @@ int pn_dispatch_frame(pn_dispatcher_t *d
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t available)
{
- size_t offered = available;
-
- if (offered == disp->fragment) {
- return 0;
- }
-
- size_t leftover = pn_buffer_size(disp->input);
- if (leftover) {
- int e = pn_buffer_append(disp->input, bytes, available);
- if (e) return e;
- pn_bytes_t b = pn_buffer_bytes(disp->input);
- bytes = b.start;
- available = b.size;
- }
-
size_t read = 0;
- bool fragment = false;
- while (!disp->halt) {
+ while (available && !disp->halt) {
pn_frame_t frame;
- size_t n = pn_read_frame(&frame, bytes + read, available - read);
+ size_t n = pn_read_frame(&frame, bytes + read, available);
if (n) {
+ read += n;
+ available -= n;
disp->input_frames_ct += 1;
int e = pn_dispatch_frame(disp, frame);
if (e) return e;
- read += n;
} else {
- if (leftover) {
- if (read > leftover) {
- pn_buffer_clear(disp->input);
- fragment = true;
- } else {
- read = available;
- }
- } else {
- if (!read) {
- int e = pn_buffer_append(disp->input, bytes + read, available - read);
- if (e) return e;
- read = available;
- } else {
- fragment = true;
- }
- }
break;
}
if (!disp->batch) break;
}
- size_t consumed = read - leftover;
- if (consumed && fragment) {
- disp->fragment = offered - consumed;
- } else {
- disp->fragment = 0;
- }
- return consumed;
+ return read;
}
int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...)
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c?rev=1446688&r1=1446687&r2=1446688&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c Fri Feb 15 17:01:04 2013
@@ -91,9 +91,11 @@ struct pn_ssl_t {
// buffers for holding I/O from "applications" above SSL
#define APP_BUF_SIZE (4*1024)
- char outbuf[APP_BUF_SIZE];
+ char *outbuf;
+ size_t out_size;
size_t out_count;
- char inbuf[APP_BUF_SIZE];
+ char *inbuf;
+ size_t in_size;
size_t in_count;
pn_trace_t trace;
@@ -703,7 +705,8 @@ void pn_ssl_free( pn_ssl_t *ssl)
if (ssl->domain) pn_ssl_domain_free(ssl->domain);
if (ssl->session_id) free((void *)ssl->session_id);
if (ssl->peer_hostname) free((void *)ssl->peer_hostname);
-
+ if (ssl->inbuf) free((void *)ssl->inbuf);
+ if (ssl->outbuf) free((void *)ssl->outbuf);
free(ssl);
}
@@ -714,6 +717,21 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
pn_ssl_t *ssl = (pn_ssl_t *) calloc(1, sizeof(pn_ssl_t));
if (!ssl) return NULL;
+ ssl->out_size = APP_BUF_SIZE;
+ uint32_t max_frame = pn_transport_get_max_frame(transport);
+ ssl->in_size = max_frame ? max_frame : APP_BUF_SIZE;
+ ssl->outbuf = (char *)malloc(ssl->out_size);
+ if (!ssl->outbuf) {
+ free(ssl);
+ return NULL;
+ }
+ ssl->inbuf = (char *)malloc(ssl->in_size);
+ if (!ssl->inbuf) {
+ free(ssl->outbuf);
+ free(ssl);
+ return NULL;
+ }
+
ssl->transport = transport;
transport->ssl = ssl;
@@ -814,8 +832,8 @@ static ssize_t process_input_ssl( pn_io_
// Read all available data from the SSL socket
- if (!ssl->ssl_closed && ssl->in_count < APP_BUF_SIZE) {
- int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], APP_BUF_SIZE - ssl->in_count );
+ if (!ssl->ssl_closed && ssl->in_count < ssl->in_size) {
+ int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], ssl->in_size - ssl->in_count );
if (read > 0) {
_log( ssl, "Read %d bytes from SSL socket for app\n", read );
_log_clear_data( ssl, &ssl->inbuf[ssl->in_count], read );
@@ -860,15 +878,39 @@ static ssize_t process_input_ssl( pn_io_
data += consumed;
work_pending = true;
_log( ssl, "Application consumed %d bytes from peer\n", (int) consumed );
+ } else if (consumed < 0) {
+ _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
+ (int) consumed, (int)ssl->in_count);
+ ssl->in_count = 0; // discard any pending input
+ ssl->app_input_closed = consumed;
+ if (ssl->app_output_closed && ssl->out_count == 0) {
+ // both sides of app closed, and no more app output pending:
+ start_ssl_shutdown(ssl);
+ }
} else {
- if (consumed < 0) {
- _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
- (int) consumed, (int)ssl->in_count);
- ssl->in_count = 0; // discard any pending input
- ssl->app_input_closed = consumed;
- if (ssl->app_output_closed && ssl->out_count == 0) {
- // both sides of app closed, and no more app output pending:
- start_ssl_shutdown(ssl);
+ // app did not consume any bytes, must be waiting for a full frame
+ if (ssl->in_count == ssl->in_size) {
+ // but the buffer is full, not enough room for a full frame.
+ // can we grow the buffer?
+ uint32_t max_frame = pn_transport_get_max_frame(ssl->transport);
+ if (!max_frame) max_frame = ssl->in_size * 2; // no limit
+ if (ssl->in_size < max_frame) {
+ // no max frame limit - grow it.
+ char *newbuf = (char *)malloc( max_frame );
+ if (newbuf) {
+ ssl->in_size *= max_frame;
+ memmove( newbuf, ssl->inbuf, ssl->in_count );
+ free( ssl->inbuf );
+ ssl->inbuf = newbuf;
+ }
+ work_pending = true; // can we get more input?
+ } else {
+ // can't gather any more input, but app needs more?
+ // This is a bug - since SSL can buffer up to max-frame,
+ // the application _must_ have enough data to process. If
+ // this is an oversized frame, the app _must_ handle it
+ // by returning an error code to SSL.
+ _log_error("Error: application unable to consume input.\n");
}
}
}
@@ -913,9 +955,9 @@ static ssize_t process_output_ssl( pn_io
work_pending = false;
// first, get any pending application output, if possible
- if (!ssl->app_output_closed && ssl->out_count < APP_BUF_SIZE) {
+ if (!ssl->app_output_closed && ssl->out_count < ssl->out_size) {
pn_io_layer_t *io_next = ssl->io_layer->next;
- ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], APP_BUF_SIZE - ssl->out_count);
+ ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count);
if (app_bytes > 0) {
ssl->out_count += app_bytes;
work_pending = true;
Modified: qpid/proton/branches/kgiusti-proton-225/tests/python/proton_tests/sasl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/tests/python/proton_tests/sasl.py?rev=1446688&r1=1446687&r2=1446688&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/tests/python/proton_tests/sasl.py (original)
+++ qpid/proton/branches/kgiusti-proton-225/tests/python/proton_tests/sasl.py Fri Feb 15 17:01:04 2013
@@ -106,3 +106,27 @@ class SaslTest(Test):
out1 = self.t1.output(1024)
assert len(out1) > 0
+
+ def testFracturedSASL(self):
+ """ PROTON-235
+ """
+ self.s1.mechanisms("ANONYMOUS")
+ self.s1.client()
+ assert self.s1.outcome is None
+
+ # self.t1.trace(Transport.TRACE_FRM)
+
+ out = self.t1.output(1024)
+ self.t1.input("AMQP\x03\x01\x00\x00")
+ out = self.t1.output(1024)
+ self.t1.input("\x00\x00\x00")
+ out = self.t1.output(1024)
+ self.t1.input("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM")
+ out = self.t1.output(1024)
+ self.t1.input("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
+ out = self.t1.output(1024)
+ while out:
+ out = self.t1.output(1024)
+
+ assert self.s1.outcome == SASL.OK
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org