You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2021/12/22 17:03:06 UTC
[qpid-proton] branch main updated: PROTON-2479: Make reading more efficient by resizing input buffer
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new c1b2861 PROTON-2479: Make reading more efficient by resizing input buffer
c1b2861 is described below
commit c1b28618536c67fe9ba02b8b4b65e18608c3a08c
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Dec 16 20:21:52 2021 -0500
PROTON-2479: Make reading more efficient by resizing input buffer
This also introduces a connection driver API that allows the input
buffer resizing to be done explicitly by the proactor code.
---
c/include/proton/connection_driver.h | 13 +++++++++++++
c/src/core/connection_driver.c | 5 +++++
c/src/core/engine-internal.h | 2 ++
c/src/core/transport.c | 32 +++++++++++++++++---------------
c/src/proactor/epoll.c | 6 ++++++
5 files changed, 43 insertions(+), 15 deletions(-)
diff --git a/c/include/proton/connection_driver.h b/c/include/proton/connection_driver.h
index b172643..bc478bd 100644
--- a/c/include/proton/connection_driver.h
+++ b/c/include/proton/connection_driver.h
@@ -138,6 +138,19 @@ PN_EXTERN void pn_connection_driver_destroy(pn_connection_driver_t *);
PN_EXTERN pn_connection_t *pn_connection_driver_release_connection(pn_connection_driver_t *d);
/**
+ * Try to get a read buffer with the specified size.
+ *
+ * This will try to grow the read buffer to the specified size and then it will return whatever size
+ * read buffer can be got.
+ *
+ * Copy data from your input byte source to buf.start, up to buf.size.
+ * Call pn_connection_driver_read_done() when reading is complete.
+ *
+ * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_driver_read_buffer_sized(pn_connection_driver_t *, size_t n);
+
+/**
* Get the read buffer.
*
* Copy data from your input byte source to buf.start, up to buf.size.
diff --git a/c/src/core/connection_driver.c b/c/src/core/connection_driver.c
index 803917c..6d9c085 100644
--- a/c/src/core/connection_driver.c
+++ b/c/src/core/connection_driver.c
@@ -87,6 +87,11 @@ void pn_connection_driver_destroy(pn_connection_driver_t *d) {
memset(d, 0, sizeof(*d));
}
+pn_rwbytes_t pn_connection_driver_read_buffer_sized(pn_connection_driver_t *d, size_t n) {
+ ssize_t cap = pni_transport_grow_capacity(d->transport, n);
+ return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0);
+}
+
pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *d) {
ssize_t cap = pn_transport_capacity(d->transport);
return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0);
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 5c8b6c2..09ff3ec 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -376,6 +376,8 @@ void pn_link_unbound(pn_link_t* link);
void pn_ep_incref(pn_endpoint_t *endpoint);
void pn_ep_decref(pn_endpoint_t *endpoint);
+ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n);
+
#if __cplusplus
}
#endif
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index b885e38..fc5d6ab 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -2916,6 +2916,22 @@ uint64_t pn_transport_get_frames_input(const pn_transport_t *transport)
return 0;
}
+ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n) {
+ // can we expand the size of the input buffer?
+ size_t size = pn_max(n, transport->input_size);
+ if (transport->local_max_frame) { // there is a limit to buffer size
+ size = pn_min(size, transport->local_max_frame);
+ }
+ if (size > transport->input_size) {
+ char *newbuf = (char *) pni_mem_subreallocate(pn_class(transport), transport, transport->input_buf, size );
+ if (newbuf) {
+ transport->input_buf = newbuf;
+ transport->input_size = size;
+ }
+ }
+ return transport->input_size-transport->input_pending;
+}
+
// input
ssize_t pn_transport_capacity(pn_transport_t *transport) /* <0 == done */
{
@@ -2924,21 +2940,7 @@ ssize_t pn_transport_capacity(pn_transport_t *transport) /* <0 == done */
ssize_t capacity = transport->input_size - transport->input_pending;
if ( capacity<=0 ) {
- // can we expand the size of the input buffer?
- int more = 0;
- if (!transport->local_max_frame) { // no limit (ha!)
- more = transport->input_size;
- } else if (transport->local_max_frame > transport->input_size) {
- more = pn_min(transport->input_size, transport->local_max_frame - transport->input_size);
- }
- if (more) {
- char *newbuf = (char *) pni_mem_subreallocate(pn_class(transport), transport, transport->input_buf, transport->input_size + more );
- if (newbuf) {
- transport->input_buf = newbuf;
- transport->input_size += more;
- capacity += more;
- }
- }
+ capacity = pni_transport_grow_capacity(transport, 2*transport->input_size);
}
return capacity;
}
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6ad043f..61e4dbd 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1214,6 +1214,12 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
ssize_t n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size);
if (n > 0) {
pn_connection_driver_read_done(&pc->driver, n);
+ // If n == rbuf.size then we should enlarge the buffer and see if there is more to read
+ if (n==(ssize_t)rbuf.size) {
+ rbuf = pn_connection_driver_read_buffer_sized(&pc->driver, n*2);
+ n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size);
+ pn_connection_driver_read_done(&pc->driver, n);
+ }
pc->output_drained = false;
pconnection_tick(pc); /* check for tick changes. */
tick_required = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org