You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/15 18:07:45 UTC
svn commit: r1446691 - in /qpid/proton/branches/kgiusti-proton-225/proton-c:
include/proton/engine.h src/engine/engine-internal.h src/engine/engine.c
src/messenger.c src/posix/driver.c src/ssl/openssl.c src/windows/driver.c
Author: kgiusti
Date: Fri Feb 15 17:07:44 2013
New Revision: 1446691
URL: http://svn.apache.org/r1446691
Log:
PROTON-222: give messenger visibility into the amount of buffered output
Modified:
qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h
qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h
qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c
qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c
qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c
qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c
qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h Fri Feb 15 17:07:44 2013
@@ -402,6 +402,7 @@ PN_EXTERN void pn_transport_set_idle_tim
PN_EXTERN pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport);
PN_EXTERN uint64_t pn_transport_get_frames_output(const pn_transport_t *transport);
PN_EXTERN uint64_t pn_transport_get_frames_input(const pn_transport_t *transport);
+PN_EXTERN size_t pn_transport_buffered_output(pn_transport_t *transport);
PN_EXTERN void pn_transport_free(pn_transport_t *transport);
// session
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h Fri Feb 15 17:07:44 2013
@@ -113,6 +113,8 @@ typedef struct pn_io_layer_t {
ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t);
ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t);
pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t);
+ size_t (*buffered_output)(struct pn_io_layer_t *); // how much output is held
+ size_t (*buffered_input)(struct pn_io_layer_t *); // how much input is held
} pn_io_layer_t;
struct pn_transport_t {
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c Fri Feb 15 17:07:44 2013
@@ -752,6 +752,8 @@ void pn_transport_init(pn_transport_t *t
io_layer->process_input = pn_io_layer_input_passthru;
io_layer->process_output = pn_io_layer_output_passthru;
io_layer->process_tick = pn_io_layer_tick_passthru;
+ io_layer->buffered_output = NULL;
+ io_layer->buffered_input = NULL;
++io_layer;
}
@@ -760,6 +762,8 @@ void pn_transport_init(pn_transport_t *t
amqp->process_input = pn_input_read_amqp_header;
amqp->process_output = pn_output_write_amqp_header;
amqp->process_tick = pn_io_layer_tick_passthru;
+ amqp->buffered_output = NULL;
+ amqp->buffered_input = NULL;
amqp->next = NULL;
pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
@@ -3087,3 +3091,21 @@ int pn_transport_close_head(pn_transport
{
return 0;
}
+// Sum the bytes reported by pn_transport_pending() and by every I/O layer that supplies a buffered_output hook; yields 0 for a NULL transport or when pn_transport_pending() returns a negative value.
+size_t pn_transport_buffered_output(pn_transport_t *transport)
+{
+ size_t count = 0;
+ if (transport) {
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending >= 0) { // !error -- a negative result also skips the layer walk, so count stays 0
+ count += pending;
+ pn_io_layer_t *io_layer = transport->io_layers;
+ while (io_layer != &transport->io_layers[PN_IO_LAYER_CT]) { // stop one past the last of the PN_IO_LAYER_CT slots
+ if (io_layer->buffered_output) // hook may be NULL, in which case the layer contributes nothing
+ count += io_layer->buffered_output( io_layer );
+ ++io_layer;
+ }
+ }
+ }
+ return count;
+}
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c Fri Feb 15 17:07:44 2013
@@ -1046,10 +1046,21 @@ int pn_messenger_settle(pn_messenger_t *
return pn_queue_update(queue, pn_tracker_sequence(tracker), (pn_status_t) 0, flags, true, true);
}
+// true if all pending output has been sent to peer
bool pn_messenger_sent(pn_messenger_t *messenger)
{
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
+
+ pn_transport_t *transport = pn_connector_transport(ctor);
+ if (transport) {
+ // could be as simple as this, if not for SSL:
+ //if (pn_transport_pending(transport) > 0)
+ // return false;
+ if (pn_transport_buffered_output(transport) > 0)
+ return false;
+ }
+
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c Fri Feb 15 17:07:44 2013
@@ -103,7 +103,6 @@ struct pn_listener_t {
void *context;
};
-#define IO_BUF_SIZE (64*1024)
#define PN_NAME_MAX (256)
struct pn_connector_t {
@@ -120,13 +119,7 @@ struct pn_connector_t {
pn_trace_t trace;
bool closed;
pn_timestamp_t wakeup;
- void (*read)(pn_connector_t *);
- void (*write) (pn_connector_t *);
- size_t input_size;
- char input[IO_BUF_SIZE];
bool input_eos;
- size_t output_size;
- char output[IO_BUF_SIZE];
pn_connection_t *connection;
pn_transport_t *transport;
pn_sasl_t *sasl;
@@ -378,9 +371,6 @@ pn_connector_t *pn_connector(pn_driver_t
return c;
}
-static void pn_connector_read(pn_connector_t *ctor);
-static void pn_connector_write(pn_connector_t *ctor);
-
pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
{
if (!driver) return NULL;
@@ -400,11 +390,7 @@ pn_connector_t *pn_connector_fd(pn_drive
c->trace = driver->trace;
c->closed = false;
c->wakeup = 0;
- c->read = pn_connector_read;
- c->write = pn_connector_write;
- c->input_size = 0;
c->input_eos = false;
- c->output_size = 0;
c->connection = NULL;
c->transport = pn_transport();
c->sasl = pn_sasl(c->transport);
@@ -503,74 +489,6 @@ void pn_connector_free(pn_connector_t *c
free(ctor);
}
-static void pn_connector_read(pn_connector_t *ctor)
-{
- ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
- if (n < 0) {
- if (errno != EAGAIN) {
- if (n < 0) perror("read");
- ctor->status &= ~PN_SEL_RD;
- ctor->input_eos = true;
- }
- } else if (n == 0) {
- ctor->status &= ~PN_SEL_RD;
- ctor->input_eos = true;
- } else {
- ctor->input_size += n;
- }
-}
-
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
- ctor->input_size -= n;
- memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
-static void pn_connector_process_input(pn_connector_t *ctor)
-{
- pn_transport_t *transport = ctor->transport;
- if (!ctor->input_done) {
- if (ctor->input_size > 0 || ctor->input_eos) {
- ssize_t n = pn_transport_input(transport, ctor->input, ctor->input_size);
- if (n >= 0) {
- pn_connector_consume(ctor, n);
- } else {
- pn_connector_consume(ctor, ctor->input_size);
- ctor->input_done = true;
- }
- }
- }
-}
-
-static char *pn_connector_output(pn_connector_t *ctor)
-{
- return ctor->output + ctor->output_size;
-}
-
-static size_t pn_connector_available(pn_connector_t *ctor)
-{
- return IO_BUF_SIZE - ctor->output_size;
-}
-
-static void pn_connector_process_output(pn_connector_t *ctor)
-{
- pn_transport_t *transport = ctor->transport;
- if (!ctor->output_done) {
- ssize_t n = pn_transport_output(transport, pn_connector_output(ctor),
- pn_connector_available(ctor));
- if (n >= 0) {
- ctor->output_size += n;
- } else {
- ctor->output_done = true;
- }
- }
-
- if (ctor->output_size) {
- ctor->status |= PN_SEL_WR;
- }
-}
-
-
void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit)
{
switch (crit) {
@@ -606,28 +524,6 @@ bool pn_connector_activated(pn_connector
return result;
}
-
-static void pn_connector_write(pn_connector_t *ctor)
-{
- if (ctor->output_size > 0) {
- ssize_t n = pn_send(ctor->fd, ctor->output, ctor->output_size);
- if (n < 0) {
- // XXX
- if (errno != EAGAIN) {
- perror("send");
- ctor->output_size = 0;
- ctor->output_done = true;
- }
- } else {
- ctor->output_size -= n;
- memmove(ctor->output, ctor->output + n, ctor->output_size);
- }
- }
-
- if (!ctor->output_size)
- ctor->status &= ~PN_SEL_WR;
-}
-
static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
{
if (!ctor->transport) return 0;
@@ -639,21 +535,80 @@ void pn_connector_process(pn_connector_t
if (c) {
if (c->closed) return;
- if (c->pending_read) {
- c->read(c);
- c->pending_read = false;
+ pn_transport_t *transport = c->transport;
+
+ ///
+ /// Socket read
+ ///
+ if (!c->input_done) {
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity > 0) {
+ c->status |= PN_SEL_RD;
+ if (c->pending_read) {
+ c->pending_read = false;
+ ssize_t n = recv(c->fd, pn_transport_buffer(transport),
+ capacity, 0);
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ perror("read");
+ c->status &= ~PN_SEL_RD;
+ c->input_eos = true;
+ }
+ } else {
+ if (n == 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_eos = true;
+ }
+ if (pn_transport_push(transport, (size_t) n) < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
+ }
+ }
+ } else if (capacity < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
}
- pn_connector_process_input(c);
+ ///
+ /// Event wakeup
+ ///
c->wakeup = pn_connector_tick(c, pn_i_now());
- pn_connector_process_output(c);
- if (c->pending_write) {
- c->write(c);
- c->pending_write = false;
- pn_connector_process_output(c); // XXX: review this - there's a better way to determine if the WR flag should be re-set
+ ///
+ /// Socket write
+ ///
+ if (!c->output_done) {
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending > 0) {
+ c->status |= PN_SEL_WR;
+ if (c->pending_write) {
+ c->pending_write = false;
+ ssize_t n = pn_send(c->fd, pn_transport_peek(transport), pending);
+ if (n < 0) {
+ // XXX
+ if (errno != EAGAIN) {
+ perror("send");
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
+ } else if (n) {
+ pn_transport_pop(transport, (size_t) n);
+ pending -= n;
+ if (pending == 0)
+ c->status &= ~PN_SEL_WR;
+ }
+ }
+ } else if (pending < 0) {
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
}
- if (c->output_size == 0 && c->input_done && c->output_done) {
+
+ // Closed?
+
+ if (c->input_done && c->output_done) {
if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
fprintf(stderr, "Closed %s\n", c->name);
}
@@ -879,8 +834,7 @@ pn_connector_t *pn_driver_connector(pn_d
pn_connector_t *c = d->connector_next;
d->connector_next = c->connector_next;
- if (c->closed || c->pending_read || c->pending_write || c->pending_tick ||
- c->input_size || c->input_eos) {
+ if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->input_eos) {
return c;
}
}
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c Fri Feb 15 17:07:44 2013
@@ -127,7 +127,8 @@ static int init_ssl_socket( pn_ssl_t * )
static void release_ssl_socket( pn_ssl_t * );
static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * );
static void ssl_session_free( pn_ssl_session_t *);
-
+static size_t buffered_output( pn_io_layer_t *io_layer );
+static size_t buffered_input( pn_io_layer_t *io_layer );
// @todo: used to avoid littering the code with calls to printf...
static void _log_error(const char *fmt, ...)
@@ -740,6 +741,8 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
ssl->io_layer->process_input = pn_io_layer_input_passthru;
ssl->io_layer->process_output = pn_io_layer_output_passthru;
ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
+ ssl->io_layer->buffered_output = buffered_output;
+ ssl->io_layer->buffered_input = buffered_input;
ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
@@ -1277,3 +1280,31 @@ static ssize_t process_output_done(pn_io
{
return PN_EOS;
}
+
+// return # output bytes sitting in this layer: ssl->out_count plus, when bio_net_io is set, its BIO_ctrl_pending() count; 0 if io_layer->context is NULL
+static size_t buffered_output(pn_io_layer_t *io_layer)
+{
+ size_t count = 0;
+ pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ if (ssl) {
+ count += ssl->out_count; // NOTE(review): presumably bytes staged in the layer's own output buffer -- confirm against pn_ssl_t
+ if (ssl->bio_net_io) { // pick up any bytes waiting for network io
+ count += BIO_ctrl_pending(ssl->bio_net_io);
+ }
+ }
+ return count;
+}
+
+// return # input bytes sitting in this layer: ssl->in_count plus, when bio_ssl is set, its BIO_ctrl_pending() count; 0 if io_layer->context is NULL
+static size_t buffered_input( pn_io_layer_t *io_layer )
+{
+ size_t count = 0;
+ pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+ if (ssl) {
+ count += ssl->in_count; // NOTE(review): presumably bytes staged in the layer's own input buffer -- confirm against pn_ssl_t
+ if (ssl->bio_ssl) { // pick up any bytes waiting to be read
+ count += BIO_ctrl_pending(ssl->bio_ssl);
+ }
+ }
+ return count;
+}
Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c Fri Feb 15 17:07:44 2013
@@ -145,7 +145,6 @@ struct pn_listener_t {
void *context;
};
-#define IO_BUF_SIZE (64*1024)
#define PN_NAME_MAX (256)
struct pn_connector_t {
@@ -162,13 +161,7 @@ struct pn_connector_t {
pn_trace_t trace;
bool closed;
pn_timestamp_t wakeup;
- void (*read)(pn_connector_t *);
- void (*write) (pn_connector_t *);
- size_t input_size;
- char input[IO_BUF_SIZE];
bool input_eos;
- size_t output_size;
- char output[IO_BUF_SIZE];
pn_connection_t *connection;
pn_transport_t *transport;
pn_sasl_t *sasl;
@@ -442,11 +435,7 @@ pn_connector_t *pn_connector_fd(pn_drive
c->trace = driver->trace;
c->closed = false;
c->wakeup = 0;
- c->read = pn_connector_read;
- c->write = pn_connector_write;
- c->input_size = 0;
c->input_eos = false;
- c->output_size = 0;
c->connection = NULL;
c->transport = pn_transport();
c->sasl = pn_sasl(c->transport);
@@ -545,74 +534,6 @@ void pn_connector_free(pn_connector_t *c
free(ctor);
}
-static void pn_connector_read(pn_connector_t *ctor)
-{
- ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
- if (n < 0) {
- if (errno != EAGAIN) {
- if (n < 0) perror("read");
- ctor->status &= ~PN_SEL_RD;
- ctor->input_eos = true;
- }
- } else if (n == 0) {
- ctor->status &= ~PN_SEL_RD;
- ctor->input_eos = true;
- } else {
- ctor->input_size += n;
- }
-}
-
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
- ctor->input_size -= n;
- memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
-static void pn_connector_process_input(pn_connector_t *ctor)
-{
- pn_transport_t *transport = ctor->transport;
- if (!ctor->input_done) {
- if (ctor->input_size > 0 || ctor->input_eos) {
- ssize_t n = pn_transport_input(transport, ctor->input, ctor->input_size);
- if (n >= 0) {
- pn_connector_consume(ctor, n);
- } else {
- pn_connector_consume(ctor, ctor->input_size);
- ctor->input_done = true;
- }
- }
- }
-}
-
-static char *pn_connector_output(pn_connector_t *ctor)
-{
- return ctor->output + ctor->output_size;
-}
-
-static size_t pn_connector_available(pn_connector_t *ctor)
-{
- return IO_BUF_SIZE - ctor->output_size;
-}
-
-static void pn_connector_process_output(pn_connector_t *ctor)
-{
- pn_transport_t *transport = ctor->transport;
- if (!ctor->output_done) {
- ssize_t n = pn_transport_output(transport, pn_connector_output(ctor),
- pn_connector_available(ctor));
- if (n >= 0) {
- ctor->output_size += n;
- } else {
- ctor->output_done = true;
- }
- }
-
- if (ctor->output_size) {
- ctor->status |= PN_SEL_WR;
- }
-}
-
-
void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit)
{
switch (crit) {
@@ -648,28 +569,6 @@ bool pn_connector_activated(pn_connector
return result;
}
-
-static void pn_connector_write(pn_connector_t *ctor)
-{
- if (ctor->output_size > 0) {
- ssize_t n = pn_send(ctor->fd, ctor->output, ctor->output_size);
- if (n < 0) {
- // XXX
- if (errno != EAGAIN) {
- perror("send");
- ctor->output_size = 0;
- ctor->output_done = true;
- }
- } else {
- ctor->output_size -= n;
- memmove(ctor->output, ctor->output + n, ctor->output_size);
- }
- }
-
- if (!ctor->output_size)
- ctor->status &= ~PN_SEL_WR;
-}
-
static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
{
if (!ctor->transport) return 0;
@@ -681,21 +580,80 @@ void pn_connector_process(pn_connector_t
if (c) {
if (c->closed) return;
- if (c->pending_read) {
- c->read(c);
- c->pending_read = false;
+ pn_transport_t *transport = c->transport;
+
+ ///
+ /// Socket read
+ ///
+ if (!c->input_done) {
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity > 0) {
+ c->status |= PN_SEL_RD;
+ if (c->pending_read) {
+ c->pending_read = false;
+ ssize_t n = recv(c->fd, pn_transport_buffer(transport),
+ capacity, 0);
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ perror("read");
+ c->status &= ~PN_SEL_RD;
+ c->input_eos = true;
+ }
+ } else {
+ if (n == 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_eos = true;
+ }
+ if (pn_transport_push(transport, (size_t) n) < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
+ }
+ }
+ } else if (capacity < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
}
- pn_connector_process_input(c);
+ ///
+ /// Event wakeup
+ ///
c->wakeup = pn_connector_tick(c, pn_i_now());
- pn_connector_process_output(c);
- if (c->pending_write) {
- c->write(c);
- c->pending_write = false;
- pn_connector_process_output(c); // XXX: review this - there's a better way to determine if the WR flag should be re-set
+ ///
+ /// Socket write
+ ///
+ if (!c->output_done) {
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending > 0) {
+ c->status |= PN_SEL_WR;
+ if (c->pending_write) {
+ c->pending_write = false;
+ ssize_t n = pn_send(c->fd, pn_transport_peek(transport), pending);
+ if (n < 0) {
+ // XXX
+ if (errno != EAGAIN) {
+ perror("send");
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
+ } else if (n) {
+ pn_transport_pop(transport, (size_t) n);
+ pending -= n;
+ if (pending == 0)
+ c->status &= ~PN_SEL_WR;
+ }
+ }
+ } else if (pending < 0) {
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
}
- if (c->output_size == 0 && c->input_done && c->output_done) {
+
+ // Closed?
+
+ if (c->input_done && c->output_done) {
if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
fprintf(stderr, "Closed %s\n", c->name);
}
@@ -960,8 +918,7 @@ pn_connector_t *pn_driver_connector(pn_d
pn_connector_t *c = d->connector_next;
d->connector_next = c->connector_next;
- if (c->closed || c->pending_read || c->pending_write || c->pending_tick ||
- c->input_size || c->input_eos) {
+ if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->input_eos) {
return c;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org