You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2021/12/22 17:03:06 UTC

[qpid-proton] branch main updated: PROTON-2479: Make reading more efficient by resizing input buffer

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new c1b2861  PROTON-2479: Make reading more efficient by resizing input buffer
c1b2861 is described below

commit c1b28618536c67fe9ba02b8b4b65e18608c3a08c
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Dec 16 20:21:52 2021 -0500

    PROTON-2479: Make reading more efficient by resizing input buffer
    
    This also introduces a connection driver API that allows the input
    buffer resizing to be done explicitly by the proactor code.
---
 c/include/proton/connection_driver.h | 13 +++++++++++++
 c/src/core/connection_driver.c       |  5 +++++
 c/src/core/engine-internal.h         |  2 ++
 c/src/core/transport.c               | 32 +++++++++++++++++---------------
 c/src/proactor/epoll.c               |  6 ++++++
 5 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/c/include/proton/connection_driver.h b/c/include/proton/connection_driver.h
index b172643..bc478bd 100644
--- a/c/include/proton/connection_driver.h
+++ b/c/include/proton/connection_driver.h
@@ -138,6 +138,19 @@ PN_EXTERN void pn_connection_driver_destroy(pn_connection_driver_t *);
 PN_EXTERN pn_connection_t *pn_connection_driver_release_connection(pn_connection_driver_t *d);
 
 /**
+ * Try to get a read buffer with the specified size.
+ *
+ * This tries to grow the read buffer to the requested size n, then returns a read buffer of
+ * whatever size is actually available (which may be less than n).
+ *
+ * Copy data from your input byte source to buf.start, up to buf.size.
+ * Call pn_connection_driver_read_done() when reading is complete.
+ *
+ * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_driver_read_buffer_sized(pn_connection_driver_t *, size_t n);
+
+/**
  * Get the read buffer.
  *
  * Copy data from your input byte source to buf.start, up to buf.size.
diff --git a/c/src/core/connection_driver.c b/c/src/core/connection_driver.c
index 803917c..6d9c085 100644
--- a/c/src/core/connection_driver.c
+++ b/c/src/core/connection_driver.c
@@ -87,6 +87,11 @@ void pn_connection_driver_destroy(pn_connection_driver_t *d) {
   memset(d, 0, sizeof(*d));
 }
 
+pn_rwbytes_t pn_connection_driver_read_buffer_sized(pn_connection_driver_t *d, size_t n) {
+  ssize_t cap = pni_transport_grow_capacity(d->transport, n);
+  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0);
+}
+
 pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *d) {
   ssize_t cap = pn_transport_capacity(d->transport);
   return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0);
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 5c8b6c2..09ff3ec 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -376,6 +376,8 @@ void pn_link_unbound(pn_link_t* link);
 void pn_ep_incref(pn_endpoint_t *endpoint);
 void pn_ep_decref(pn_endpoint_t *endpoint);
 
+ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n);
+
 #if __cplusplus
 }
 #endif
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index b885e38..fc5d6ab 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -2916,6 +2916,22 @@ uint64_t pn_transport_get_frames_input(const pn_transport_t *transport)
   return 0;
 }
 
+ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n) {
+  // can we expand the size of the input buffer?
+  size_t size = pn_max(n, transport->input_size);
+  if (transport->local_max_frame) {  // there is a limit to buffer size
+    size = pn_min(size, transport->local_max_frame);
+  }
+  if (size > transport->input_size) {
+    char *newbuf = (char *) pni_mem_subreallocate(pn_class(transport), transport, transport->input_buf, size );
+    if (newbuf) {
+      transport->input_buf = newbuf;
+      transport->input_size = size;
+    }
+  }
+  return transport->input_size-transport->input_pending;
+}
+
 // input
 ssize_t pn_transport_capacity(pn_transport_t *transport)  /* <0 == done */
 {
@@ -2924,21 +2940,7 @@ ssize_t pn_transport_capacity(pn_transport_t *transport)  /* <0 == done */
 
   ssize_t capacity = transport->input_size - transport->input_pending;
   if ( capacity<=0 ) {
-    // can we expand the size of the input buffer?
-    int more = 0;
-    if (!transport->local_max_frame) {  // no limit (ha!)
-      more = transport->input_size;
-    } else if (transport->local_max_frame > transport->input_size) {
-      more = pn_min(transport->input_size, transport->local_max_frame - transport->input_size);
-    }
-    if (more) {
-      char *newbuf = (char *) pni_mem_subreallocate(pn_class(transport), transport, transport->input_buf, transport->input_size + more );
-      if (newbuf) {
-        transport->input_buf = newbuf;
-        transport->input_size += more;
-        capacity += more;
-      }
-    }
+    capacity = pni_transport_grow_capacity(transport, 2*transport->input_size);
   }
   return capacity;
 }
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6ad043f..61e4dbd 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1214,6 +1214,12 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
       ssize_t n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size);
       if (n > 0) {
         pn_connection_driver_read_done(&pc->driver, n);
+        // If n == rbuf.size then we should enlarge the buffer and see if there is more to read
+        if (n==(ssize_t)rbuf.size) {
+          rbuf = pn_connection_driver_read_buffer_sized(&pc->driver, n*2);
+          n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size);
+          pn_connection_driver_read_done(&pc->driver, n);
+        }
         pc->output_drained = false;
         pconnection_tick(pc);         /* check for tick changes. */
         tick_required = false;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org