You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by wk...@apache.org on 2022/09/19 17:30:16 UTC

[trafficserver] branch master updated: Remove intermediate buffer in PluginVC (#8698)

This is an automated email from the ASF dual-hosted git repository.

wkaras pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new feb7f1e36 Remove intermediate buffer in PluginVC (#8698)
feb7f1e36 is described below

commit feb7f1e368752065e67096c9df021845e07e5f5f
Author: Serris Lew <se...@gmail.com>
AuthorDate: Mon Sep 19 10:30:10 2022 -0700

    Remove intermediate buffer in PluginVC (#8698)
    
    * Remove intermediate buffer in PluginVC
    
    * Update inactivity status for both sides
    
    * Account for the read side's ntodo when writing; resolves issues when the read and write ntodo are not equal
    
    * Check whether the VC is closed before reenabling it, to prevent a race condition
    
    Co-authored-by: Serris Lew <ls...@apple.com>
---
 proxy/PluginVC.cc | 240 +++++++++++++++++-------------------------------------
 proxy/PluginVC.h  |  10 +--
 2 files changed, 77 insertions(+), 173 deletions(-)

diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc
index e64d59f68..d491d4d5a 100644
--- a/proxy/PluginVC.cc
+++ b/proxy/PluginVC.cc
@@ -217,11 +217,11 @@ PluginVC::main_handler(int event, void *data)
     }
 
     if (need_read_process) {
-      process_read_side(false);
+      process_read_side();
     }
 
     if (need_write_process && !closed) {
-      process_write_side(false);
+      process_write_side();
     }
   }
 
@@ -343,11 +343,11 @@ PluginVC::reenable_re(VIO *vio)
   if (vio->op == VIO::WRITE) {
     ink_assert(vio == &write_state.vio);
     need_write_process = true;
-    process_write_side(false);
+    process_write_side();
   } else if (vio->op == VIO::READ) {
     ink_assert(vio == &read_state.vio);
     need_read_process = true;
-    process_read_side(false);
+    process_read_side();
   } else {
     ink_release_assert(0);
   }
@@ -458,22 +458,20 @@ PluginVC::transfer_bytes(MIOBuffer *transfer_to, IOBufferReader *transfer_from,
   return total_added;
 }
 
-// void PluginVC::process_write_side(bool cb_ok)
+// void PluginVC::process_write_side()
 //
 //   This function may only be called while holding
 //      this->mutex & while it is ok to callback the
 //      write side continuation
 //
-//   Does write side processing
+//   Does write side processing directly to other read side writer
 //
 void
-PluginVC::process_write_side(bool other_side_call)
+PluginVC::process_write_side()
 {
   ink_assert(!deletable);
   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
 
-  MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
-
   Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
   need_write_process = false;
 
@@ -494,7 +492,8 @@ PluginVC::process_write_side(bool other_side_call)
 
   Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
 
-  if (other_side->closed || other_side->read_state.shutdown) {
+  // Check read_state of other side
+  if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
     write_state.vio.cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
     return;
   }
@@ -507,28 +506,54 @@ PluginVC::process_write_side(bool other_side_call)
     }
     return;
   }
-  // Bytes available, try to transfer to the PluginVCCore
-  //   intermediate buffer
-  //
-  int64_t buf_space = core_obj->buffer_size - core_buffer->max_read_avail();
+
+  // Check the state of the other side read buffer as well as ntodo
+  int64_t other_ntodo = other_side->read_state.vio.ntodo();
+  if (other_ntodo == 0) {
+    return;
+  }
+  act_on = std::min(act_on, other_ntodo);
+
+  // Other side read_state is open
+  //  obtain the proper mutexes on other side
+  EThread *my_ethread = mutex->thread_holding;
+  ink_assert(my_ethread != nullptr);
+  MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
+  if (!lock.is_locked()) {
+    Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
+          ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
+
+    // set need_read_process to enforce the read processing
+    other_side->need_read_process = true;
+    other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
+    return;
+  }
+
+  // Bytes available, setting up other side read state writer
+  MIOBuffer *output_buffer = other_side->read_state.vio.get_writer();
+  int64_t water_mark       = output_buffer->water_mark;
+  water_mark               = std::max<int64_t>(water_mark, core_obj->buffer_size);
+  int64_t buf_space        = water_mark - output_buffer->max_read_avail();
   if (buf_space <= 0) {
-    Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
+    Debug("pvc", "[%u] %s: process_read_side from other side no buffer space", core_obj->id, PVC_TYPE);
     return;
   }
   act_on = std::min(act_on, buf_space);
 
