You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by rr...@apache.org on 2022/04/05 14:55:43 UTC

[trafficserver] branch master updated: Adding prefetch feature to slice plugin (#8723)

This is an automated email from the ASF dual-hosted git repository.

rrm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 99545bfd4 Adding prefetch feature to slice plugin (#8723)
99545bfd4 is described below

commit 99545bfd4a01146060effc74add7f0aab3888415
Author: Serris Lew <se...@gmail.com>
AuthorDate: Tue Apr 5 07:55:36 2022 -0700

    Adding prefetch feature to slice plugin (#8723)
    
    Co-authored-by: Serris Lew <ls...@apple.com>
---
 doc/admin-guide/plugins/slice.en.rst               |   7 +
 plugins/experimental/slice/Config.cc               |   4 +
 plugins/experimental/slice/Config.h                |   3 +-
 plugins/experimental/slice/Data.h                  |   3 +
 plugins/experimental/slice/Makefile.inc            |   2 +
 plugins/experimental/slice/prefetch.cc             | 134 ++++++++++++++++
 plugins/experimental/slice/prefetch.h              |  56 +++++++
 plugins/experimental/slice/util.cc                 |  14 ++
 .../pluginTest/slice/slice_prefetch.test.py        | 178 +++++++++++++++++++++
 9 files changed, 400 insertions(+), 1 deletion(-)

diff --git a/doc/admin-guide/plugins/slice.en.rst b/doc/admin-guide/plugins/slice.en.rst
index 0246fa78f..8b56bbd36 100644
--- a/doc/admin-guide/plugins/slice.en.rst
+++ b/doc/admin-guide/plugins/slice.en.rst
@@ -126,6 +126,13 @@ The slice plugin supports the following options::
         `cache_range_requests` plugin.
         -i for short
 
+    --prefetch-count=<int> (optional)
+        Default is 0
+        Prefetches the next 'n' slice blocks in the background so that
+        they are already cached when requested. Especially for large
+        objects, prefetching can improve cache miss latency.
+        -f for short
+
 Examples::
 
     @plugin=slice.so @pparam=--blockbytes=1000000 @plugin=cache_range_requests.so
diff --git a/plugins/experimental/slice/Config.cc b/plugins/experimental/slice/Config.cc
index 2c810c693..5cd71d473 100644
--- a/plugins/experimental/slice/Config.cc
+++ b/plugins/experimental/slice/Config.cc
@@ -123,6 +123,7 @@ Config::fromArgs(int const argc, char const *const argv[])
     {const_cast<char *>("remap-host"), required_argument, nullptr, 'r'},
     {const_cast<char *>("skip-header"), required_argument, nullptr, 's'},
     {const_cast<char *>("blockbytes-test"), required_argument, nullptr, 't'},
+    {const_cast<char *>("prefetch-count"), required_argument, nullptr, 'f'},
     {nullptr, 0, nullptr, 0},
   };
 
@@ -222,6 +223,9 @@ Config::fromArgs(int const argc, char const *const argv[])
         DEBUG_LOG("Skipping blockbytes-test in favor of blockbytes");
       }
     } break;
+    case 'f': {
+      m_prefetchcount = atoi(optarg);
+    } break;
     default:
       break;
     }
