You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/15 18:07:45 UTC

svn commit: r1446691 - in /qpid/proton/branches/kgiusti-proton-225/proton-c: include/proton/engine.h src/engine/engine-internal.h src/engine/engine.c src/messenger.c src/posix/driver.c src/ssl/openssl.c src/windows/driver.c

Author: kgiusti
Date: Fri Feb 15 17:07:44 2013
New Revision: 1446691

URL: http://svn.apache.org/r1446691
Log:
PROTON-222: give messenger visibility into the amount of buffered output

Modified:
    qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h
    qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h
    qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c
    qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c
    qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c
    qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c
    qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/include/proton/engine.h Fri Feb 15 17:07:44 2013
@@ -402,6 +402,7 @@ PN_EXTERN void pn_transport_set_idle_tim
 PN_EXTERN pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport);
 PN_EXTERN uint64_t pn_transport_get_frames_output(const pn_transport_t *transport);
 PN_EXTERN uint64_t pn_transport_get_frames_input(const pn_transport_t *transport);
+PN_EXTERN size_t pn_transport_buffered_output(pn_transport_t *transport);
 PN_EXTERN void pn_transport_free(pn_transport_t *transport);
 
 // session

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine-internal.h Fri Feb 15 17:07:44 2013
@@ -113,6 +113,8 @@ typedef struct pn_io_layer_t {
   ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t);
   ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t);
   pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t);
