You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by bn...@apache.org on 2021/01/08 14:01:24 UTC

[trafficserver] branch master updated: default to throttling and subsequently simplify the transfer code (#7257)

This is an automated email from the ASF dual-hosted git repository.

bnolsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 4462484  default to throttling and subsequently simplify the transfer code (#7257)
4462484 is described below

commit 4462484040e2f6ff6001002fb97bbbec367a95d9
Author: Brian Olsen <bn...@gmail.com>
AuthorDate: Fri Jan 8 07:01:11 2021 -0700

    default to throttling and subsequently simplify the transfer code (#7257)
    
    slice: default to throttling, 416 changes and bug fix for downstream closing.
---
 plugins/experimental/slice/Config.cc   |   5 --
 plugins/experimental/slice/Config.h    |   3 +-
 plugins/experimental/slice/client.cc   |  30 +++++-----
 plugins/experimental/slice/server.cc   |  39 +++++++-----
 plugins/experimental/slice/slice.h     |   4 +-
 plugins/experimental/slice/transfer.cc | 106 ++++++++++++---------------------
 plugins/experimental/slice/util.cc     |   1 +
 7 files changed, 80 insertions(+), 108 deletions(-)

diff --git a/plugins/experimental/slice/Config.cc b/plugins/experimental/slice/Config.cc
index 54df2f1..0408389 100644
--- a/plugins/experimental/slice/Config.cc
+++ b/plugins/experimental/slice/Config.cc
@@ -112,7 +112,6 @@ Config::fromArgs(int const argc, char const *const argv[])
     {const_cast<char *>("exclude-regex"), required_argument, nullptr, 'e'},
     {const_cast<char *>("include-regex"), required_argument, nullptr, 'i'},
     {const_cast<char *>("ref-relative"), no_argument, nullptr, 'l'},
-    {const_cast<char *>("throttle"), no_argument, nullptr, 'o'},
     {const_cast<char *>("pace-errorlog"), required_argument, nullptr, 'p'},
     {const_cast<char *>("remap-host"), required_argument, nullptr, 'r'},
     {const_cast<char *>("blockbytes-test"), required_argument, nullptr, 't'},
@@ -182,10 +181,6 @@ Config::fromArgs(int const argc, char const *const argv[])
       m_reftype = RefType::Relative;
       DEBUG_LOG("Reference slice relative to request (not slice block 0)");
     } break;
-    case 'o': {
-      m_throttle = true;
-      DEBUG_LOG("Enabling internal block throttling");
-    } break;
     case 'p': {
       int const secsread = atoi(optarg);
       if (0 < secsread) {
diff --git a/plugins/experimental/slice/Config.h b/plugins/experimental/slice/Config.h
index 1f59511..4a5b3d6 100644
--- a/plugins/experimental/slice/Config.h
+++ b/plugins/experimental/slice/Config.h
@@ -41,8 +41,7 @@ struct Config {
   RegexType m_regex_type{None};
   pcre *m_regex{nullptr};
   pcre_extra *m_regex_extra{nullptr};
-  bool m_throttle{false}; // internal block throttling
-  int m_paceerrsecs{0};   // -1 disable logging, 0 no pacing, max 60s
+  int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s
   enum RefType { First, Relative };
   RefType m_reftype{First}; // reference slice is relative to request
 
diff --git a/plugins/experimental/slice/client.cc b/plugins/experimental/slice/client.cc
index d5cbd52..c1a9b59 100644
--- a/plugins/experimental/slice/client.cc
+++ b/plugins/experimental/slice/client.cc
@@ -140,22 +140,20 @@ handle_client_resp(TSCont contp, TSEvent event, Data *const data)
     } break;
 
     case BlockState::Pending: {
-      bool start_next_block = true;
-
-      if (data->m_config->m_throttle) {
-        TSVIO const output_vio    = data->m_dnstream.m_write.m_vio;
-        int64_t const output_done = TSVIONDoneGet(output_vio);
-        int64_t const output_sent = data->m_bytessent;
-        int64_t const threshout   = data->m_config->m_blockbytes;
-
-        if (threshout < (output_sent - output_done)) {
-          start_next_block = false;
-          DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, (output_sent - output_done));
-        }
-
-        if (start_next_block) {
-          DEBUG_LOG("Starting next block request");
-          request_block(contp, data);
+      // throttle
+      TSVIO const output_vio    = data->m_dnstream.m_write.m_vio;
+      int64_t const output_done = TSVIONDoneGet(output_vio);
+      int64_t const output_sent = data->m_bytessent;
+      int64_t const threshout   = data->m_config->m_blockbytes;
+      int64_t const buffered    = output_sent - output_done;
+
+      if (threshout < buffered) {
+        DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, buffered);
+      } else {
+        DEBUG_LOG("Starting next block request");
+        if (!request_block(contp, data)) {
+          data->m_blockstate = BlockState::Fail;
+          return;
         }
       }
     } break;
diff --git a/plugins/experimental/slice/server.cc b/plugins/experimental/slice/server.cc
index bdc37da..089b1fc 100644
--- a/plugins/experimental/slice/server.cc
+++ b/plugins/experimental/slice/server.cc
@@ -549,13 +549,16 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data)
       // header may have been successfully parsed but with caveats
       switch (data->m_blockstate) {
         // request new version of current internal slice
-      case BlockState::PendingInt: {
-        request_block(contp, data);
-        return;
-      } break;
-        // request new version of reference slice
+      case BlockState::PendingInt:
       case BlockState::PendingRef: {
-        request_block(contp, data);
+        if (!request_block(contp, data)) {
+          data->m_blockstate = BlockState::Fail;
+          if (data->m_dnstream.m_write.isOpen()) {
+            TSVIOReenable(data->m_dnstream.m_write.m_vio);
+          } else {
+            shutdown(contp, data);
+          }
+        }
         return;
       } break;
       case BlockState::ActiveRef: {
@@ -644,29 +647,33 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data)
       // Don't immediately request the next slice if the client
       // isn't keeping up
 
-      bool start_next_block = true;
+      if (data->m_dnstream.m_write.isOpen()) {
+        bool start_next_block = true;
 
-      // throttle condition
-      if (data->m_config->m_throttle && data->m_dnstream.m_read.isOpen()) {
+        // check throttle condition
         TSVIO const output_vio    = data->m_dnstream.m_write.m_vio;
         int64_t const output_done = TSVIONDoneGet(output_vio);
         int64_t const output_sent = data->m_bytessent;
         int64_t const threshout   = data->m_config->m_blockbytes;
+        int64_t const buffered    = output_sent - output_done;
 
-        if (threshout < (output_sent - output_done)) {
+        if (threshout < buffered) {
           start_next_block = false;
-          DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, (output_sent - output_done));
+          DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, buffered);
         }
-      }
 
-      if (start_next_block) {
-        request_block(contp, data);
+        if (start_next_block) {
+          if (!request_block(contp, data)) {
+            data->m_blockstate = BlockState::Fail;
+            abort(contp, data);
+            return;
+          }
+        }
       }
-
     } else {
       data->m_upstream.close();
       data->m_blockstate = BlockState::Done;
-      if (!data->m_dnstream.m_read.isOpen()) {
+      if (!data->m_dnstream.m_write.isOpen()) {
         shutdown(contp, data);
       }
     }
diff --git a/plugins/experimental/slice/slice.h b/plugins/experimental/slice/slice.h
index bd58818..ed86dd6 100644
--- a/plugins/experimental/slice/slice.h
+++ b/plugins/experimental/slice/slice.h
@@ -39,8 +39,8 @@ constexpr std::string_view X_CRR_IMS_HEADER = {"X-Crr-Ims"};
 #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
 #define DEBUG_LOG(fmt, ...) TSDebug(PLUGIN_NAME, "[%s:% 4d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__)
 
-#define ERROR_LOG(fmt, ...)                                                         \
-  TSError("[%s:% 4d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__); \
+#define ERROR_LOG(fmt, ...)                                                                         \
+  TSError("[%s/%s:% 4d] %s(): " fmt, PLUGIN_NAME, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__); \
   TSDebug(PLUGIN_NAME, "[%s:%04d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__)
 
 #else
diff --git a/plugins/experimental/slice/transfer.cc b/plugins/experimental/slice/transfer.cc
index f83b088..ce31b75 100644
--- a/plugins/experimental/slice/transfer.cc
+++ b/plugins/experimental/slice/transfer.cc
@@ -33,56 +33,40 @@ transfer_content_bytes(Data *const data)
   int64_t consumed = 0; // input vio bytes visited
   int64_t copied   = 0; // output bytes transferred
 
-  bool const canWrite = data->m_dnstream.m_write.isOpen();
-  bool done           = false;
-
-  TSIOBufferBlock block = TSIOBufferReaderStart(reader);
-
-  while (!done && nullptr != block) {
-    int64_t bavail = TSIOBufferBlockReadAvail(block, reader);
-
-    if (0 == bavail) {
-      block = TSIOBufferBlockNext(block);
-    } else {
-      int64_t toconsume = 0;
-
-      if (canWrite) {
-        int64_t const toskip = std::min(data->m_blockskip, bavail);
-        if (0 < toskip) { // before bytes
-          toconsume = toskip;
-          data->m_blockskip -= toskip;
-        } else {
-          int64_t const bytesleft = data->m_bytestosend - data->m_bytessent;
-          if (0 < bytesleft) { // transfer bytes
-            int64_t const tocopy = std::min(bavail, bytesleft);
-            int64_t const nbytes = TSIOBufferCopy(output_buf, reader, tocopy, 0);
-
-            done = (nbytes < tocopy); // output buffer stuffed
-
-            copied += nbytes;
-            data->m_bytessent += nbytes;
-
-            toconsume = nbytes;
-          } else { // after bytes
-            toconsume = bavail;
-          }
-        }
-      } else { // drain
-        toconsume = bavail;
-      }
+  int64_t avail = TSIOBufferReaderAvail(reader);
+  if (0 < avail) {
+    int64_t const toskip = std::min(data->m_blockskip, avail);
+    if (0 < toskip) {
+      TSIOBufferReaderConsume(reader, toskip);
+      data->m_blockskip -= toskip;
+      avail -= toskip;
+      consumed += toskip;
+    }
+  }
 
-      if (0 < toconsume) {
-        if (bavail == toconsume) {
-          block = TSIOBufferBlockNext(block);
-        }
-        TSIOBufferReaderConsume(reader, toconsume);
-        consumed += toconsume;
-      }
+  // bool const canWrite = data->m_dnstream.m_write.isOpen();
+
+  if (0 < avail) {
+    int64_t const bytesleft = data->m_bytestosend - data->m_bytessent;
+    int64_t const tocopy    = std::min(avail, bytesleft);
+    if (0 < tocopy) {
+      copied = TSIOBufferCopy(output_buf, reader, tocopy, 0);
+
+      data->m_bytessent += copied;
+      TSIOBufferReaderConsume(reader, copied);
+
+      avail -= copied;
+      consumed += copied;
     }
   }
 
-  // tell output more data is available
-  if (0 < copied) {
+  // if hit fulfillment start bulk consuming
+  if (0 < avail && data->m_bytestosend <= data->m_bytessent) {
+    TSIOBufferReaderConsume(reader, avail);
+    consumed += avail;
+  }
+
+  if (0 < copied && nullptr != output_vio) {
     TSVIOReenable(output_vio);
   }
 
@@ -112,35 +96,23 @@ transfer_all_bytes(Data *const data)
   TSIOBufferReader const reader = data->m_upstream.m_read.m_reader;
   TSIOBuffer const output_buf   = data->m_dnstream.m_write.m_iobuf;
 
-  bool done = false;
+  int64_t const read_avail = TSIOBufferReaderAvail(reader);
 
-  TSIOBufferBlock block = TSIOBufferReaderStart(reader);
+  if (0 < read_avail) {
+    int64_t const copied = TSIOBufferCopy(output_buf, reader, read_avail, 0);
 
-  while (!done && nullptr != block) {
-    int64_t bavail = TSIOBufferBlockReadAvail(block, reader);
+    if (0 < copied) {
+      TSIOBufferReaderConsume(reader, copied);
+      consumed = copied;
 
-    if (0 == bavail) {
-      block = TSIOBufferBlockNext(block);
-    } else {
-      int64_t const nbytes = TSIOBufferCopy(output_buf, reader, bavail, 0);
-      done                 = nbytes < bavail; // output buffer is full
-
-      if (0 < nbytes) {
-        if (bavail == nbytes) {
-          block = TSIOBufferBlockNext(block);
-        }
-        TSIOBufferReaderConsume(reader, nbytes);
-        consumed += nbytes;
+      TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
+      if (nullptr != output_vio) {
+        TSVIOReenable(output_vio);
       }
     }
   }
 
   if (0 < consumed) {
-    TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
-    if (nullptr != output_vio) {
-      TSVIOReenable(output_vio);
-    }
-
     TSVIO const input_vio = data->m_upstream.m_read.m_vio;
     if (nullptr != input_vio) {
       TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed);
diff --git a/plugins/experimental/slice/util.cc b/plugins/experimental/slice/util.cc
index 04516d4..15c68b4 100644
--- a/plugins/experimental/slice/util.cc
+++ b/plugins/experimental/slice/util.cc
@@ -128,6 +128,7 @@ request_block(TSCont contp, Data *const data)
     break;
   default:
     ERROR_LOG("Invalid blockstate");
+    return false;
     break;
   }