diff --git a/plugins/experimental/slice/Config.h b/plugins/experimental/slice/Config.h
index 7188c6ea9..8408ffc20 100644
--- a/plugins/experimental/slice/Config.h
+++ b/plugins/experimental/slice/Config.h
@@ -41,7 +41,8 @@ struct Config {
   RegexType m_regex_type{None};
   pcre *m_regex{nullptr};
   pcre_extra *m_regex_extra{nullptr};
-  int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s
+  int m_paceerrsecs{0};   // -1 disable logging, 0 no pacing, max 60s
+  int m_prefetchcount{0}; // 0 disables prefetching
   enum RefType { First, Relative };
   RefType m_reftype{First}; // reference slice is relative to request
 
diff --git a/plugins/experimental/slice/Data.h b/plugins/experimental/slice/Data.h
index 496ebdc39..9cfbb2f7e 100644
--- a/plugins/experimental/slice/Data.h
+++ b/plugins/experimental/slice/Data.h
@@ -25,6 +25,7 @@
 #include "Stage.h"
 
 #include <netinet/in.h>
+#include <unordered_map>
 
 struct Config;
 
@@ -92,6 +93,8 @@ struct Data {
   Stage m_upstream;
   Stage m_dnstream;
 
+  std::unordered_map<int, bool> m_fetchstates;
+
   HdrMgr m_req_hdrmgr;  // manager for server request
   HdrMgr m_resp_hdrmgr; // manager for client response
 
diff --git a/plugins/experimental/slice/Makefile.inc b/plugins/experimental/slice/Makefile.inc
index be376ca1c..f02631e20 100644
--- a/plugins/experimental/slice/Makefile.inc
+++ b/plugins/experimental/slice/Makefile.inc
@@ -28,6 +28,8 @@ experimental_slice_slice_la_SOURCES = \
   experimental/slice/HttpHeader.h \
   experimental/slice/intercept.cc \
   experimental/slice/intercept.h \
+  experimental/slice/prefetch.cc \
+  experimental/slice/prefetch.h \
   experimental/slice/Range.cc \
   experimental/slice/Range.h \
   experimental/slice/response.cc \
diff --git a/plugins/experimental/slice/prefetch.cc b/plugins/experimental/slice/prefetch.cc
new file mode 100644
index 000000000..2325db77f
--- /dev/null
+++ b/plugins/experimental/slice/prefetch.cc
@@ -0,0 +1,134 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file prefetch.cc
+ * @brief Background fetch related classes (implementation file).
+ */
+
+#include "ts/ts.h" /* ATS API */
+#include "prefetch.h"
+
+/**
+ * Allocate a background fetch for @a blocknum and start it.
+ *
+ * When fetch() succeeds, the object is attached to its continuation and is
+ * deleted later from handler(). When it fails, the object is deleted here.
+ *
+ * @return true if the background request was issued
+ */
+bool
+BgBlockFetch::schedule(Data *const data, int blocknum)
+{
+  BgBlockFetch *const fetcher = new BgBlockFetch(blocknum);
+  if (!fetcher->fetch(data)) {
+    delete fetcher;
+    return false;
+  }
+  return true;
+}
+
+/**
+ * Initialize and start the background fetch of block `_blocknum`.
+ *
+ * Steps:
+ *  - rewrite the Range header of the transaction's client request header
+ *    (data->m_req_hdrmgr) to cover this block;
+ *  - open a plugin virtual connection back into ATS and write the request;
+ *  - arm a read VIO so that handler() can drain the response.
+ *
+ * @param data transaction state: config, client address and request header
+ * @return true when the request was issued; the object is then deleted
+ *         from handler(). On false the caller must delete it.
+ */
+bool
+BgBlockFetch::fetch(Data *const data)
+{
+  if (_bg_stream.m_read.isOpen()) {
+    // should never happen since the connection was just initialized
+    ERROR_LOG("Background block request already in flight!");
+    return false;
+  }
+
+  int64_t const blockbeg = (data->m_config->m_blockbytes * _blocknum);
+  Range blockbe(blockbeg, blockbeg + data->m_config->m_blockbytes);
+
+  char rangestr[1024];
+  int rangelen      = sizeof(rangestr);
+  bool const rpstat = blockbe.toStringClosed(rangestr, &rangelen);
+  TSAssert(rpstat);
+  if (!rpstat) {
+    // TSAssert may be compiled out in release builds; never send an
+    // unformatted buffer as the Range value.
+    ERROR_LOG("Unable to format background block range");
+    return false;
+  }
+
+  DEBUG_LOG("Request background block: %s", rangestr);
+
+  // reuse the incoming client header, just change the range
+  HttpHeader header(data->m_req_hdrmgr.m_buffer, data->m_req_hdrmgr.m_lochdr);
+
+  // add/set sub range key and add slicer tag
+  bool const rangestat = header.setKeyVal(TS_MIME_FIELD_RANGE, TS_MIME_LEN_RANGE, rangestr, rangelen);
+
+  if (!rangestat) {
+    ERROR_LOG("Error trying to set range request header %s", rangestr);
+    return false;
+  }
+  TSAssert(nullptr == _cont);
+
+  // Setup the continuation; it carries `this` so handler() can find us
+  _cont = TSContCreate(handler, TSMutexCreate());
+  TSContDataSet(_cont, static_cast<void *>(this));
+
+  // create virtual connection back into ATS
+  TSHttpConnectOptions options = TSHttpConnectOptionsGet(TS_CONNECT_PLUGIN);
+  options.addr                 = reinterpret_cast<sockaddr *>(&data->m_client_ip);
+  options.tag                  = PLUGIN_NAME;
+  options.id                   = 0;
+  options.buffer_index         = data->m_buffer_index;
+  options.buffer_water_mark    = data->m_buffer_water_mark;
+
+  TSVConn const upvc = TSHttpConnectPlugin(&options);
+  if (nullptr == upvc) {
+    // _cont is released by the destructor when the caller deletes us
+    ERROR_LOG("Unable to create background connection");
+    return false;
+  }
+
+  int const hlen = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr);
+
+  // set up connection with the HttpConnect server
+  _bg_stream.setupConnection(upvc);
+  _bg_stream.setupVioWrite(_cont, hlen);
+  TSHttpHdrPrint(header.m_buffer, header.m_lochdr, _bg_stream.m_write.m_iobuf);
+
+  if (TSIsDebugTagSet(PLUGIN_NAME)) {
+    std::string const headerstr(header.toString());
+    DEBUG_LOG("Headers\n%s", headerstr.c_str());
+  }
+
+  // Keep reading the response so the proxied transaction is not cut short
+  // before the block can be cached. handler() discards the bytes.
+  _bg_stream.setupVioRead(_cont, INT64_MAX);
+
+  data->m_fetchstates[_blocknum] = true;
+  return true;
+}
+
+/**
+ * @brief Continuation driving a single background fetch.
+ *
+ * - Reenables the write side as needed and half-closes it once the request
+ *   is sent.
+ * - Discards response bytes as they arrive.
+ * - Tears the fetch down when the response ends or on any error/timeout.
+ *
+ * Teardown deletes the BgBlockFetch, whose destructor destroys @a contp.
+ * @a contp must therefore not be destroyed a second time here.
+ */
+int
+BgBlockFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */)
+{
+  BgBlockFetch *const bg = static_cast<BgBlockFetch *>(TSContDataGet(contp));
+
+  switch (event) {
+  case TS_EVENT_VCONN_WRITE_READY:
+    // more of the request header can be transferred
+    if (nullptr != bg->_bg_stream.m_write.m_vio) {
+      TSVIOReenable(bg->_bg_stream.m_write.m_vio);
+    }
+    break;
+  case TS_EVENT_VCONN_WRITE_COMPLETE:
+    // request fully sent: shut down only the write side, keep reading
+    TSVConnShutdown(bg->_bg_stream.m_vc, 0, 1);
+    break;
+  case TS_EVENT_VCONN_READ_READY: {
+    // the response body itself is not needed; consume and discard it
+    TSIOBufferReader const reader = bg->_bg_stream.m_read.m_reader;
+    TSVIO const rvio              = bg->_bg_stream.m_read.m_vio;
+    if (nullptr != reader && nullptr != rvio) {
+      int64_t const avail = TSIOBufferReaderAvail(reader);
+      TSIOBufferReaderConsume(reader, avail);
+      TSVIONDoneSet(rvio, TSVIONDoneGet(rvio) + avail);
+      TSVIOReenable(rvio);
+    }
+  } break;
+  case TS_EVENT_VCONN_READ_COMPLETE:
+  case TS_EVENT_VCONN_EOS:
+    // response finished: normal close
+    bg->_bg_stream.close();
+    TSContDataSet(contp, nullptr);
+    delete bg; // destructor destroys contp
+    break;
+  default:
+    if (event == TS_EVENT_VCONN_INACTIVITY_TIMEOUT) {
+      DEBUG_LOG("encountered Inactivity Timeout");
+    }
+    // NOTE(review): relies on Stage::abort() releasing the VC, just as the
+    // success path relies on Stage::close(); closing the VC here as well
+    // would release it twice.
+    bg->_bg_stream.abort();
+    TSContDataSet(contp, nullptr);
+    delete bg; // destructor destroys contp
+    break;
+  }
+  return 0;
+}
diff --git a/plugins/experimental/slice/prefetch.h b/plugins/experimental/slice/prefetch.h
new file mode 100644
index 000000000..e04376bac
--- /dev/null
+++ b/plugins/experimental/slice/prefetch.h
@@ -0,0 +1,56 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file prefetch.h
+ * @brief Background fetch classes for slice plugin.
+ */
+
+#pragma once
+
+#include <map>
+
+#include "ts/ts.h"
+#include "Data.h"
+#include "Config.h"
+
+/**
+ * @brief Represents a single background fetch.
+ */
+/**
+ * @brief Represents a single background fetch.
+ *
+ * Instances are heap-allocated by schedule(). Once fetch() succeeds, the
+ * object is attached to its continuation (`_cont`) and is deleted from
+ * handler(). The destructor destroys that continuation.
+ */
+struct BgBlockFetch {
+  /// Allocate a fetch for @a blocknum and start it; false if it could not start.
+  static bool schedule(Data *const data, int blocknum);
+
+  explicit BgBlockFetch(int blocknum) : _blocknum(blocknum) {}
+
+  // Owns `_cont` through the destructor: a copy would destroy it twice.
+  BgBlockFetch(BgBlockFetch const &) = delete;
+  BgBlockFetch &operator=(BgBlockFetch const &) = delete;
+
+  /// Issue the range request for `_blocknum`.
+  bool fetch(Data *const data);
+  /// Continuation callback registered on `_cont`.
+  static int handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */);
+
+  /* This is for the actual background fetch / NetVC */
+  Stage _bg_stream;
+
+  int _blocknum;          ///< index of the block being fetched
+  TSCont _cont = nullptr; ///< continuation created by fetch()
+
+  ~BgBlockFetch()
+  {
+    if (nullptr != _cont) {
+      TSContDestroy(_cont);
+      _cont = nullptr;
+    }
+  }
+};
\ No newline at end of file
diff --git a/plugins/experimental/slice/util.cc b/plugins/experimental/slice/util.cc
index 90ef9d8b5..7e4bc1148 100644
--- a/plugins/experimental/slice/util.cc
+++ b/plugins/experimental/slice/util.cc
@@ -17,6 +17,7 @@
  */
 
 #include "util.h"
