You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by bn...@apache.org on 2022/05/09 23:07:31 UTC

[trafficserver] branch 10-Dev updated: Add support for request body transaction data sink (#8804)

This is an automated email from the ASF dual-hosted git repository.

bneradt pushed a commit to branch 10-Dev
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/10-Dev by this push:
     new 8de37afae Add support for request body transaction data sink (#8804)
8de37afae is described below

commit 8de37afae8a2ea75d9f1135a04dae3fc2e24b060
Author: Brian Neradt <br...@gmail.com>
AuthorDate: Mon May 9 18:07:24 2022 -0500

    Add support for request body transaction data sink (#8804)
    
    We have supported response body data sink for a while but have never
    provided the analogous functionality for request bodies. This adds the
    ability for a plugin to register for a request body sink transformation.
---
 doc/developer-guide/api/types/TSHttpHookID.en.rst  |   2 +
 .../plugins/http-transformations/index.en.rst      |   7 +-
 .../plugins/c-api/txn_data_sink/txn_data_sink.cc   | 144 ++++++++++----
 include/ts/apidefs.h.in                            |   4 +
 include/tscpp/api/TransformationPlugin.h           |  10 +-
 proxy/ProxySession.cc                              |   1 +
 proxy/http/HttpDebugNames.cc                       |   2 +
 proxy/http/HttpSM.cc                               |  56 ++++--
 proxy/http/HttpSM.h                                |  27 ++-
 src/traffic_server/InkAPITest.cc                   |   1 +
 src/tscpp/api/TransformationPlugin.cc              |  12 +-
 src/tscpp/api/utils_internal.cc                    |   4 +-
 .../transform/transaction-with-body.replays.yaml   | 209 ++++++++++++++++++++-
 .../transform/transaction_data_sink.test.py        |  95 +++++++---
 14 files changed, 477 insertions(+), 97 deletions(-)

diff --git a/doc/developer-guide/api/types/TSHttpHookID.en.rst b/doc/developer-guide/api/types/TSHttpHookID.en.rst
index a8356a6c2..90f75c450 100644
--- a/doc/developer-guide/api/types/TSHttpHookID.en.rst
+++ b/doc/developer-guide/api/types/TSHttpHookID.en.rst
@@ -72,6 +72,8 @@ Enumeration Members
 
    .. c:enumerator:: TS_HTTP_RESPONSE_CLIENT_HOOK
 
+   .. c:enumerator:: TS_HTTP_REQUEST_CLIENT_HOOK
+
    .. c:enumerator:: TS_SSL_FIRST_HOOK
 
    .. c:enumerator:: TS_VCONN_START_HOOK
diff --git a/doc/developer-guide/plugins/http-transformations/index.en.rst b/doc/developer-guide/plugins/http-transformations/index.en.rst
index 6e057dc27..de75c6b7e 100644
--- a/doc/developer-guide/plugins/http-transformations/index.en.rst
+++ b/doc/developer-guide/plugins/http-transformations/index.en.rst
@@ -168,9 +168,10 @@ modifies the IO buffer directly.
 Transaction Data Sink
 ~~~~~~~~~~~~~~~~~~~~~
 
-The hook `TS_HTTP_RESPONSE_CLIENT_HOOK` is a hook that supports a special type of transformation, one with only input and no output.
-Although the transformation doesn't provide data back to Traffic Server it can do anything else with the data, such as writing it
-to another output device or process. It must, however, consume all the data for the transaction. There are two primary use cases.
+The `TS_HTTP_REQUEST_CLIENT_HOOK` and `TS_HTTP_RESPONSE_CLIENT_HOOK` hooks support a special type of transformation, one with only request or
+response body input and no output.  Although the transformation doesn't provide data back to Traffic Server, it can do anything else with the
+data, such as writing it to another output device or process. It must, however, consume all the data for the transaction. There are two primary
+use cases.
 
 #. Tap in to the transaction to provide the data for external processing.
 #. Maintain the transaction.
diff --git a/example/plugins/c-api/txn_data_sink/txn_data_sink.cc b/example/plugins/c-api/txn_data_sink/txn_data_sink.cc
index 2be7d6163..af4817ccf 100644
--- a/example/plugins/c-api/txn_data_sink/txn_data_sink.cc
+++ b/example/plugins/c-api/txn_data_sink/txn_data_sink.cc
@@ -1,6 +1,6 @@
 /** @file
 
-  Example plugin for using TS_HTTP_RESPONSE_CLIENT_HOOK.
+  Example plugin for using the TS_HTTP_REQUEST_CLIENT_HOOK and TS_HTTP_RESPONSE_CLIENT_HOOK hooks.
 
   This example is used to maintain the transaction and thence the connection to the origin server for the full
   transaction, even if the user agent aborts. This is useful in cases where there are other reasons to complete
@@ -37,8 +37,11 @@ namespace
 {
 constexpr char const *PLUGIN_NAME = "txn_data_sink";
 
-/** Activate the data sink if this header field is present in the request. */
-std::string_view FLAG_HEADER_FIELD = "TS-Agent";
+/** The flag for activating response body data sink for a transaction. */
+std::string_view FLAG_DUMP_RESPONSE_BODY = "X-Dump-Response";
+
+/** The flag for activating request body data sink for a transaction. */
+std::string_view FLAG_DUMP_REQUEST_BODY = "X-Dump-Request";
 
 /** The sink data for a transaction. */
 struct SinkData {
@@ -49,14 +52,36 @@ struct SinkData {
    * interact with the body as a stream rather than buffering the entire body
    * for each transaction.
    */
-  std::string body_bytes;
+  std::string response_body_bytes;
+
+  /** The bytes for the request body streamed in from the sink.
+   *
+   * @note This example plugin buffers the body which is useful for the
+   * associated Autest. In most production scenarios the user will want to
+   * interact with the body as a stream rather than buffering the entire body
+   * for each transaction.
+   */
+  std::string request_body_bytes;
 };
 
-// This serves to consume all the data that arrives. If it's not consumed the tunnel gets stalled
-// and the transaction doesn't complete. Other things could be done with the data, accessible via
-// the IO buffer @a reader, such as writing it to disk to make an externally accessible copy.
+/** A flag to request that response body bytes be sinked. */
+constexpr bool SINK_RESPONSE_BODY = true;
+
+/** A flag to request that request body bytes be sinked. */
+constexpr bool SINK_REQUEST_BODY = false;
+
+/** This serves to consume all the data that arrives in the VIO.
+ *
+ * Note that if any data is not consumed then the tunnel gets stalled and the
+ * transaction doesn't complete. Various things can be done with the data,
+ * accessible via the IO buffer @a reader, such as writing it to disk in order
+ * to make an externally accessible copy.
+ *
+ * @param[in] sync_response_body True to accumulate the consumed bytes as the
+ * response body, false to accumulate them as the request body.
+ */
 int
-client_reader(TSCont contp, TSEvent event, void *edata)
+body_reader_helper(TSCont contp, TSEvent event, bool sync_response_body)
 {
   SinkData *data = static_cast<SinkData *>(TSContDataGet(contp));
 
@@ -74,6 +99,8 @@ client_reader(TSCont contp, TSEvent event, void *edata)
     TSContDataSet(contp, data);
   }
 
+  std::string &body_bytes = sync_response_body ? data->response_body_bytes : data->request_body_bytes;
+
   switch (event) {
   case TS_EVENT_ERROR:
     TSDebug(PLUGIN_NAME, "Error event");
@@ -90,9 +117,9 @@ client_reader(TSCont contp, TSEvent event, void *edata)
       TSIOBufferReader reader = TSVIOReaderGet(input_vio);
       size_t const n          = TSIOBufferReaderAvail(reader);
       if (n > 0) {
-        auto const offset = data->body_bytes.size();
-        data->body_bytes.resize(offset + n);
-        TSIOBufferReaderCopy(reader, data->body_bytes.data() + offset, n);
+        auto const offset = body_bytes.size();
+        body_bytes.resize(offset + n);
+        TSIOBufferReaderCopy(reader, body_bytes.data() + offset, n);
 
         TSIOBufferReaderConsume(reader, n);
         TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + n);
@@ -102,11 +129,11 @@ client_reader(TSCont contp, TSEvent event, void *edata)
         // Signal that we can accept more data.
         TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_READY, input_vio);
       } else {
-        TSDebug(PLUGIN_NAME, "Consumed the following body: \"%.*s\"", (int)data->body_bytes.size(), data->body_bytes.data());
+        TSDebug(PLUGIN_NAME, "Consumed the following body: \"%.*s\"", static_cast<int>(body_bytes.size()), body_bytes.data());
         TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_COMPLETE, input_vio);
       }
     } else { // The buffer is gone so we're done.
-      TSDebug(PLUGIN_NAME, "upstream buffer disappeared - %zd bytes", data->body_bytes.size());
+      TSDebug(PLUGIN_NAME, "upstream buffer disappeared - %zd bytes", body_bytes.size());
     }
     break;
   default:
@@ -117,43 +144,90 @@ client_reader(TSCont contp, TSEvent event, void *edata)
   return 0;
 }
 