-  int64_t added = transfer_bytes(core_buffer, reader, act_on);
+  int64_t added = transfer_bytes(output_buffer, reader, act_on);
   if (added < 0) {
     // Couldn't actually get the buffer space.  This only
     //   happens on small transfers with the above
     //   buffer_size factor doesn't apply
-    Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
+    Debug("pvc", "[%u] %s: process_read_side from other side out of buffer space", core_obj->id, PVC_TYPE);
     return;
   }
 
   write_state.vio.ndone += added;
+  other_side->read_state.vio.ndone += added;
 
-  Debug("pvc", "[%u] %s: process_write_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
+  Debug("pvc", "[%u] %s: process_write_side and process_read_side from other side; added %" PRId64 "", core_obj->id, PVC_TYPE,
+        added);
 
   if (write_state.vio.ntodo() == 0) {
     write_state.vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
@@ -536,39 +561,17 @@ PluginVC::process_write_side(bool other_side_call)
     write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
   }
 
-  update_inactive_time();
+  if (other_side->read_state.vio.ntodo() == 0) {
+    other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &other_side->read_state.vio);
+  } else {
+    other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &other_side->read_state.vio);
+  }
 
-  // Wake up the read side on the other side to process these bytes
-  if (!other_side->closed) {
-    if (!other_side_call) {
-      /* To clear the `need_read_process`, the mutexes must be obtained:
-       *
-       *   - PluginVC::mutex
-       *   - PluginVC::read_state.vio.mutex
-       *
-       */
-      if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
-        // Just return, no touch on `other_side->need_read_process`.
-        return;
-      }
-      // Acquire the lock of the read side continuation
-      EThread *my_ethread = mutex->thread_holding;
-      ink_assert(my_ethread != nullptr);
-      MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
-      if (!lock.is_locked()) {
-        Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
-              ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
-
-        // set need_read_process to enforce the read processing
-        other_side->need_read_process = true;
-        other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
-        return;
-      }
+  update_inactive_time();
+  other_side->update_inactive_time();
 
-      other_side->process_read_side(true);
-    } else {
-      other_side->read_state.vio.reenable();
-    }
+  if (!closed) {
+    write_state.vio.reenable();
   }
 }
 
@@ -578,120 +581,45 @@ PluginVC::process_write_side(bool other_side_call)
 //      this->mutex & while it is ok to callback the
 //      read side continuation
 //
-//   Does read side processing
+//   Closes read state if other side
+//    write state is no longer available
 //
 void