+#include "prefetch.h"
 
 #include "Config.h"
 #include "Data.h"
@@ -110,6 +111,19 @@ request_block(TSCont contp, Data *const data)
     DEBUG_LOG("Headers\n%s", headerstr.c_str());
   }
 
+  // if prefetch config set, schedule next block requests in background
+  for (int i = 0; i < data->m_config->m_prefetchcount; i++) {
+    int nextblocknum = data->m_blocknum + i + 1;
+    if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, nextblocknum) && !data->m_fetchstates[nextblocknum]) {
+      if (BgBlockFetch::schedule(data, nextblocknum)) {
+        DEBUG_LOG("Background fetch requested");
+      } else {
+        DEBUG_LOG("Background fetch not requested");
+      }
+    } else {
+      break;
+    }
+  }
   // get ready for data back from the server
   data->m_upstream.setupVioRead(contp, INT64_MAX);
 
diff --git a/tests/gold_tests/pluginTest/slice/slice_prefetch.test.py b/tests/gold_tests/pluginTest/slice/slice_prefetch.test.py
new file mode 100644
index 000000000..5dc6aed13
--- /dev/null
+++ b/tests/gold_tests/pluginTest/slice/slice_prefetch.test.py
@@ -0,0 +1,178 @@
+'''
+'''
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+Test.Summary = '''
+slice plugin prefetch feature test
+'''
+
+# Test description:
+# Fill origin server with range requests
+# Request content through slice plugin with varied prefetch counts
+
+Test.SkipUnless(
+    Condition.PluginExists('slice.so'),
+    Condition.PluginExists('cache_range_requests.so'),
+)
+Test.ContinueOnFail = False
+
+# configure origin server, lookup by Range header
+server = Test.MakeOriginServer("server", lookup_key="{%Range}")
+
+# Define ATS and configure
+ts = Test.MakeATSProcess("ts", command="traffic_server")
+
+block_bytes_1 = 7
+block_bytes_2 = 5
+body = "lets go surfin now"
+bodylen = len(body)
+
+request_header = {"headers":
+                  "GET /path HTTP/1.1\r\n" +
+                  "Host: origin\r\n" +
+                  "\r\n",
+                  "timestamp": "1469733493.993",
+                  "body": "",
+                  }
+
+response_header = {"headers":
+                   "HTTP/1.1 200 OK\r\n" +
+                   "Connection: close\r\n" +
+                   "Cache-Control: public, max-age=500\r\n" +
+                   "\r\n",
+                   "timestamp": "1469733493.993",
+                   "body": body,
+                   }
+
+server.addResponse("sessionlog.json", request_header, response_header)
+
+# Autest OS doesn't support range request, must manually add requests/responses
+for block_bytes in [block_bytes_1, block_bytes_2]:
+    for i in range(bodylen // block_bytes + 1):
+        b0 = i * block_bytes
+        b1 = b0 + block_bytes - 1
+        req_header = {"headers":
+                      "GET /path HTTP/1.1\r\n" +
+                      "Host: *\r\n" +
+                      "Accept: */*\r\n" +
+                      f"Range: bytes={b0}-{b1}\r\n" +
+                      "\r\n",
+                      "timestamp": "1469733493.993",
+                      "body": ""
+                      }
+        if (b1 > bodylen - 1):
+            b1 = bodylen - 1
+        resp_header = {"headers":
+                       "HTTP/1.1 206 Partial Content\r\n" +
+                       "Accept-Ranges: bytes\r\n" +
+                       "Cache-Control: public, max-age=500\r\n" +
+                       f"Content-Range: bytes {b0}-{b1}/{bodylen}\r\n" +
+                       "Connection: close\r\n" +
+                       "\r\n",
+                       "timestamp": "1469733493.993",
+                       "body": body[b0:b1 + 1]
+                       }
+        server.addResponse("sessionlog.json", req_header, resp_header)
+
+curl_and_args = 'curl -s -D /dev/stdout -o /dev/stderr -x http://127.0.0.1:{}'.format(ts.Variables.port)
+
+ts.Disk.remap_config.AddLines([
+    f'map http://sliceprefetch1bytes1/ http://127.0.0.1:{server.Variables.Port}' +
+    f' @plugin=slice.so @pparam=--blockbytes-test={block_bytes_1} @pparam=--prefetch-count=1' +
+    ' @plugin=cache_range_requests.so',
+    f'map http://sliceprefetch2bytes1/ http://127.0.0.1:{server.Variables.Port}' +
+    f' @plugin=slice.so @pparam=--blockbytes-test={block_bytes_1} @pparam=--prefetch-count=2' +
+    ' @plugin=cache_range_requests.so',
+    f'map http://sliceprefetch1bytes2/ http://127.0.0.1:{server.Variables.Port}' +
+    f' @plugin=slice.so @pparam=--blockbytes-test={block_bytes_2} @pparam=--prefetch-count=1' +
+    ' @plugin=cache_range_requests.so',
+    f'map http://sliceprefetch2bytes2/ http://127.0.0.1:{server.Variables.Port}' +
+    f' @plugin=slice.so @pparam=--blockbytes-test={block_bytes_2} @pparam=--prefetch-count=2' +
+    ' @plugin=cache_range_requests.so',
+])
+
+ts.Disk.records_config.update({
+    'proxy.config.diags.debug.enabled': 1,
+    'proxy.config.diags.debug.tags': 'slice|cache_range_requests|pvc',
+})
+
+# 0 Test - Full object slice with only next block prefetched in background, block bytes= 7
+tr = Test.AddTestRun("Full object slice with only next block prefetched in background, block bytes= 7")
+ps = tr.Processes.Default
+ps.StartBefore(server, ready=When.PortOpen(server.Variables.Port))
+ps.StartBefore(Test.Processes.ts)
+ps.Command = curl_and_args + ' http://sliceprefetch1bytes1/path'
+ps.ReturnCode = 0
+ps.Streams.stderr = "gold/slice_200.stderr.gold"
+ps.Streams.stdout.Content = Testers.ContainsExpression("200 OK", "expected 200 OK response")
+tr.StillRunningAfter = ts
+
+# 1 Test - Full object slice with nest 2 blocks prefetched in background, block bytes= 7
+tr = Test.AddTestRun("Test - Full object slice with nest 2 blocks prefetched in background, block bytes= 7")
+ps = tr.Processes.Default
+ps.Command = curl_and_args + ' http://sliceprefetch2bytes1/path'
+ps.ReturnCode = 0
+ps.Streams.stderr = "gold/slice_200.stderr.gold"
+ps.Streams.stdout.Content = Testers.ContainsExpression("200 OK", "expected 200 OK response")
+tr.StillRunningAfter = ts
+
+# 2 Test - Full object slice with only next block prefetched in background, block bytes= 5
+tr = Test.AddTestRun("Full object slice with only next block prefetched in background, block bytes= 5")
+ps = tr.Processes.Default
+ps.Command = curl_and_args + ' http://sliceprefetch1bytes2/path'
+ps.ReturnCode = 0
+ps.Streams.stderr = "gold/slice_200.stderr.gold"
+ps.Streams.stdout.Content = Testers.ContainsExpression("200 OK", "expected 200 OK response")
+tr.StillRunningAfter = ts
+
+# 3 Test - Full object slice with nest 2 blocks prefetched in background, block bytes= 5
+tr = Test.AddTestRun("Full object slice with nest 2 blocks prefetched in background, block bytes= 5")
+ps = tr.Processes.Default
+ps.Command = curl_and_args + ' http://sliceprefetch2bytes2/path'
+ps.ReturnCode = 0
+ps.Streams.stderr = "gold/slice_200.stderr.gold"
+ps.Streams.stdout.Content = Testers.ContainsExpression("200 OK", "expected 200 OK response")
+tr.StillRunningAfter = ts
+
+# 4 Test - Whole asset via range
+tr = Test.AddTestRun("Whole asset via range")
+ps = tr.Processes.Default
+ps.Command = curl_and_args + ' http://sliceprefetch1bytes1/path' + ' -r 0-'
+ps.ReturnCode = 0
+ps.Streams.stderr = "gold/slice_206.stderr.gold"
+ps.Streams.stdout.Content = Testers.ContainsExpression("206 Partial Content", "expected 206 response")
+ps.Streams.stdout.Content += Testers.ContainsExpression("Content-Range: bytes 0-17/18", "mismatch byte content response")
+tr.StillRunningAfter = ts
+
+# 5 Test - Non aligned slice request
+tr = Test.AddTestRun("Non aligned slice request")
+ps = tr.Processes.Default
+ps.Command = curl_and_args + ' http://sliceprefetch1bytes1/path' + ' -r 5-16'
+ps.ReturnCode = 0
+ps.Streams.stderr = "gold/slice_mid.stderr.gold"
+ps.Streams.stdout.Content = Testers.ContainsExpression("206 Partial Content", "expected 206 response")
+ps.Streams.stdout.Content += Testers.ContainsExpression("Content-Range: bytes 5-16/18", "mismatch byte content response")
+tr.StillRunningAfter = ts
+
+# 6 Test - special case, begin inside last slice block but outside asset len
+tr = Test.AddTestRun("Invalid end range request, 416")
+beg = len(body) + 1
+end = beg + block_bytes
+ps = tr.Processes.Default
+ps.Command = curl_and_args + f' http://sliceprefetch1bytes1/path -r {beg}-{end}'
+ps.Streams.stdout.Content = Testers.ContainsExpression("416 Requested Range Not Satisfiable", "expected 416 response")
+tr.StillRunningAfter = ts