+  size_t (*buffered_output)(struct pn_io_layer_t *);  // how much output is held
+  size_t (*buffered_input)(struct pn_io_layer_t *);   // how much input is held
 } pn_io_layer_t;
 
 struct pn_transport_t {

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/engine/engine.c Fri Feb 15 17:07:44 2013
@@ -752,6 +752,8 @@ void pn_transport_init(pn_transport_t *t
     io_layer->process_input = pn_io_layer_input_passthru;
     io_layer->process_output = pn_io_layer_output_passthru;
     io_layer->process_tick = pn_io_layer_tick_passthru;
+    io_layer->buffered_output = NULL;
+    io_layer->buffered_input = NULL;
     ++io_layer;
   }
 
@@ -760,6 +762,8 @@ void pn_transport_init(pn_transport_t *t
   amqp->process_input = pn_input_read_amqp_header;
   amqp->process_output = pn_output_write_amqp_header;
   amqp->process_tick = pn_io_layer_tick_passthru;
+  amqp->buffered_output = NULL;
+  amqp->buffered_input = NULL;
   amqp->next = NULL;
 
   pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
@@ -3087,3 +3091,21 @@ int pn_transport_close_head(pn_transport
 {
   return 0;
 }
+
+size_t pn_transport_buffered_output(pn_transport_t *transport)
+{
+  size_t count = 0;
+  if (transport) {
+    ssize_t pending = pn_transport_pending(transport);
+    if (pending >= 0) {  // !error
+      count += pending;
+      pn_io_layer_t *io_layer = transport->io_layers;
+      while (io_layer != &transport->io_layers[PN_IO_LAYER_CT]) {
+        if (io_layer->buffered_output)
+          count += io_layer->buffered_output( io_layer );
+        ++io_layer;
+      }
+    }
+  }
+  return count;
+}

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/messenger.c Fri Feb 15 17:07:44 2013
@@ -1046,10 +1046,21 @@ int pn_messenger_settle(pn_messenger_t *
   return pn_queue_update(queue, pn_tracker_sequence(tracker), (pn_status_t) 0, flags, true, true);
 }
 
+// true if all pending output has been sent to peer
 bool pn_messenger_sent(pn_messenger_t *messenger)
 {
   pn_connector_t *ctor = pn_connector_head(messenger->driver);
   while (ctor) {
+
+    pn_transport_t *transport = pn_connector_transport(ctor);
+    if (transport) {
+      // could be as simple as this, if not for SSL:
+      //if (pn_transport_pending(transport) > 0)
+      //  return false;
+      if (pn_transport_buffered_output(transport) > 0)
+        return false;
+    }
+
     pn_connection_t *conn = pn_connector_connection(ctor);
 
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/posix/driver.c Fri Feb 15 17:07:44 2013
@@ -103,7 +103,6 @@ struct pn_listener_t {
   void *context;
 };
 
-#define IO_BUF_SIZE (64*1024)
 #define PN_NAME_MAX (256)
 
 struct pn_connector_t {
@@ -120,13 +119,7 @@ struct pn_connector_t {
   pn_trace_t trace;
   bool closed;
   pn_timestamp_t wakeup;
-  void (*read)(pn_connector_t *);
-  void (*write) (pn_connector_t *);
-  size_t input_size;
-  char input[IO_BUF_SIZE];
   bool input_eos;
-  size_t output_size;
-  char output[IO_BUF_SIZE];
   pn_connection_t *connection;
   pn_transport_t *transport;
   pn_sasl_t *sasl;
@@ -378,9 +371,6 @@ pn_connector_t *pn_connector(pn_driver_t
   return c;
 }
 
-static void pn_connector_read(pn_connector_t *ctor);
-static void pn_connector_write(pn_connector_t *ctor);
-
 pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
 {
   if (!driver) return NULL;
@@ -400,11 +390,7 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->trace = driver->trace;
   c->closed = false;
   c->wakeup = 0;
-  c->read = pn_connector_read;
-  c->write = pn_connector_write;
-  c->input_size = 0;
   c->input_eos = false;
-  c->output_size = 0;
   c->connection = NULL;
   c->transport = pn_transport();
   c->sasl = pn_sasl(c->transport);
@@ -503,74 +489,6 @@ void pn_connector_free(pn_connector_t *c
   free(ctor);
 }
 
-static void pn_connector_read(pn_connector_t *ctor)
-{
-  ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
-  if (n < 0) {
-      if (errno != EAGAIN) {
-          if (n < 0) perror("read");
-          ctor->status &= ~PN_SEL_RD;
-          ctor->input_eos = true;
-      }
-  } else if (n == 0) {
-    ctor->status &= ~PN_SEL_RD;
-    ctor->input_eos = true;
-  } else {
-    ctor->input_size += n;
-  }
-}
-
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
-  ctor->input_size -= n;
-  memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
-static void pn_connector_process_input(pn_connector_t *ctor)
-{
-  pn_transport_t *transport = ctor->transport;
-  if (!ctor->input_done) {
-    if (ctor->input_size > 0 || ctor->input_eos) {
-      ssize_t n = pn_transport_input(transport, ctor->input, ctor->input_size);
-      if (n >= 0) {
-        pn_connector_consume(ctor, n);
-      } else {
-        pn_connector_consume(ctor, ctor->input_size);
-        ctor->input_done = true;
-      }
-    }
-  }
-}
-
-static char *pn_connector_output(pn_connector_t *ctor)
-{
-  return ctor->output + ctor->output_size;
-}
-
-static size_t pn_connector_available(pn_connector_t *ctor)
-{
-  return IO_BUF_SIZE - ctor->output_size;
-}
-
-static void pn_connector_process_output(pn_connector_t *ctor)
-{
-  pn_transport_t *transport = ctor->transport;
-  if (!ctor->output_done) {
-    ssize_t n = pn_transport_output(transport, pn_connector_output(ctor),
-                                    pn_connector_available(ctor));
-    if (n >= 0) {
-      ctor->output_size += n;
-    } else {
-      ctor->output_done = true;
-    }
-  }
-
-  if (ctor->output_size) {
-    ctor->status |= PN_SEL_WR;
-  }
-}
-
-
 void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit)
 {
     switch (crit) {
@@ -606,28 +524,6 @@ bool pn_connector_activated(pn_connector
     return result;
 }
 
-
-static void pn_connector_write(pn_connector_t *ctor)
-{
-  if (ctor->output_size > 0) {
-    ssize_t n = pn_send(ctor->fd, ctor->output, ctor->output_size);
-    if (n < 0) {
-      // XXX
-        if (errno != EAGAIN) {
-            perror("send");
-            ctor->output_size = 0;
-            ctor->output_done = true;
-        }
-    } else {
-      ctor->output_size -= n;
-      memmove(ctor->output, ctor->output + n, ctor->output_size);
-    }
-  }
-
-  if (!ctor->output_size)
-    ctor->status &= ~PN_SEL_WR;
-}
-
 static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   if (!ctor->transport) return 0;
@@ -639,21 +535,80 @@ void pn_connector_process(pn_connector_t
   if (c) {
     if (c->closed) return;
 
-    if (c->pending_read) {
-      c->read(c);
-      c->pending_read = false;
+    pn_transport_t *transport = c->transport;
+
+    ///
+    /// Socket read
+    ///
+    if (!c->input_done) {
+      ssize_t capacity = pn_transport_capacity(transport);
+      if (capacity > 0) {
+        c->status |= PN_SEL_RD;
+        if (c->pending_read) {
+          c->pending_read = false;
+          ssize_t n =  recv(c->fd, pn_transport_buffer(transport),
+                            capacity, 0);
+          if (n < 0) {
+            if (errno != EAGAIN) {
+              perror("read");
+              c->status &= ~PN_SEL_RD;
+              c->input_eos = true;
+            }
+          } else {
+            if (n == 0) {
+              c->status &= ~PN_SEL_RD;
+              c->input_eos = true;
+            }
+            if (pn_transport_push(transport, (size_t) n) < 0) {
+              c->status &= ~PN_SEL_RD;
+              c->input_done = true;
+            }
+          }
+        }
+      } else if (capacity < 0) {
+        c->status &= ~PN_SEL_RD;
+        c->input_done = true;
+      }
     }
-    pn_connector_process_input(c);
 
+    ///
+    /// Event wakeup
+    ///
     c->wakeup = pn_connector_tick(c, pn_i_now());
 
-    pn_connector_process_output(c);
-    if (c->pending_write) {
-      c->write(c);
-      c->pending_write = false;
-      pn_connector_process_output(c);  // XXX: review this - there's a better way to determine if the WR flag should be re-set
+    ///
+    /// Socket write
+    ///
+    if (!c->output_done) {
+      ssize_t pending = pn_transport_pending(transport);
+      if (pending > 0) {
+        c->status |= PN_SEL_WR;
+        if (c->pending_write) {
+          c->pending_write = false;
+          ssize_t n = pn_send(c->fd, pn_transport_peek(transport), pending);
+          if (n < 0) {
+            // XXX
+            if (errno != EAGAIN) {
+              perror("send");
+              c->output_done = true;
+              c->status &= ~PN_SEL_WR;
+            }
+          } else if (n) {
+            pn_transport_pop(transport, (size_t) n);
+            pending -= n;
+            if (pending == 0)
+              c->status &= ~PN_SEL_WR;
+          }
+        }
+      } else if (pending < 0) {
+        c->output_done = true;
+        c->status &= ~PN_SEL_WR;
+      }
     }
-    if (c->output_size == 0 && c->input_done && c->output_done) {
+
+    // Closed?
+
+    if (c->input_done && c->output_done) {
       if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
         fprintf(stderr, "Closed %s\n", c->name);
       }
@@ -879,8 +834,7 @@ pn_connector_t *pn_driver_connector(pn_d
     pn_connector_t *c = d->connector_next;
     d->connector_next = c->connector_next;
 
-    if (c->closed || c->pending_read || c->pending_write || c->pending_tick ||
-        c->input_size || c->input_eos) {
+    if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->input_eos) {
       return c;
     }
   }

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/ssl/openssl.c Fri Feb 15 17:07:44 2013
@@ -127,7 +127,8 @@ static int init_ssl_socket( pn_ssl_t * )
 static void release_ssl_socket( pn_ssl_t * );
 static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * );
 static void ssl_session_free( pn_ssl_session_t *);
-
+static size_t buffered_output( pn_io_layer_t *io_layer );
+static size_t buffered_input( pn_io_layer_t *io_layer );
 
 // @todo: used to avoid littering the code with calls to printf...
 static void _log_error(const char *fmt, ...)
@@ -740,6 +741,8 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
   ssl->io_layer->process_input = pn_io_layer_input_passthru;
   ssl->io_layer->process_output = pn_io_layer_output_passthru;
   ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
+  ssl->io_layer->buffered_output = buffered_output;
+  ssl->io_layer->buffered_input = buffered_input;
 
   ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
 
@@ -1277,3 +1280,31 @@ static ssize_t process_output_done(pn_io
 {
   return PN_EOS;
 }
+
+// return # output bytes sitting in this layer
+static size_t buffered_output(pn_io_layer_t *io_layer)
+{
+  size_t count = 0;
+  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  if (ssl) {
+    count += ssl->out_count;
+    if (ssl->bio_net_io) { // pick up any bytes waiting for network io
+      count += BIO_ctrl_pending(ssl->bio_net_io);
+    }
+  }
+  return count;
+}
+
+// return # input bytes sitting in this layer
+static size_t buffered_input( pn_io_layer_t *io_layer )
+{
+  size_t count = 0;
+  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  if (ssl) {
+    count += ssl->in_count;
+    if (ssl->bio_ssl) { // pick up any bytes waiting to be read
+      count += BIO_ctrl_pending(ssl->bio_ssl);
+    }
+  }
+  return count;
+}

Modified: qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c?rev=1446691&r1=1446690&r2=1446691&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c (original)
+++ qpid/proton/branches/kgiusti-proton-225/proton-c/src/windows/driver.c Fri Feb 15 17:07:44 2013
@@ -145,7 +145,6 @@ struct pn_listener_t {
   void *context;
 };
 
-#define IO_BUF_SIZE (64*1024)
 #define PN_NAME_MAX (256)
 
 struct pn_connector_t {
@@ -162,13 +161,7 @@ struct pn_connector_t {
   pn_trace_t trace;
   bool closed;
   pn_timestamp_t wakeup;
-  void (*read)(pn_connector_t *);
-  void (*write) (pn_connector_t *);
-  size_t input_size;
-  char input[IO_BUF_SIZE];
   bool input_eos;
-  size_t output_size;
-  char output[IO_BUF_SIZE];
   pn_connection_t *connection;
   pn_transport_t *transport;
   pn_sasl_t *sasl;
@@ -442,11 +435,7 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->trace = driver->trace;
   c->closed = false;
   c->wakeup = 0;
-  c->read = pn_connector_read;
-  c->write = pn_connector_write;
-  c->input_size = 0;
   c->input_eos = false;
-  c->output_size = 0;
   c->connection = NULL;
   c->transport = pn_transport();
   c->sasl = pn_sasl(c->transport);
@@ -545,74 +534,6 @@ void pn_connector_free(pn_connector_t *c
   free(ctor);
 }
 
-static void pn_connector_read(pn_connector_t *ctor)
-{
-  ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
-  if (n < 0) {
-      if (errno != EAGAIN) {
-          if (n < 0) perror("read");
-          ctor->status &= ~PN_SEL_RD;
-          ctor->input_eos = true;
-      }
-  } else if (n == 0) {
-    ctor->status &= ~PN_SEL_RD;
-    ctor->input_eos = true;
-  } else {
-    ctor->input_size += n;
-  }
-}
-
-static void pn_connector_consume(pn_connector_t *ctor, int n)
-{
-  ctor->input_size -= n;
-  memmove(ctor->input, ctor->input + n, ctor->input_size);
-}
-
-static void pn_connector_process_input(pn_connector_t *ctor)
-{
-  pn_transport_t *transport = ctor->transport;
-  if (!ctor->input_done) {
-    if (ctor->input_size > 0 || ctor->input_eos) {
-      ssize_t n = pn_transport_input(transport, ctor->input, ctor->input_size);
-      if (n >= 0) {
-        pn_connector_consume(ctor, n);
-      } else {
-        pn_connector_consume(ctor, ctor->input_size);
-        ctor->input_done = true;
-      }
-    }
-  }
-}
-
-static char *pn_connector_output(pn_connector_t *ctor)
-{
-  return ctor->output + ctor->output_size;
-}
-
-static size_t pn_connector_available(pn_connector_t *ctor)
-{
-  return IO_BUF_SIZE - ctor->output_size;
-}
-
-static void pn_connector_process_output(pn_connector_t *ctor)
-{
-  pn_transport_t *transport = ctor->transport;
-  if (!ctor->output_done) {
-    ssize_t n = pn_transport_output(transport, pn_connector_output(ctor),
-                                    pn_connector_available(ctor));
-    if (n >= 0) {
-      ctor->output_size += n;
-    } else {
-      ctor->output_done = true;
-    }
-  }
-
-  if (ctor->output_size) {
-    ctor->status |= PN_SEL_WR;
-  }
-}
-
-
 void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit)
 {
     switch (crit) {
@@ -648,28 +569,6 @@ bool pn_connector_activated(pn_connector
     return result;
 }
 
-
-static void pn_connector_write(pn_connector_t *ctor)
-{
-  if (ctor->output_size > 0) {
-    ssize_t n = pn_send(ctor->fd, ctor->output, ctor->output_size);
-    if (n < 0) {
-      // XXX
-        if (errno != EAGAIN) {
-            perror("send");
-            ctor->output_size = 0;
-            ctor->output_done = true;
-        }
-    } else {
-      ctor->output_size -= n;
-      memmove(ctor->output, ctor->output + n, ctor->output_size);
-    }
-  }
-
-  if (!ctor->output_size)
-    ctor->status &= ~PN_SEL_WR;
-}
-
 static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   if (!ctor->transport) return 0;
@@ -681,21 +580,80 @@ void pn_connector_process(pn_connector_t
   if (c) {
     if (c->closed) return;
 
-    if (c->pending_read) {
-      c->read(c);
-      c->pending_read = false;
+    pn_transport_t *transport = c->transport;
+
+    ///
+    /// Socket read
+    ///
+    if (!c->input_done) {
+      ssize_t capacity = pn_transport_capacity(transport);
+      if (capacity > 0) {
+        c->status |= PN_SEL_RD;
+        if (c->pending_read) {
+          c->pending_read = false;
+          ssize_t n =  recv(c->fd, pn_transport_buffer(transport),
+                            capacity, 0);
+          if (n < 0) {
+            if (errno != EAGAIN) {
+              perror("read");
+              c->status &= ~PN_SEL_RD;
+              c->input_eos = true;
+            }
+          } else {
+            if (n == 0) {
+              c->status &= ~PN_SEL_RD;
+              c->input_eos = true;
+            }
+            if (pn_transport_push(transport, (size_t) n) < 0) {
+              c->status &= ~PN_SEL_RD;
+              c->input_done = true;
+            }
+          }
+        }
+      } else if (capacity < 0) {
+        c->status &= ~PN_SEL_RD;
+        c->input_done = true;
+      }
     }
-    pn_connector_process_input(c);
 
+    ///
+    /// Event wakeup
+    ///
     c->wakeup = pn_connector_tick(c, pn_i_now());
 
-    pn_connector_process_output(c);
-    if (c->pending_write) {
-      c->write(c);
-      c->pending_write = false;
-      pn_connector_process_output(c);  // XXX: review this - there's a better way to determine if the WR flag should be re-set
+    ///
+    /// Socket write
+    ///
+    if (!c->output_done) {
+      ssize_t pending = pn_transport_pending(transport);
+      if (pending > 0) {
+        c->status |= PN_SEL_WR;
+        if (c->pending_write) {
+          c->pending_write = false;
+          ssize_t n = pn_send(c->fd, pn_transport_peek(transport), pending);
+          if (n < 0) {
+            // XXX
+            if (errno != EAGAIN) {
+              perror("send");
+              c->output_done = true;
+              c->status &= ~PN_SEL_WR;
+            }
+          } else if (n) {
+            pn_transport_pop(transport, (size_t) n);
+            pending -= n;
+            if (pending == 0)
+              c->status &= ~PN_SEL_WR;
+          }
+        }
+      } else if (pending < 0) {
+        c->output_done = true;
+        c->status &= ~PN_SEL_WR;
+      }
     }
-    if (c->output_size == 0 && c->input_done && c->output_done) {
+
+    // Closed?
+
+    if (c->input_done && c->output_done) {
       if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
         fprintf(stderr, "Closed %s\n", c->name);
       }
@@ -960,8 +918,7 @@ pn_connector_t *pn_driver_connector(pn_d
     pn_connector_t *c = d->connector_next;
     d->connector_next = c->connector_next;
 
-    if (c->closed || c->pending_read || c->pending_write || c->pending_tick ||
-        c->input_size || c->input_eos) {
+    if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->input_eos) {
       return c;
     }
   }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org