You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by bn...@apache.org on 2021/01/08 14:01:24 UTC
[trafficserver] branch master updated: default to throttling and subsequently simplify the transfer code (#7257)
This is an automated email from the ASF dual-hosted git repository.
bnolsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 4462484 default to throttling and subsequently simplify the transfer code (#7257)
4462484 is described below
commit 4462484040e2f6ff6001002fb97bbbec367a95d9
Author: Brian Olsen <bn...@gmail.com>
AuthorDate: Fri Jan 8 07:01:11 2021 -0700
default to throttling and subsequently simplify the transfer code (#7257)
slice: default to throttling, 416 changes and bug fix for downstream closing.
---
plugins/experimental/slice/Config.cc | 5 --
plugins/experimental/slice/Config.h | 3 +-
plugins/experimental/slice/client.cc | 30 +++++-----
plugins/experimental/slice/server.cc | 39 +++++++-----
plugins/experimental/slice/slice.h | 4 +-
plugins/experimental/slice/transfer.cc | 106 ++++++++++++---------------------
plugins/experimental/slice/util.cc | 1 +
7 files changed, 80 insertions(+), 108 deletions(-)
diff --git a/plugins/experimental/slice/Config.cc b/plugins/experimental/slice/Config.cc
index 54df2f1..0408389 100644
--- a/plugins/experimental/slice/Config.cc
+++ b/plugins/experimental/slice/Config.cc
@@ -112,7 +112,6 @@ Config::fromArgs(int const argc, char const *const argv[])
{const_cast<char *>("exclude-regex"), required_argument, nullptr, 'e'},
{const_cast<char *>("include-regex"), required_argument, nullptr, 'i'},
{const_cast<char *>("ref-relative"), no_argument, nullptr, 'l'},
- {const_cast<char *>("throttle"), no_argument, nullptr, 'o'},
{const_cast<char *>("pace-errorlog"), required_argument, nullptr, 'p'},
{const_cast<char *>("remap-host"), required_argument, nullptr, 'r'},
{const_cast<char *>("blockbytes-test"), required_argument, nullptr, 't'},
@@ -182,10 +181,6 @@ Config::fromArgs(int const argc, char const *const argv[])
m_reftype = RefType::Relative;
DEBUG_LOG("Reference slice relative to request (not slice block 0)");
} break;
- case 'o': {
- m_throttle = true;
- DEBUG_LOG("Enabling internal block throttling");
- } break;
case 'p': {
int const secsread = atoi(optarg);
if (0 < secsread) {
diff --git a/plugins/experimental/slice/Config.h b/plugins/experimental/slice/Config.h
index 1f59511..4a5b3d6 100644
--- a/plugins/experimental/slice/Config.h
+++ b/plugins/experimental/slice/Config.h
@@ -41,8 +41,7 @@ struct Config {
RegexType m_regex_type{None};
pcre *m_regex{nullptr};
pcre_extra *m_regex_extra{nullptr};
- bool m_throttle{false}; // internal block throttling
- int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s
+ int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s
enum RefType { First, Relative };
RefType m_reftype{First}; // reference slice is relative to request
diff --git a/plugins/experimental/slice/client.cc b/plugins/experimental/slice/client.cc
index d5cbd52..c1a9b59 100644
--- a/plugins/experimental/slice/client.cc
+++ b/plugins/experimental/slice/client.cc
@@ -140,22 +140,20 @@ handle_client_resp(TSCont contp, TSEvent event, Data *const data)
} break;
case BlockState::Pending: {
- bool start_next_block = true;
-
- if (data->m_config->m_throttle) {
- TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
- int64_t const output_done = TSVIONDoneGet(output_vio);
- int64_t const output_sent = data->m_bytessent;
- int64_t const threshout = data->m_config->m_blockbytes;
-
- if (threshout < (output_sent - output_done)) {
- start_next_block = false;
- DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, (output_sent - output_done));
- }
-
- if (start_next_block) {
- DEBUG_LOG("Starting next block request");
- request_block(contp, data);
+ // throttle
+ TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
+ int64_t const output_done = TSVIONDoneGet(output_vio);
+ int64_t const output_sent = data->m_bytessent;
+ int64_t const threshout = data->m_config->m_blockbytes;
+ int64_t const buffered = output_sent - output_done;
+
+ if (threshout < buffered) {
+ DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, buffered);
+ } else {
+ DEBUG_LOG("Starting next block request");
+ if (!request_block(contp, data)) {
+ data->m_blockstate = BlockState::Fail;
+ return;
}
}
} break;
diff --git a/plugins/experimental/slice/server.cc b/plugins/experimental/slice/server.cc
index bdc37da..089b1fc 100644
--- a/plugins/experimental/slice/server.cc
+++ b/plugins/experimental/slice/server.cc
@@ -549,13 +549,16 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data)
// header may have been successfully parsed but with caveats
switch (data->m_blockstate) {
// request new version of current internal slice
- case BlockState::PendingInt: {
- request_block(contp, data);
- return;
- } break;
- // request new version of reference slice
+ case BlockState::PendingInt:
case BlockState::PendingRef: {
- request_block(contp, data);
+ if (!request_block(contp, data)) {
+ data->m_blockstate = BlockState::Fail;
+ if (data->m_dnstream.m_write.isOpen()) {
+ TSVIOReenable(data->m_dnstream.m_write.m_vio);
+ } else {
+ shutdown(contp, data);
+ }
+ }
return;
} break;
case BlockState::ActiveRef: {
@@ -644,29 +647,33 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data)
// Don't immediately request the next slice if the client
// isn't keeping up
- bool start_next_block = true;
+ if (data->m_dnstream.m_write.isOpen()) {
+ bool start_next_block = true;
- // throttle condition
- if (data->m_config->m_throttle && data->m_dnstream.m_read.isOpen()) {
+ // check throttle condition
TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
int64_t const output_done = TSVIONDoneGet(output_vio);
int64_t const output_sent = data->m_bytessent;
int64_t const threshout = data->m_config->m_blockbytes;
+ int64_t const buffered = output_sent - output_done;
- if (threshout < (output_sent - output_done)) {
+ if (threshout < buffered) {
start_next_block = false;
- DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, (output_sent - output_done));
+ DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, buffered);
}
- }
- if (start_next_block) {
- request_block(contp, data);
+ if (start_next_block) {
+ if (!request_block(contp, data)) {
+ data->m_blockstate = BlockState::Fail;
+ abort(contp, data);
+ return;
+ }
+ }
}
-
} else {
data->m_upstream.close();
data->m_blockstate = BlockState::Done;
- if (!data->m_dnstream.m_read.isOpen()) {
+ if (!data->m_dnstream.m_write.isOpen()) {
shutdown(contp, data);
}
}
diff --git a/plugins/experimental/slice/slice.h b/plugins/experimental/slice/slice.h
index bd58818..ed86dd6 100644
--- a/plugins/experimental/slice/slice.h
+++ b/plugins/experimental/slice/slice.h
@@ -39,8 +39,8 @@ constexpr std::string_view X_CRR_IMS_HEADER = {"X-Crr-Ims"};
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define DEBUG_LOG(fmt, ...) TSDebug(PLUGIN_NAME, "[%s:% 4d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__)
-#define ERROR_LOG(fmt, ...) \
- TSError("[%s:% 4d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__); \
+#define ERROR_LOG(fmt, ...) \
+ TSError("[%s/%s:% 4d] %s(): " fmt, PLUGIN_NAME, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__); \
TSDebug(PLUGIN_NAME, "[%s:%04d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__)
#else
diff --git a/plugins/experimental/slice/transfer.cc b/plugins/experimental/slice/transfer.cc
index f83b088..ce31b75 100644
--- a/plugins/experimental/slice/transfer.cc
+++ b/plugins/experimental/slice/transfer.cc
@@ -33,56 +33,40 @@ transfer_content_bytes(Data *const data)
int64_t consumed = 0; // input vio bytes visited
int64_t copied = 0; // output bytes transferred
- bool const canWrite = data->m_dnstream.m_write.isOpen();
- bool done = false;
-
- TSIOBufferBlock block = TSIOBufferReaderStart(reader);
-
- while (!done && nullptr != block) {
- int64_t bavail = TSIOBufferBlockReadAvail(block, reader);
-
- if (0 == bavail) {
- block = TSIOBufferBlockNext(block);
- } else {
- int64_t toconsume = 0;
-
- if (canWrite) {
- int64_t const toskip = std::min(data->m_blockskip, bavail);
- if (0 < toskip) { // before bytes
- toconsume = toskip;
- data->m_blockskip -= toskip;
- } else {
- int64_t const bytesleft = data->m_bytestosend - data->m_bytessent;
- if (0 < bytesleft) { // transfer bytes
- int64_t const tocopy = std::min(bavail, bytesleft);
- int64_t const nbytes = TSIOBufferCopy(output_buf, reader, tocopy, 0);
-
- done = (nbytes < tocopy); // output buffer stuffed
-
- copied += nbytes;
- data->m_bytessent += nbytes;
-
- toconsume = nbytes;
- } else { // after bytes
- toconsume = bavail;
- }
- }
- } else { // drain
- toconsume = bavail;
- }
+ int64_t avail = TSIOBufferReaderAvail(reader);
+ if (0 < avail) {
+ int64_t const toskip = std::min(data->m_blockskip, avail);
+ if (0 < toskip) {
+ TSIOBufferReaderConsume(reader, toskip);
+ data->m_blockskip -= toskip;
+ avail -= toskip;
+ consumed += toskip;
+ }
+ }
- if (0 < toconsume) {
- if (bavail == toconsume) {
- block = TSIOBufferBlockNext(block);
- }
- TSIOBufferReaderConsume(reader, toconsume);
- consumed += toconsume;
- }
+ // bool const canWrite = data->m_dnstream.m_write.isOpen();
+
+ if (0 < avail) {
+ int64_t const bytesleft = data->m_bytestosend - data->m_bytessent;
+ int64_t const tocopy = std::min(avail, bytesleft);
+ if (0 < tocopy) {
+ copied = TSIOBufferCopy(output_buf, reader, tocopy, 0);
+
+ data->m_bytessent += copied;
+ TSIOBufferReaderConsume(reader, copied);
+
+ avail -= copied;
+ consumed += copied;
}
}
- // tell output more data is available
- if (0 < copied) {
+ // if hit fulfillment start bulk consuming
+ if (0 < avail && data->m_bytestosend <= data->m_bytessent) {
+ TSIOBufferReaderConsume(reader, avail);
+ consumed += avail;
+ }
+
+ if (0 < copied && nullptr != output_vio) {
TSVIOReenable(output_vio);
}
@@ -112,35 +96,23 @@ transfer_all_bytes(Data *const data)
TSIOBufferReader const reader = data->m_upstream.m_read.m_reader;
TSIOBuffer const output_buf = data->m_dnstream.m_write.m_iobuf;
- bool done = false;
+ int64_t const read_avail = TSIOBufferReaderAvail(reader);
- TSIOBufferBlock block = TSIOBufferReaderStart(reader);
+ if (0 < read_avail) {
+ int64_t const copied = TSIOBufferCopy(output_buf, reader, read_avail, 0);
- while (!done && nullptr != block) {
- int64_t bavail = TSIOBufferBlockReadAvail(block, reader);
+ if (0 < copied) {
+ TSIOBufferReaderConsume(reader, copied);
+ consumed = copied;
- if (0 == bavail) {
- block = TSIOBufferBlockNext(block);
- } else {
- int64_t const nbytes = TSIOBufferCopy(output_buf, reader, bavail, 0);
- done = nbytes < bavail; // output buffer is full
-
- if (0 < nbytes) {
- if (bavail == nbytes) {
- block = TSIOBufferBlockNext(block);
- }
- TSIOBufferReaderConsume(reader, nbytes);
- consumed += nbytes;
+ TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
+ if (nullptr != output_vio) {
+ TSVIOReenable(output_vio);
}
}
}
if (0 < consumed) {
- TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
- if (nullptr != output_vio) {
- TSVIOReenable(output_vio);
- }
-
TSVIO const input_vio = data->m_upstream.m_read.m_vio;
if (nullptr != input_vio) {
TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed);
diff --git a/plugins/experimental/slice/util.cc b/plugins/experimental/slice/util.cc
index 04516d4..15c68b4 100644
--- a/plugins/experimental/slice/util.cc
+++ b/plugins/experimental/slice/util.cc
@@ -128,6 +128,7 @@ request_block(TSCont contp, Data *const data)
break;
default:
ERROR_LOG("Invalid blockstate");
+ return false;
break;
}