-PluginVC::process_read_side(bool other_side_call)
+PluginVC::process_read_side()
 {
   ink_assert(!deletable);
   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
 
-  // TODO: Never used??
-  // MIOBuffer *core_buffer;
-
-  IOBufferReader *core_reader;
-
-  if (vc_type == PLUGIN_VC_ACTIVE) {
-    // core_buffer = core_obj->p_to_a_buffer;
-    core_reader = core_obj->p_to_a_reader;
-  } else {
-    ink_assert(vc_type == PLUGIN_VC_PASSIVE);
-    // core_buffer = core_obj->a_to_p_buffer;
-    core_reader = core_obj->a_to_p_reader;
-  }
-
   Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
   need_read_process = false;
 
   // Check read_state
-  if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
-    return;
-  }
-
-  // Check the state of our read buffer as well as ntodo
-  int64_t ntodo = read_state.vio.ntodo();
-  if (ntodo == 0) {
+  if (read_state.vio.op != VIO::READ || closed || read_state.shutdown || !read_state.vio.ntodo()) {
     return;
   }
 
-  int64_t bytes_avail = core_reader->read_avail();
-  int64_t act_on      = std::min(bytes_avail, ntodo);
-
-  Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
-
-  if (act_on <= 0) {
-    if (other_side->closed || other_side->write_state.shutdown) {
-      read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
+  if (!other_side->closed && !other_side->write_state.shutdown) {
+    if (other_side->write_state.vio.op != VIO::WRITE || other_side->write_state.shutdown) {
+      // Just return, no touch on `other_side->need_write_process`.
+      return;
     }
-    return;
-  }
-  // Bytes available, try to transfer from the PluginVCCore
-  //   intermediate buffer
-  //
-  MIOBuffer *output_buffer = read_state.vio.get_writer();
-
-  int64_t water_mark = output_buffer->water_mark;
-  water_mark         = std::max(water_mark, static_cast<int64_t>(core_obj->buffer_size));
-  int64_t buf_space  = water_mark - output_buffer->max_read_avail();
-  if (buf_space <= 0) {
-    Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
-    return;
-  }
-  act_on = std::min(act_on, buf_space);
-
-  int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
-  if (added <= 0) {
-    // Couldn't actually get the buffer space.  This only
-    //   happens on small transfers with the above
-    //   buffer_size factor doesn't apply
-    Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
-    return;
-  }
-
-  read_state.vio.ndone += added;
-
-  Debug("pvc", "[%u] %s: process_read_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
-
-  if (read_state.vio.ntodo() == 0) {
-    read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
-  } else {
-    read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
-  }
-
-  update_inactive_time();
-
-  // Wake up the other side so it knows there is space available in
-  //  intermediate buffer
-  if (!other_side->closed) {
-    if (!other_side_call) {
-      /* To clear the `need_write_process`, the mutexes must be obtained:
-       *
-       *   - PluginVC::mutex
-       *   - PluginVC::write_state.vio.mutex
-       *
-       */
-      if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) {
-        // Just return, no touch on `other_side->need_write_process`.
-        return;
-      }
-      // Acquire the lock of the write side continuation
-      EThread *my_ethread = mutex->thread_holding;
-      ink_assert(my_ethread != nullptr);
-      MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
-      if (!lock.is_locked()) {
-        Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
-              ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
-
-        // set need_write_process to enforce the write processing
-        other_side->need_write_process = true;
-        other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
-        return;
-      }
+    // Acquire the lock of the write side continuation
+    EThread *my_ethread = mutex->thread_holding;
+    ink_assert(my_ethread != nullptr);
+    MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
+    if (!lock.is_locked()) {
+      Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
+            ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
 
-      other_side->process_write_side(true);
-    } else {
-      other_side->write_state.vio.reenable();
+      // set need_write_process to enforce the write processing
+      other_side->need_write_process = true;
+      other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
+      return;
     }
+    other_side->process_write_side();
+  } else {
+    Debug("pvc", "[%u] %s: write_state of other side is not available", core_obj->id, PVC_TYPE);
+    read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
   }
 }
 
@@ -1078,14 +1006,6 @@ PluginVCCore::init(int64_t buffer_index, int64_t buffer_water_mark)
   passive_vc.mutex      = mutex;
   passive_vc.thread     = active_vc.thread;
 
-  p_to_a_buffer             = new_MIOBuffer(buffer_index);
-  p_to_a_buffer->water_mark = buffer_water_mark;
-  p_to_a_reader             = p_to_a_buffer->alloc_reader();
-
-  a_to_p_buffer             = new_MIOBuffer(buffer_index);
-  a_to_p_buffer->water_mark = buffer_water_mark;
-  a_to_p_reader             = a_to_p_buffer->alloc_reader();
-
   buffer_size = BUFFER_SIZE_FOR_INDEX(buffer_index);
 
   Debug("pvc",
@@ -1111,16 +1031,6 @@ PluginVCCore::destroy()
   passive_vc.write_state.vio.buffer.clear();
   passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
 
-  if (p_to_a_buffer) {
-    free_MIOBuffer(p_to_a_buffer);
-    p_to_a_buffer = nullptr;
-  }
-
-  if (a_to_p_buffer) {
-    free_MIOBuffer(a_to_p_buffer);
-    a_to_p_buffer = nullptr;
-  }
-
   this->mutex = nullptr;
   delete this;
 }
diff --git a/proxy/PluginVC.h b/proxy/PluginVC.h
index 81b484d92..3f90678a6 100644
--- a/proxy/PluginVC.h
+++ b/proxy/PluginVC.h
@@ -150,8 +150,8 @@ public:
   int main_handler(int event, void *data);
 
 private:
-  void process_read_side(bool);
-  void process_write_side(bool);
+  void process_read_side();
+  void process_write_side();
   void process_close();
   void process_timeout(Event **e, int event_to_send);
 
@@ -254,12 +254,6 @@ private:
   Continuation *connect_to = nullptr;
   bool connected           = false;
 
-  MIOBuffer *p_to_a_buffer      = nullptr;
-  IOBufferReader *p_to_a_reader = nullptr;
-
-  MIOBuffer *a_to_p_buffer      = nullptr;
-  IOBufferReader *a_to_p_reader = nullptr;
-
   IpEndpoint passive_addr_struct;
   IpEndpoint active_addr_struct;