+/** The handler for transaction data sink for response bodies. */
+int
+response_body_reader(TSCont contp, TSEvent event, void *edata)
+{
+  return body_reader_helper(contp, event, SINK_RESPONSE_BODY);
+}
+
+/** The handler for transaction data sink for request bodies. */
 int
-enable_agent_check(TSHttpTxn txnp)
+request_body_reader(TSCont contp, TSEvent event, void *edata)
 {
-  TSMBuffer req_buf;
-  TSMLoc req_loc;
-  int zret = 0;
+  return body_reader_helper(contp, event, SINK_REQUEST_BODY);
+}
 
-  // Enable the sink agent if the header is present.
+/** A helper function for common logic between request_sink_requested and
+ * response_sink_requested. */
+bool
+sink_requested_helper(TSHttpTxn txnp, std::string_view header)
+{
+  TSMLoc field      = nullptr;
+  TSMBuffer req_buf = nullptr;
+  TSMLoc req_loc    = nullptr;
   if (TS_SUCCESS == TSHttpTxnClientReqGet(txnp, &req_buf, &req_loc)) {
-    TSMLoc agent_field = TSMimeHdrFieldFind(req_buf, req_loc, FLAG_HEADER_FIELD.data(), FLAG_HEADER_FIELD.length());
-    zret               = nullptr == agent_field ? 0 : 1;
+    field = TSMimeHdrFieldFind(req_buf, req_loc, header.data(), header.length());
   }
 
-  return zret;
+  return field != nullptr;
 }
 
