You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by wk...@apache.org on 2022/09/19 17:30:16 UTC
[trafficserver] branch master updated: Remove intermediate buffer in PluginVC (#8698)
This is an automated email from the ASF dual-hosted git repository.
wkaras pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new feb7f1e36 Remove intermediate buffer in PluginVC (#8698)
feb7f1e36 is described below
commit feb7f1e368752065e67096c9df021845e07e5f5f
Author: Serris Lew <se...@gmail.com>
AuthorDate: Mon Sep 19 10:30:10 2022 -0700
Remove intermediate buffer in PluginVC (#8698)
* Remove intermediate buffer in PluginVC
* Update inactivity status for both sides
* Accounts for read ntodo when writing, resolves issues when rw ntodo are not equal
* Checks vc is closed before reenabled, prevent race cond
Co-authored-by: Serris Lew <ls...@apple.com>
---
proxy/PluginVC.cc | 240 +++++++++++++++++-------------------------------------
proxy/PluginVC.h | 10 +--
2 files changed, 77 insertions(+), 173 deletions(-)
diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc
index e64d59f68..d491d4d5a 100644
--- a/proxy/PluginVC.cc
+++ b/proxy/PluginVC.cc
@@ -217,11 +217,11 @@ PluginVC::main_handler(int event, void *data)
}
if (need_read_process) {
- process_read_side(false);
+ process_read_side();
}
if (need_write_process && !closed) {
- process_write_side(false);
+ process_write_side();
}
}
@@ -343,11 +343,11 @@ PluginVC::reenable_re(VIO *vio)
if (vio->op == VIO::WRITE) {
ink_assert(vio == &write_state.vio);
need_write_process = true;
- process_write_side(false);
+ process_write_side();
} else if (vio->op == VIO::READ) {
ink_assert(vio == &read_state.vio);
need_read_process = true;
- process_read_side(false);
+ process_read_side();
} else {
ink_release_assert(0);
}
@@ -458,22 +458,20 @@ PluginVC::transfer_bytes(MIOBuffer *transfer_to, IOBufferReader *transfer_from,
return total_added;
}
-// void PluginVC::process_write_side(bool cb_ok)
+// void PluginVC::process_write_side()
//
// This function may only be called while holding
// this->mutex & while it is ok to callback the
// write side continuation
//
-// Does write side processing
+// Does write side processing directly to other read side writer
//
void
-PluginVC::process_write_side(bool other_side_call)
+PluginVC::process_write_side()
{
ink_assert(!deletable);
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
- MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
-
Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
need_write_process = false;
@@ -494,7 +492,8 @@ PluginVC::process_write_side(bool other_side_call)
Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
- if (other_side->closed || other_side->read_state.shutdown) {
+ // Check read_state of other side
+ if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
write_state.vio.cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
return;
}
@@ -507,28 +506,54 @@ PluginVC::process_write_side(bool other_side_call)
}
return;
}
- // Bytes available, try to transfer to the PluginVCCore
- // intermediate buffer
- //
- int64_t buf_space = core_obj->buffer_size - core_buffer->max_read_avail();
+
+ // Check the state of the other side read buffer as well as ntodo
+ int64_t other_ntodo = other_side->read_state.vio.ntodo();
+ if (other_ntodo == 0) {
+ return;
+ }
+ act_on = std::min(act_on, other_ntodo);
+
+ // Other side read_state is open
+ // obtain the proper mutexes on other side
+ EThread *my_ethread = mutex->thread_holding;
+ ink_assert(my_ethread != nullptr);
+ MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
+ if (!lock.is_locked()) {
+ Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
+ ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
+
+ // set need_read_process to enforce the read processing
+ other_side->need_read_process = true;
+ other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
+ return;
+ }
+
+ // Bytes available, setting up other side read state writer
+ MIOBuffer *output_buffer = other_side->read_state.vio.get_writer();
+ int64_t water_mark = output_buffer->water_mark;
+ water_mark = std::max<int64_t>(water_mark, core_obj->buffer_size);
+ int64_t buf_space = water_mark - output_buffer->max_read_avail();
if (buf_space <= 0) {
- Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
+ Debug("pvc", "[%u] %s: process_read_side from other side no buffer space", core_obj->id, PVC_TYPE);
return;
}
act_on = std::min(act_on, buf_space);
- int64_t added = transfer_bytes(core_buffer, reader, act_on);
+ int64_t added = transfer_bytes(output_buffer, reader, act_on);
if (added < 0) {
// Couldn't actually get the buffer space. This only
// happens on small transfers with the above
// buffer_size factor doesn't apply
- Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
+ Debug("pvc", "[%u] %s: process_read_side from other side out of buffer space", core_obj->id, PVC_TYPE);
return;
}
write_state.vio.ndone += added;
+ other_side->read_state.vio.ndone += added;
- Debug("pvc", "[%u] %s: process_write_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
+ Debug("pvc", "[%u] %s: process_write_side and process_read_side from other side; added %" PRId64 "", core_obj->id, PVC_TYPE,
+ added);
if (write_state.vio.ntodo() == 0) {
write_state.vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
@@ -536,39 +561,17 @@ PluginVC::process_write_side(bool other_side_call)
write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
}
- update_inactive_time();
+ if (other_side->read_state.vio.ntodo() == 0) {
+ other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &other_side->read_state.vio);
+ } else {
+ other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &other_side->read_state.vio);
+ }
- // Wake up the read side on the other side to process these bytes
- if (!other_side->closed) {
- if (!other_side_call) {
- /* To clear the `need_read_process`, the mutexes must be obtained:
- *
- * - PluginVC::mutex
- * - PluginVC::read_state.vio.mutex
- *
- */
- if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
- // Just return, no touch on `other_side->need_read_process`.
- return;
- }
- // Acquire the lock of the read side continuation
- EThread *my_ethread = mutex->thread_holding;
- ink_assert(my_ethread != nullptr);
- MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
- if (!lock.is_locked()) {
- Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
- ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
-
- // set need_read_process to enforce the read processing
- other_side->need_read_process = true;
- other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
- return;
- }
+ update_inactive_time();
+ other_side->update_inactive_time();
- other_side->process_read_side(true);
- } else {
- other_side->read_state.vio.reenable();
- }
+ if (!closed) {
+ write_state.vio.reenable();
}
}
@@ -578,120 +581,45 @@ PluginVC::process_write_side(bool other_side_call)
// this->mutex & while it is ok to callback the
// read side continuation
//
-// Does read side processing
+// Closes read state if other side
+// write state is no longer available
//
void
-PluginVC::process_read_side(bool other_side_call)
+PluginVC::process_read_side()
{
ink_assert(!deletable);
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
- // TODO: Never used??
- // MIOBuffer *core_buffer;
-
- IOBufferReader *core_reader;
-
- if (vc_type == PLUGIN_VC_ACTIVE) {
- // core_buffer = core_obj->p_to_a_buffer;
- core_reader = core_obj->p_to_a_reader;
- } else {
- ink_assert(vc_type == PLUGIN_VC_PASSIVE);
- // core_buffer = core_obj->a_to_p_buffer;
- core_reader = core_obj->a_to_p_reader;
- }
-
Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
need_read_process = false;
// Check read_state
- if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
- return;
- }
-
- // Check the state of our read buffer as well as ntodo
- int64_t ntodo = read_state.vio.ntodo();
- if (ntodo == 0) {
+ if (read_state.vio.op != VIO::READ || closed || read_state.shutdown || !read_state.vio.ntodo()) {
return;
}
- int64_t bytes_avail = core_reader->read_avail();
- int64_t act_on = std::min(bytes_avail, ntodo);
-
- Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
-
- if (act_on <= 0) {
- if (other_side->closed || other_side->write_state.shutdown) {
- read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
+ if (!other_side->closed && !other_side->write_state.shutdown) {
+ if (other_side->write_state.vio.op != VIO::WRITE || other_side->write_state.shutdown) {
+ // Just return, no touch on `other_side->need_write_process`.
+ return;
}
- return;
- }
- // Bytes available, try to transfer from the PluginVCCore
- // intermediate buffer
- //
- MIOBuffer *output_buffer = read_state.vio.get_writer();
-
- int64_t water_mark = output_buffer->water_mark;
- water_mark = std::max(water_mark, static_cast<int64_t>(core_obj->buffer_size));
- int64_t buf_space = water_mark - output_buffer->max_read_avail();
- if (buf_space <= 0) {
- Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
- return;
- }
- act_on = std::min(act_on, buf_space);
-
- int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
- if (added <= 0) {
- // Couldn't actually get the buffer space. This only
- // happens on small transfers with the above
- // buffer_size factor doesn't apply
- Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
- return;
- }
-
- read_state.vio.ndone += added;
-
- Debug("pvc", "[%u] %s: process_read_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
-
- if (read_state.vio.ntodo() == 0) {
- read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
- } else {
- read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
- }
-
- update_inactive_time();
-
- // Wake up the other side so it knows there is space available in
- // intermediate buffer
- if (!other_side->closed) {
- if (!other_side_call) {
- /* To clear the `need_write_process`, the mutexes must be obtained:
- *
- * - PluginVC::mutex
- * - PluginVC::write_state.vio.mutex
- *
- */
- if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) {
- // Just return, no touch on `other_side->need_write_process`.
- return;
- }
- // Acquire the lock of the write side continuation
- EThread *my_ethread = mutex->thread_holding;
- ink_assert(my_ethread != nullptr);
- MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
- if (!lock.is_locked()) {
- Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
- ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
-
- // set need_write_process to enforce the write processing
- other_side->need_write_process = true;
- other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
- return;
- }
+ // Acquire the lock of the write side continuation
+ EThread *my_ethread = mutex->thread_holding;
+ ink_assert(my_ethread != nullptr);
+ MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
+ if (!lock.is_locked()) {
+ Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
+ ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
- other_side->process_write_side(true);
- } else {
- other_side->write_state.vio.reenable();
+ // set need_write_process to enforce the write processing
+ other_side->need_write_process = true;
+ other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
+ return;
}
+ other_side->process_write_side();
+ } else {
+ Debug("pvc", "[%u] %s: write_state of other side is not available", core_obj->id, PVC_TYPE);
+ read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
}
}
@@ -1078,14 +1006,6 @@ PluginVCCore::init(int64_t buffer_index, int64_t buffer_water_mark)
passive_vc.mutex = mutex;
passive_vc.thread = active_vc.thread;
- p_to_a_buffer = new_MIOBuffer(buffer_index);
- p_to_a_buffer->water_mark = buffer_water_mark;
- p_to_a_reader = p_to_a_buffer->alloc_reader();
-
- a_to_p_buffer = new_MIOBuffer(buffer_index);
- a_to_p_buffer->water_mark = buffer_water_mark;
- a_to_p_reader = a_to_p_buffer->alloc_reader();
-
buffer_size = BUFFER_SIZE_FOR_INDEX(buffer_index);
Debug("pvc",
@@ -1111,16 +1031,6 @@ PluginVCCore::destroy()
passive_vc.write_state.vio.buffer.clear();
passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
- if (p_to_a_buffer) {
- free_MIOBuffer(p_to_a_buffer);
- p_to_a_buffer = nullptr;
- }
-
- if (a_to_p_buffer) {
- free_MIOBuffer(a_to_p_buffer);
- a_to_p_buffer = nullptr;
- }
-
this->mutex = nullptr;
delete this;
}
diff --git a/proxy/PluginVC.h b/proxy/PluginVC.h
index 81b484d92..3f90678a6 100644
--- a/proxy/PluginVC.h
+++ b/proxy/PluginVC.h
@@ -150,8 +150,8 @@ public:
int main_handler(int event, void *data);
private:
- void process_read_side(bool);
- void process_write_side(bool);
+ void process_read_side();
+ void process_write_side();
void process_close();
void process_timeout(Event **e, int event_to_send);
@@ -254,12 +254,6 @@ private:
Continuation *connect_to = nullptr;
bool connected = false;
- MIOBuffer *p_to_a_buffer = nullptr;
- IOBufferReader *p_to_a_reader = nullptr;
-
- MIOBuffer *a_to_p_buffer = nullptr;
- IOBufferReader *a_to_p_reader = nullptr;
-
IpEndpoint passive_addr_struct;
IpEndpoint active_addr_struct;