-void
-client_add(TSHttpTxn txnp)
+/** Determine whether the headers enable request body sink.
+ *
+ * Inspect the given request headers for the flag that enables request body
+ * sink.
+ *
+ * @param[in] txnp The transaction with the request headers to search for the
+ * header that enables request body sink.
+ *
+ * @return True if the headers enable request body sink, false otherwise.
+ */
+bool
+request_sink_requested(TSHttpTxn txnp)
 {
-  // We use @c TSTransformCreate because ATS sees this the same as a transform, but with only
-  // the input side hooked up and not the output side. Data flows from ATS in to the reader
-  // but not back out to ATS. From the plugin point of view the input data is provided exactly
-  // as it is with a transform.
-  TSHttpTxnHookAdd(txnp, TS_HTTP_RESPONSE_CLIENT_HOOK, TSTransformCreate(client_reader, txnp));
+  return sink_requested_helper(txnp, FLAG_DUMP_REQUEST_BODY);
 }
 
+/** Determine whether the headers enable response body sink.
+ *
+ * Inspect the transaction's client request headers for the flag that enables
+ * response body sink.
+ *
+ * @param[in] txnp The transaction whose client request headers are searched
+ * for the header that enables response body sink.
+ *
+ * @return True if the headers enable response body sink, false otherwise.
+ */
+bool
+response_sink_requested(TSHttpTxn txnp)
+{
+  return sink_requested_helper(txnp, FLAG_DUMP_RESPONSE_BODY);
+}
+
+/** Implements the handler for inspecting the request header bytes and enabling
+ * transaction data sink if X-Dump-Request or X-Dump-Response flags are used.
+ */
 int
 main_hook(TSCont contp, TSEvent event, void *edata)
 {
-  TSHttpTxn txnp = (TSHttpTxn)edata;
+  TSHttpTxn txnp = static_cast<TSHttpTxn>(edata);
 
-  TSDebug(PLUGIN_NAME, "Checking transaction");
+  TSDebug(PLUGIN_NAME, "Checking transaction for any flags to enable transaction data sink.");
   switch (event) {
-  case TS_EVENT_HTTP_READ_RESPONSE_HDR:
-    if (enable_agent_check(txnp)) {
-      TSDebug(PLUGIN_NAME, "Adding data sink to transaction");
-      client_add(txnp);
+  case TS_EVENT_HTTP_READ_REQUEST_HDR:
+    /// We use @c TSTransformCreate because ATS sees this the same as a
+    /// transform, but with only the input side hooked up and not the output
+    /// side. Data flows from ATS in to the reader but not back out to ATS.
+    /// From the plugin point of view the input data is provided exactly as it
+    /// is with a transform.
+    if (response_sink_requested(txnp)) {
+      TSHttpTxnHookAdd(txnp, TS_HTTP_RESPONSE_CLIENT_HOOK, TSTransformCreate(response_body_reader, txnp));
+      TSDebug(PLUGIN_NAME, "Adding response data sink to transaction");
+    }
+    if (request_sink_requested(txnp)) {
+      TSHttpTxnHookAdd(txnp, TS_HTTP_REQUEST_CLIENT_HOOK, TSTransformCreate(request_body_reader, txnp));
+      TSDebug(PLUGIN_NAME, "Adding request data sink to transaction");
     }
     break;
   default:
@@ -179,6 +253,6 @@ TSPluginInit(int argc, const char *argv[])
     return;
   }
 
-  TSHttpHookAdd(TS_HTTP_READ_RESPONSE_HDR_HOOK, TSContCreate(main_hook, nullptr));
+  TSHttpHookAdd(TS_HTTP_READ_REQUEST_HDR_HOOK, TSContCreate(main_hook, nullptr));
   return;
 }
diff --git a/include/ts/apidefs.h.in b/include/ts/apidefs.h.in
index 0cd18f73b..1cdecf219 100644
--- a/include/ts/apidefs.h.in
+++ b/include/ts/apidefs.h.in
@@ -356,6 +356,7 @@ typedef enum {
   TS_EVENT_HTTP_POST_REMAP                   = 60017,
   TS_EVENT_HTTP_REQUEST_BUFFER_READ_COMPLETE = 60018,
   TS_EVENT_HTTP_RESPONSE_CLIENT              = 60019,
+  TS_EVENT_HTTP_REQUEST_CLIENT               = 60020,
 
   TS_EVENT_LIFECYCLE_PORTS_INITIALIZED          = 60100,
   TS_EVENT_LIFECYCLE_PORTS_READY                = 60101,
@@ -390,6 +391,7 @@ typedef enum {
      - TS_HTTP_REQUEST_TRANSFORM_HOOK
      - TS_HTTP_RESPONSE_TRANSFORM_HOOK
      - TS_HTTP_RESPONSE_CLIENT_HOOK
+     - TS_HTTP_REQUEST_CLIENT_HOOK
 
     The following hooks can ONLY be added globally:
      - TS_HTTP_SELECT_ALT_HOOK
@@ -406,6 +408,7 @@ typedef enum {
      - TS_HTTP_REQUEST_TRANSFORM_HOOK
      - TS_HTTP_RESPONSE_TRANSFORM_HOOK
      - TS_HTTP_RESPONSE_CLIENT_HOOK
+     - TS_HTTP_REQUEST_CLIENT_HOOK
      - TS_HTTP_TXN_START_HOOK
      - TS_HTTP_TXN_CLOSE_HOOK
      - TS_HTTP_SSN_CLOSE_HOOK
@@ -468,6 +471,7 @@ typedef enum {
   DERIVE_HOOK_VALUE_FROM_EVENT(POST_REMAP),                   // TS_HTTP_POST_REMAP_HOOK
   DERIVE_HOOK_VALUE_FROM_EVENT(REQUEST_BUFFER_READ_COMPLETE), // TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK
   DERIVE_HOOK_VALUE_FROM_EVENT(RESPONSE_CLIENT),              // TS_HTTP_RESPONSE_CLIENT_HOOK
+  DERIVE_HOOK_VALUE_FROM_EVENT(REQUEST_CLIENT),               // TS_HTTP_REQUEST_CLIENT_HOOK
 
 // NOTE:
 // If adding any TS_HTTP hooks, be sure to understand the note above in the
diff --git a/include/tscpp/api/TransformationPlugin.h b/include/tscpp/api/TransformationPlugin.h
index e9b1a7fe6..c77e33c2f 100644
--- a/include/tscpp/api/TransformationPlugin.h
+++ b/include/tscpp/api/TransformationPlugin.h
@@ -84,10 +84,12 @@ public:
    * The available types of Transformations.
    */
   enum Type {
-    REQUEST_TRANSFORMATION = 0, /**< Transform the Request body content */
-    RESPONSE_TRANSFORMATION,    /**< Transform the Response body content */
-    SINK_TRANSFORMATION         /**< Sink transformation, meaning you get a separate stream of the Response
-                                     body content that does not get hooked up to a downstream input */
+    REQUEST_TRANSFORMATION = 0,          /**< Transform the Request body content */
+    RESPONSE_TRANSFORMATION,             /**< Transform the Response body content */
+    CLIENT_RESPONSE_SINK_TRANSFORMATION, /**< Sink transformation, meaning you get a separate stream of the Response
+                                              body content that does not get hooked up to a downstream input */
+    CLIENT_REQUEST_SINK_TRANSFORMATION,  /**< Sink transformation, meaning you get a separate stream of the Request
+                                              body content that does not get hooked up to a downstream input */
   };
 
   /**
diff --git a/proxy/ProxySession.cc b/proxy/ProxySession.cc
index 332b468ce..c0df4f913 100644
--- a/proxy/ProxySession.cc
+++ b/proxy/ProxySession.cc
@@ -78,6 +78,7 @@ static const TSEvent eventmap[TS_HTTP_LAST_HOOK + 1] = {
   TS_EVENT_HTTP_PRE_REMAP,             // TS_HTTP_PRE_REMAP_HOOK
   TS_EVENT_HTTP_POST_REMAP,            // TS_HTTP_POST_REMAP_HOOK
   TS_EVENT_HTTP_RESPONSE_CLIENT,       // TS_HTTP_RESPONSE_CLIENT_HOOK
+  TS_EVENT_HTTP_REQUEST_CLIENT,        // TS_HTTP_REQUEST_CLIENT_HOOK
   TS_EVENT_NONE,                       // TS_HTTP_LAST_HOOK
 };
 
diff --git a/proxy/http/HttpDebugNames.cc b/proxy/http/HttpDebugNames.cc
index 313891330..c1c5db04d 100644
--- a/proxy/http/HttpDebugNames.cc
+++ b/proxy/http/HttpDebugNames.cc
@@ -586,6 +586,8 @@ HttpDebugNames::get_api_hook_name(TSHttpHookID t)
     return "TS_HTTP_POST_REMAP_HOOK";
   case TS_HTTP_RESPONSE_CLIENT_HOOK:
     return "TS_HTTP_RESPONSE_CLIENT_HOOK";
+  case TS_HTTP_REQUEST_CLIENT_HOOK:
+    return "TS_HTTP_REQUEST_CLIENT_HOOK";
   case TS_HTTP_LAST_HOOK:
     return "TS_HTTP_LAST_HOOK";
   case TS_VCONN_START_HOOK:
diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc
index 65630c3a5..02ae4a428 100644
--- a/proxy/http/HttpSM.cc
+++ b/proxy/http/HttpSM.cc
@@ -6046,6 +6046,8 @@ HttpSM::do_setup_post_tunnel(HttpVC_t to_vc_type)
     break;
   }
 
+  this->setup_client_request_plugin_agents(p);
+
   // The user agent may support chunked (HTTP/1.1) or not (HTTP/2)
   // In either case, the server will support chunked (HTTP/1.1)
   if (chunked) {
@@ -6808,7 +6810,7 @@ HttpSM::setup_transfer_from_transform()
   transform_info.entry->in_tunnel = true;
   ua_entry->in_tunnel             = true;
 
-  this->setup_plugin_agents(p, client_response_hdr_bytes);
+  this->setup_client_response_plugin_agents(p, client_response_hdr_bytes);
 
   if (t_state.client_info.receive_chunked_response) {
     tunnel.set_producer_chunking_action(p, client_response_hdr_bytes, TCA_CHUNK_CONTENT);
@@ -6938,7 +6940,7 @@ HttpSM::setup_server_transfer()
   ua_entry->in_tunnel     = true;
   server_entry->in_tunnel = true;
 
-  this->setup_plugin_agents(p, client_response_hdr_bytes);
+  this->setup_client_response_plugin_agents(p, client_response_hdr_bytes);
 
   // If the incoming server response is chunked and the client does not
   // expect a chunked response, then dechunk it.  Otherwise, if the
@@ -7080,13 +7082,29 @@ HttpSM::setup_blind_tunnel(bool send_response_hdr, IOBufferReader *initial)
 }
 
 void
-HttpSM::setup_plugin_agents(HttpTunnelProducer *p, int num_header_bytes)
+HttpSM::setup_client_response_plugin_agents(HttpTunnelProducer *p, int num_header_bytes)
 {
-  APIHook *agent           = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK);
-  has_active_plugin_agents = agent != nullptr;
+  APIHook *agent                    = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK);
+  has_active_response_plugin_agents = agent != nullptr;
   while (agent) {
     INKVConnInternal *contp = static_cast<INKVConnInternal *>(agent->m_cont);
-    tunnel.add_consumer(contp, p->vc, &HttpSM::tunnel_handler_plugin_agent, HT_HTTP_CLIENT, "plugin agent", num_header_bytes);
+    tunnel.add_consumer(contp, p->vc, &HttpSM::tunnel_handler_plugin_agent, HT_HTTP_CLIENT, "response plugin agent",
+                        num_header_bytes);
+    // We don't put these in the SM VC table because the tunnel
+    // will clean them up in do_io_close().
+    agent = agent->next();
+  }
+}
+
+void
+HttpSM::setup_client_request_plugin_agents(HttpTunnelProducer *p, int num_header_bytes)
+{
+  APIHook *agent                   = txn_hook_get(TS_HTTP_REQUEST_CLIENT_HOOK);
+  has_active_request_plugin_agents = agent != nullptr;
+  while (agent) {
+    INKVConnInternal *contp = static_cast<INKVConnInternal *>(agent->m_cont);
+    tunnel.add_consumer(contp, p->vc, &HttpSM::tunnel_handler_plugin_agent, HT_HTTP_CLIENT, "request plugin agent",
+                        num_header_bytes);
     // We don't put these in the SM VC table because the tunnel
     // will clean them up in do_io_close().
     agent = agent->next();
@@ -7112,12 +7130,26 @@ HttpSM::plugin_agents_cleanup()
   // If this is set then all of the plugin agent VCs were put in
   // the VC table and cleaned up there. This handles the case where
   // something went wrong early.
-  if (!has_active_plugin_agents) {
-    APIHook *agent = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK);
-    while (agent) {
-      INKVConnInternal *contp = static_cast<INKVConnInternal *>(agent->m_cont);
-      contp->do_io_close();
-      agent = agent->next();
+  if (!has_active_response_plugin_agents) {
+    std::vector<APIHook *> agent_lists;
+    agent_lists.push_back(txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK));
+    for (auto &agent : agent_lists) {
+      while (agent) {
+        INKVConnInternal *contp = static_cast<INKVConnInternal *>(agent->m_cont);
+        contp->do_io_close();
+        agent = agent->next();
+      }
+    }
+  }
+  if (!has_active_request_plugin_agents) {
+    std::vector<APIHook *> agent_lists;
+    agent_lists.push_back(txn_hook_get(TS_HTTP_REQUEST_CLIENT_HOOK));
+    for (auto &agent : agent_lists) {
+      while (agent) {
+        INKVConnInternal *contp = static_cast<INKVConnInternal *>(agent->m_cont);
+        contp->do_io_close();
+        agent = agent->next();
+      }
     }
   }
 }
diff --git a/proxy/http/HttpSM.h b/proxy/http/HttpSM.h
index 0d66cab71..ff5575242 100644
--- a/proxy/http/HttpSM.h
+++ b/proxy/http/HttpSM.h
@@ -376,9 +376,17 @@ protected:
 
   HttpTransformInfo transform_info;
   HttpTransformInfo post_transform_info;
-  /// Set if plugin client / user agents are active.
-  /// Need primarily for cleanup.
-  bool has_active_plugin_agents = false;
+  /** A flag to keep track of whether there are active request plugin agents.
+   *
+   * This is used to guide plugin agent cleanup.
+   */
+  bool has_active_request_plugin_agents = false;
+
+  /** A flag to keep track of whether there are active response plugin agents.
+   *
+   * This is used to guide plugin agent cleanup.
+   */
+  bool has_active_response_plugin_agents = false;
 
   HttpCacheSM cache_sm;
   HttpCacheSM transform_cache_sm;
@@ -503,14 +511,23 @@ protected:
   HttpTunnelProducer *setup_cache_transfer_to_transform();
   HttpTunnelProducer *setup_transfer_from_transform_to_cache_only();
 
-  /** Configure consumers for transform plugins.
+  /** Configure consumers for client response transform plugins.
+   *
+   * @param[in] p The Tunnel's producer for whom transform plugins' consumers
+   *   will be configured.
+   * @param[in] num_header_bytes The number of header bytes in the stream.
+   *   These will be skipped and not passed to the consumers of the data sink.
+   */
+  void setup_client_response_plugin_agents(HttpTunnelProducer *p, int num_header_bytes = 0);
+
+  /** Configure consumers for client request transform plugins.
    *
    * @param[in] p The Tunnel's producer for whom transform plugins' consumers
    *   will be configured.
    * @param[in] num_header_bytes The number of header bytes in the stream.
    *   These will be skipped and not passed to the consumers of the data sink.
    */
-  void setup_plugin_agents(HttpTunnelProducer *p, int num_header_bytes);
+  void setup_client_request_plugin_agents(HttpTunnelProducer *p, int num_header_bytes = 0);
 
   HttpTransact::StateMachineAction_t last_action     = HttpTransact::SM_ACTION_UNDEFINED;
   int (HttpSM::*m_last_state)(int event, void *data) = nullptr;
diff --git a/src/traffic_server/InkAPITest.cc b/src/traffic_server/InkAPITest.cc
index e5170503d..0765337f6 100644
--- a/src/traffic_server/InkAPITest.cc
+++ b/src/traffic_server/InkAPITest.cc
@@ -6638,6 +6638,7 @@ enum ORIG_TSHttpHookID {
   ORIG_TS_HTTP_POST_REMAP_HOOK,
   ORIG_TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK,
   ORIG_TS_HTTP_RESPONSE_CLIENT_HOOK,
+  ORIG_TS_HTTP_REQUEST_CLIENT_HOOK,
   ORIG_TS_SSL_FIRST_HOOK   = 201,
   ORIG_TS_VCONN_START_HOOK = ORIG_TS_SSL_FIRST_HOOK,
   ORIG_TS_VCONN_CLOSE_HOOK,
diff --git a/src/tscpp/api/TransformationPlugin.cc b/src/tscpp/api/TransformationPlugin.cc
index 547dc7a74..6fb38167b 100644
--- a/src/tscpp/api/TransformationPlugin.cc
+++ b/src/tscpp/api/TransformationPlugin.cc
@@ -396,9 +396,13 @@ TransformationPlugin::produce(std::string_view data)
   if (state_->type_ == REQUEST_TRANSFORMATION) {
     state_->request_xform_output_.append(data.data(), data.length());
     return data.size();
-  } else if (state_->type_ == SINK_TRANSFORMATION) {
-    LOG_DEBUG("produce TransformationPlugin=%p tshttptxn=%p : This is a sink transform. Not producing any output", this,
-              state_->txn_);
+  } else if (state_->type_ == CLIENT_RESPONSE_SINK_TRANSFORMATION) {
+    LOG_DEBUG("produce TransformationPlugin=%p tshttptxn=%p : This is a client response sink transform. Not producing any output",
+              this, state_->txn_);
+    return 0;
+  } else if (state_->type_ == CLIENT_REQUEST_SINK_TRANSFORMATION) {
+    LOG_DEBUG("produce TransformationPlugin=%p tshttptxn=%p : This is a client request sink transform. Not producing any output",
+              this, state_->txn_);
     return 0;
   } else {
     return doProduce(data);
@@ -408,7 +412,7 @@ TransformationPlugin::produce(std::string_view data)
 size_t
 TransformationPlugin::setOutputComplete()
 {
-  if (state_->type_ == SINK_TRANSFORMATION) {
+  if (state_->type_ == CLIENT_RESPONSE_SINK_TRANSFORMATION || state_->type_ == CLIENT_REQUEST_SINK_TRANSFORMATION) {
     // There's no output stream for a sink transform, so we do nothing
     //
     // Warning: don't try to shutdown the VConn, since the default implementation (DummyVConnection)
diff --git a/src/tscpp/api/utils_internal.cc b/src/tscpp/api/utils_internal.cc
index a4da599a0..783ddb8bf 100644
--- a/src/tscpp/api/utils_internal.cc
+++ b/src/tscpp/api/utils_internal.cc
@@ -235,8 +235,10 @@ utils::internal::convertInternalTransformationTypeToTsHook(TransformationPlugin:
     return TS_HTTP_RESPONSE_TRANSFORM_HOOK;
   case TransformationPlugin::REQUEST_TRANSFORMATION:
     return TS_HTTP_REQUEST_TRANSFORM_HOOK;
-  case TransformationPlugin::SINK_TRANSFORMATION:
+  case TransformationPlugin::CLIENT_RESPONSE_SINK_TRANSFORMATION:
     return TS_HTTP_RESPONSE_CLIENT_HOOK;
+  case TransformationPlugin::CLIENT_REQUEST_SINK_TRANSFORMATION:
+    return TS_HTTP_REQUEST_CLIENT_HOOK;
   default:
     assert(false); // shouldn't happen, let's catch it early
     break;
diff --git a/tests/gold_tests/pluginTest/transform/transaction-with-body.replays.yaml b/tests/gold_tests/pluginTest/transform/transaction-with-body.replays.yaml
index 08153fda1..07a4b1a80 100644
--- a/tests/gold_tests/pluginTest/transform/transaction-with-body.replays.yaml
+++ b/tests/gold_tests/pluginTest/transform/transaction-with-body.replays.yaml
@@ -1,36 +1,227 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
 meta:
   version: "1.0"
 
 sessions:
 - transactions:
+
+  #---------------------------------------------------------------------------
+  # Verify that Content-Length response bodies can be consumed.
+  #---------------------------------------------------------------------------
+  - client-request:
+      method: "POST"
+      version: "1.1"
+      url: "/test/http11_response"
+      headers:
+        fields:
+        - [ Host, example.com ]
+        - [ uuid, 10 ]
+        - [ X-Dump-Response, 1 ]
+        - [ Content-Length, 31 ]
+      content:
+        data: "http1.1_request_body_not_dumped"
+
+    proxy-request:
+      headers:
+        fields:
+        - [ X-Dump-Response, { as: present } ]
+
+    server-response:
+      status: 200
+      reason: OK
+      headers:
+        fields:
+        - [ Content-Length, 31 ]
+      content:
+        data: "http1.1_cl_response_body_dumped"
+
+    proxy-response:
+      status: 200
+
+  #---------------------------------------------------------------------------
+  # Verify that chunked response bodies can be consumed.
+  #---------------------------------------------------------------------------
+  - client-request:
+      method: "POST"
+      version: "1.1"
+      url: "/test/http11_response"
+      headers:
+        fields:
+        - [ Host, example.com ]
+        - [ uuid, 11 ]
+        - [ X-Dump-Response, 1 ]
+        - [ Content-Length, 31 ]
+      content:
+        data: "http1.1_request_body_not_dumped"
+
+    proxy-request:
+      headers:
+        fields:
+        - [ X-Dump-Response, { as: present } ]
+
+    server-response:
+      status: 200
+      reason: OK
+      headers:
+        fields:
+        - [ Transfer-Encoding, chunked ]
+      content:
+        data: "http1.1_chunked_response_body_dumped"
+
+    proxy-response:
+      status: 200
+
+  #---------------------------------------------------------------------------
+  # Verify that Content-Length request bodies can be consumed.
+  #---------------------------------------------------------------------------
   - client-request:
-      method: "GET"
+      method: "POST"
       version: "1.1"
-      url: "/test/http11"
+      url: "/test/http11_request"
       headers:
         fields:
         - [ Host, example.com ]
-        - [ uuid, 1 ]
-        - [ Content-Length, 20 ]
+        - [ uuid, 12 ]
         # The example plugin only prints bodies with responses to requests
         # containing The TS-Agent header.
-        - [ TS-Agent, 1 ]
+        - [ X-Dump-Request, 1 ]
+        - [ Content-Length, 30 ]
       content:
-        data: "http1.1_request_body"
+        data: "http1.1_cl_request_body_dumped"
 
     proxy-request:
       headers:
         fields:
-        - [ TS-Agent, { as: present } ]
+        - [ X-Dump-Request, { as: present } ]
 
     server-response:
       status: 200
       reason: OK
       headers:
         fields:
-        - [ Content-Length, 21 ]
+        - [ Transfer-Encoding, chunked ]
+      content:
+        data: "http1.1_response_body_not_dumped"
+
+    proxy-response:
+      status: 200
+
+  #---------------------------------------------------------------------------
+  # Verify that chunked request bodies can be consumed.
+  #---------------------------------------------------------------------------
+  - client-request:
+      method: "POST"
+      version: "1.1"
+      url: "/test/http11_request"
+      headers:
+        fields:
+        - [ Host, example.com ]
+        - [ uuid, 13 ]
+        # The example plugin only dumps the request body for requests
+        # containing the X-Dump-Request header.
+        - [ X-Dump-Request, 1 ]
+        - [ Transfer-Encoding, chunked ]
+      content:
+        data: "http1.1_chunked_request_body_dumped"
+
+    proxy-request:
+      headers:
+        fields:
+        - [ X-Dump-Request, { as: present } ]
+
+    server-response:
+      status: 200
+      reason: OK
+      headers:
+        fields:
+        - [ Transfer-Encoding, chunked ]
+      content:
+        data: "http1.1_response_body_not_dumped"
+
+    proxy-response:
+      status: 200
+
+- protocol:
+  - name: http
+    version: 2
+  - name: tls
+    sni: test_sni
+  - name: tcp
+  - name: ip
+
+  transactions:
+
+  #---------------------------------------------------------------------------
+  # Verify that HTTP/2 response bodies can be consumed.
+  #---------------------------------------------------------------------------
+  - client-request:
+      headers:
+        fields:
+        - [ :method, POST ]
+        - [ :scheme, https ]
+        - [ :authority, www.example.com ]
+        - [ :path, /test/http2_response ]
+        - [ uuid, 20 ]
+        - [ X-Dump-Response, 1 ]
+      content:
+        data: "http2_request_body_not_dumped"
+
+    proxy-request:
+      headers:
+        fields:
+        - [ X-Dump-Response, { as: present } ]
+
+    server-response:
+      headers:
+        fields:
+        - [ :status, 200 ]
+      content:
+        data: "http2_response_body_dumped"
+
+    proxy-response:
+      status: 200
+
+  #---------------------------------------------------------------------------
+  # Verify that HTTP/2 request bodies can be consumed.
+  #---------------------------------------------------------------------------
+  - client-request:
+      headers:
+        fields:
+        - [ :method, POST ]
+        - [ :scheme, https ]
+        - [ :authority, www.example.com ]
+        - [ :path, /test/http2_response ]
+        - [ uuid, 21 ]
+        - [ X-Dump-Request, 1 ]
+      content:
+        data: "http2_request_body_dumped"
+
+    proxy-request:
+      headers:
+        fields:
+        - [ X-Dump-Request, { as: present } ]
+
+    server-response:
+      headers:
+        fields:
+        - [ :status, 200 ]
       content:
-        data: "http1.1_response_body"
+        data: "http2_response_body_not_dumped"
 
     proxy-response:
       status: 200
diff --git a/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py b/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
index 8162d62ae..ecbe08068 100644
--- a/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
+++ b/tests/gold_tests/pluginTest/transform/transaction_data_sink.test.py
@@ -16,8 +16,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import os
-
 Test.Summary = '''
 Verify transaction data sink.
 '''
@@ -26,26 +24,75 @@ Test.SkipUnless(
     Condition.PluginExists('txn_data_sink.so'),
 )
 
-replay_file = "transaction-with-body.replays.yaml"
-server = Test.MakeVerifierServerProcess("server", replay_file)
 
-ts = Test.MakeATSProcess("ts", enable_cache=False)
-ts.Disk.records_config.update({
-    'proxy.config.diags.debug.enabled': 1,
-    'proxy.config.diags.debug.tags': 'txn_data_sink',
-})
-ts.Disk.remap_config.AddLine(
-    f'map / http://localhost:{server.Variables.http_port}/'
-)
-ts.Disk.plugin_config.AddLine('txn_data_sink.so')
-
-# Verify that the various aspects of the expected debug output for the
-# transaction are logged.
-ts.Streams.stderr = Testers.ContainsExpression(
-    '"http1.1_response_body"',
-    "The response body should be printed by the plugin.")
-
-tr = Test.AddTestRun()
-tr.Processes.Default.StartBefore(server)
-tr.Processes.Default.StartBefore(ts)
-tr.AddVerifierClientProcess("client-1", replay_file, http_ports=[ts.Variables.port])
+class TransactionDataSyncTest:
+    """Verify that the txn_data_sink plugin dumps only the requested bodies."""
+    replay_file = "transaction-with-body.replays.yaml"
+
+    def __init__(self):
+        self._setupOriginServer()
+        self._setupTS()
+
+    def _setupOriginServer(self):
+        self.server = Test.MakeVerifierServerProcess(
+            "server", self.replay_file)
+
+    def _setupTS(self):
+        self.ts = Test.MakeATSProcess("ts", enable_cache=False, enable_tls=True)
+        self.ts.Disk.records_config.update({
+            "proxy.config.ssl.server.cert.path": f'{self.ts.Variables.SSLDir}',
+            "proxy.config.ssl.server.private_key.path": f'{self.ts.Variables.SSLDir}',
+            "proxy.config.ssl.client.verify.server.policy": 'PERMISSIVE',
+
+            'proxy.config.diags.debug.enabled': 1,
+            'proxy.config.diags.debug.tags': 'http|txn_data_sink',
+        })
+        self.ts.addDefaultSSLFiles()
+        self.ts.Disk.remap_config.AddLine(
+            f'map / http://localhost:{self.server.Variables.http_port}/'
+        )
+        self.ts.Disk.ssl_multicert_config.AddLine(
+            'dest_ip=* ssl_cert_name=server.pem ssl_key_name=server.key'
+        )
+        self.ts.Disk.plugin_config.AddLine('txn_data_sink.so')
+
+        # None of the bodies containing "not_dumped" were configured to be
+        # dumped. Therefore it is a bug if they show up in the logs.
+        self.ts.Streams.stderr += Testers.ExcludesExpression(
+            'body_not_dumped',
+            "An unexpected body was dumped.")
+
+        # Verify that each of the configured transaction bodies was dumped.
+        self.ts.Streams.stderr += Testers.ContainsExpression(
+            'http1.1_cl_response_body_dumped',
+            "The expected HTTP/1.1 Content-Length response body was dumped.")
+        self.ts.Streams.stderr += Testers.ContainsExpression(
+            'http1.1_chunked_response_body_dumped',
+            "The expected HTTP/1.1 chunked response body was dumped.")
+        self.ts.Streams.stderr += Testers.ContainsExpression(
+            'http1.1_cl_request_body_dumped',
+            "The expected HTTP/1.1 Content-Length request body was dumped.")
+        self.ts.Streams.stderr += Testers.ContainsExpression(
+            'http1.1_chunked_request_body_dumped',
+            "The expected HTTP/1.1 chunked request body was dumped.")
+        self.ts.Streams.stderr += Testers.ContainsExpression(
+            '"http2_response_body_dumped"',
+            "The expected HTTP/2 response body was dumped.")
+        self.ts.Streams.stderr += Testers.ContainsExpression(
+            'http2_request_body_dumped',
+            "The expected HTTP/2 request body was dumped.")
+
+    def run(self):
+        """Configure a TestRun for the test."""
+        tr = Test.AddTestRun()
+        tr.Processes.Default.StartBefore(self.server)
+        tr.Processes.Default.StartBefore(self.ts)
+        tr.AddVerifierClientProcess(
+            "client",
+            self.replay_file,
+            http_ports=[self.ts.Variables.port],
+            https_ports=[self.ts.Variables.ssl_port],
+            other_args='--thread-limit 1')
+
+
+TransactionDataSyncTest().run()