You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2018/08/13 20:23:00 UTC
[trafficserver] branch 8.0.x updated: Prefetch plugin
This is an automated email from the ASF dual-hosted git repository.
zwoop pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/8.0.x by this push:
new 10dce66 Prefetch plugin
10dce66 is described below
commit 10dce66501294b297f376dcbdb4bc75003df97d5
Author: Gancho Tenev <ga...@apache.org>
AuthorDate: Sat Jun 30 00:48:24 2018 -0700
Prefetch plugin
The purpose of the plugin is to increase the cache-hit ratio
for a sequence of objects whose URL paths follow a common pattern.
(cherry picked from commit 18e67bd9e9790543e2872741d48b61571ef5408c)
---
doc/admin-guide/plugins/index.en.rst | 1 +
doc/admin-guide/plugins/prefetch.en.rst | 278 ++++++++
.../images/admin/prefetch_plugin_deployment.png | Bin 0 -> 222757 bytes
plugins/Makefile.am | 1 +
plugins/experimental/prefetch/Makefile.inc | 28 +
plugins/experimental/prefetch/README.md | 8 +
plugins/experimental/prefetch/common.cc | 59 ++
plugins/experimental/prefetch/common.h | 68 ++
plugins/experimental/prefetch/configs.cc | 172 +++++
plugins/experimental/prefetch/configs.h | 202 ++++++
plugins/experimental/prefetch/fetch.cc | 739 ++++++++++++++++++++
plugins/experimental/prefetch/fetch.h | 202 ++++++
plugins/experimental/prefetch/fetch_policy.cc | 57 ++
plugins/experimental/prefetch/fetch_policy.h | 66 ++
plugins/experimental/prefetch/fetch_policy_lru.cc | 141 ++++
plugins/experimental/prefetch/fetch_policy_lru.h | 105 +++
.../experimental/prefetch/fetch_policy_simple.cc | 80 +++
.../experimental/prefetch/fetch_policy_simple.h | 46 ++
plugins/experimental/prefetch/headers.cc | 213 ++++++
plugins/experimental/prefetch/headers.h | 31 +
plugins/experimental/prefetch/pattern.cc | 463 +++++++++++++
plugins/experimental/prefetch/pattern.h | 92 +++
plugins/experimental/prefetch/plugin.cc | 751 +++++++++++++++++++++
23 files changed, 3803 insertions(+)
diff --git a/doc/admin-guide/plugins/index.en.rst b/doc/admin-guide/plugins/index.en.rst
index 9a61b2e..0b097f3 100644
--- a/doc/admin-guide/plugins/index.en.rst
+++ b/doc/admin-guide/plugins/index.en.rst
@@ -160,6 +160,7 @@ directory of the |TS| source tree. Experimental plugins can be compiled by passi
Stale While Revalidate <stale_while_revalidate.en>
System Statistics <system_stats.en>
WebP Transform <webp_transform.en>
+ Prefetch <prefetch.en>
:doc:`Access Control <access_control.en>`
Access control plugin that handles various access control use-cases.
diff --git a/doc/admin-guide/plugins/prefetch.en.rst b/doc/admin-guide/plugins/prefetch.en.rst
new file mode 100644
index 0000000..93a7e3c
--- /dev/null
+++ b/doc/admin-guide/plugins/prefetch.en.rst
@@ -0,0 +1,278 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+.. include:: ../../common.defs
+
+.. _admin-plugins-prefetch:
+
+
+Prefetch Plugin
+***************
+
+Description
+===========
+
+The purpose of the plugin is to increase the cache-hit ratio for a sequence of
+objects whose URL paths follow a common pattern.
+
+On every **incoming** URL request, the plugin can decide to pre-fetch the
+**next object** or more objects based on the common URL path pattern and a
+pre-defined pre-fetch policy.
+
+Currently, most HLS video urls follow a predictable pattern, with most URLs
+containing a segment number. Since the segments are ~10s of content, the normal
+usage pattern is to fetch the incremental segment every few seconds. The CDN
+has its best chance of delivering a good user experience if the requests are
+served from cache. Since we can predict the **next object** fetched, we should be
+able to dramatically increase the chance of it being a cache hit.
+
+This is primarily useful for:
+
+* less popular content. Popular movies' segments are constantly being refreshed
+ in cache by user requests. Less popular content is less likely to be in cache.
+* device failures. There can be a significant time gap between a seeding request
+ and the user request. During this time, devices can fail, which cause cache
+ misses. The time gap between the plugin's request and the user's request can be
+ used to smooth over the failures.
+
+
+Why do this? Isn't seeding sufficient?
+--------------------------------------
+
+In practice the cache-hit rate for user-facing HLS video content is never a perfect 100%.
+This plugin should increase the cache hit rate.
+
+* The caches will eventually wrap. The unpopular content/tiers will be evicted,
+ and not repopulated.
+* Disks fail. Content on those disks will become cache misses, at least at that layer.
+* Machines fail.
+ * Content seeded on those machines will become misses
+ * Content seeded while a machine is down will be seeded to the "wrong" machine
+* Bugs.
+ * The content is usually managed by another organization which could have
+ * issues determining all the content (especially international)
+ * issues getting every asset on the storage service (i.e. AWS S3)
+ * issues sending us the assets in time
+ * ATS has had 5xx errors preventing seeding of assets
+* The process of seeding sometimes wastes significantly more resources than normal usage.
+
+
+How well it works
+-----------------
+
+The Prefetch plugin was initially designed to assist the seeding performed by a separate subsystem / process which suffered the problems mentioned above.
+
+The initial Prefetch plugin deployment graph below shows the per-POP cache-hit-ratio before and after its full deployment.
+It is worth mentioning that a small percentage of the requests did not follow a predictable pattern and were not handled by the plugin.
+
+.. figure:: ../../static/images/admin/prefetch_plugin_deployment.png
+ :align: center
+ :alt: prefetch plugin initial deployment
+
+ Prefetch plugin initial deployment.
+
+* All POPs were seeded periodically except for POP #1 and the plugin was deployed in the following order: POP #0, #1, #2, #3 and then to the rest at once.
+* POP #0 was the first plugin deployment and was used to tune its configuration for better results.
+* POP #1 was a "testing ground" for the “worst case” (no seeding at all, imperfect conditions like low traffic and poorer connectivity to origin) and relying only on the Prefetch plugin.
+* POP #2 and POP #3 experienced seeding problems (at times it reached ~60%, not shown here).
+
+
+How does it work?
+-----------------
+
+The primary use-case for the plugin is to work in a multi-tier (child-parent)
+environment where a consistent hashing of the URI is used to choose the next
+tier parent but a single-tier use case is also supported (should work w/o
+any code changes).
+
+When a request comes to the child (only), the url is checked in an LRU. If the
+object exists in the LRU, we assume that we've pre-fetched the following object
+recently, and thus do not need to take any further action. If, however, the
+object is **not** found, we proceed with prefetching.
+
+The plugin calculates the URI of the next segment, ATS performs the consistent
+hash calculation on it to find the appropriate parent, and sends that parent
+a request for it, including a special header. When the parent receives the
+request, it will either find it in cache or begin the fetch from its next tier.
+Since the request from the child has the special header, the parent will only
+send the headers of the object back to the client, saving network and processing
+bytes. The child thus does not cache the pre-fetched object which is ok since
+the user may not hit that same child for the subsequent object.
+
+Then, when the user makes their next request for the pre-fetched object, the
+child that handles the request will perform the consistent-hash, find the
+same parent that got the pre-fetch request, and be served from its cache.
+
+Usage
+-----
+
+* Dual-tiered usage - the plugin runs in 2 modes (2 instances)
+ * the **front-tier** instance decides if the "next" object needs prefetching
+ based on the pre-fetch policy and only sends a signal to the **back-tier**
+ * the **back-tier** instance responds quickly w/o returning any objects to
+ the **front-tier** and actually performs the background fetch.
+* Single-tier usage - the plugin runs on the first user facing tier.
+
+
+How the "next" object path is calculated
+----------------------------------------
+
+* The cache key of an incoming URL is checked against the fetching policy defined by ``--fetch-policy``.
+* If the **next object** is to be pre-fetched the ``--fetch-path-pattern=/regex/capture/`` is used to transform the **incoming** URL path into the **next**
+* The number of prefetched objects is specified by ``--fetch-count``
+* The hostname of the prefetch request can be replaced by using ``--replace-host``
+
+Let's say we have the following setup ::
+
+ map http://example.com http://origin.com \
+ @plugin=cachekey.so @pparam=--remove-all-params=true \
+ @plugin=prefetch.so \
+ @pparam=--fetch-policy=simple \
+ @pparam=--fetch-path-pattern=/(.*-)(\d+)(.*)/$1{$2+2}$3/ \
+ @pparam=--fetch-count=3 \
+ @pparam=--replace-host=example-seed.com
+
+
+If the "incoming" URL is ::
+
+ http://example.com/path/file-104.mov?a=a&b=b
+
+
+then the following URLs will be requested to be prefetched ::
+
+ http://example-seed.com/path/file-106.mov?a=a&b=b
+ http://example-seed.com/path/file-108.mov?a=a&b=b
+ http://example-seed.com/path/file-110.mov?a=a&b=b
+
+
+Note ``--fetch-path-pattern`` is a PCRE regex/capture pattern and
+``{$2+2}`` is a mechanism to calculate the next path by adding or
+subtracting integer numbers.
+
+
+Overhead from **next object** prefetch
+--------------------------------------
+
+Consuming extra resources
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The plugin uses more CDN resources to improve the user experience. The plugin
+attempts to minimize the extraneous resources used.
+
+* The prefetch policy (LRU) attempts to minimize the URLs fetched. The popular video segments
+ (which represent the majority of the requests) will quickly populate the LRU,
+ preventing their pre-fetching.
+* If the original request is for the last segment in the video, the plugin will
+ make our system have a frivolous request to origin for the next non-existent
+ segment.
+* If the user stops watching the video, the plugin may (if not popular) make a
+ request for a single segment that goes un-requested.
+
+Minimizing **next object** prefetch overhead
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The current implementation relies on the following assumptions and engineering
+compromises:
+
+* **First match the next object pattern** defined by ``--fetch-path-pattern``
+ plugin parameter, not matching requests are ignored (prefetch is never triggered)
+* **Define a prefetch policy** which tries to suppress unnecessary **next object**
+ prefetches for the most recently used requests, which are assumed to be already
+ in cache. Currently only ``lru:n`` policy is supported, it is using an URI-hash LRU
+ cache which evicts the least recently used elements first. Every request's **cache key**
+ is checked against it and if found the **next object** prefetch is skipped/cancelled.
+ (**side note**: the ``lru:n`` **is not** the same as the cache_promote plugin ``lru``,
+ the latter is rather a frequency divider for the most recently used URIs).
+* **Always use request's cache key** instead of request's URI and also **remove
+ the query parameters** from the key which guarantees that multiple different
+ requests which would result in using the same **cache key** are not considered as
+ separate requests (which could bloat/dilute the LRU cache
+ if not normalized)
+* **Check if the fetch request is unique**. A ``simple`` prefetching policy is
+ always used to make sure prefetches for the same object (same cache key) are
+ never triggered simultaneously.
+* **Check if already cached**. Before triggering the prefetch request to the
+ next tier it is always checked if the **incoming** object is already cached,
+ assuming that if already cached there is a good chance the **next
+ object** would be cached as well.
+* **Don't fetch the response body** and **never cache** at the **front-tier**.
+ The **front-tier** marks the prefetch request with a special API header defined
+ by the ``--api-header`` plugin parameter. When it receives such a request, the **back-tier** responds
+ right away before actually fetching the object (without a body), it just
+ schedules the real prefetch at the **back-tier**. ``Cache-Control: no-store``
+ is used to make sure the prefetch request response is never cached at the **front-tier**.
+ In such a way resources are saved (time, memory, CPU, bandwidth, etc) and also
+ unnecessary caching at the **front-tier** is avoided (where currently cache_promote
+ plugin is already being used to alleviate the load on the disks).
+* **Throttle the prefetch activity** - if necessary a limit can be imposed on the
+ number of concurrent prefetch requests by using ``--fetch-max`` plugin parameter.
+
+Plugin parameters
+=================
+
+* ``--front``
+ - ``true`` - configures the plugin to run on the **front-tier**,
+ - ``false`` - to be run on the **back-tier**.
+* ``--api-header`` - the header used by the plugin internally, also used to mark a prefetch request to the next tier in dual-tier usage.
+* ``--fetch-policy`` - fetch policy
+ - ``simple`` - this policy just makes sure there are no same concurrent prefetches triggered (default and always used in combination with any other policy)
+ - ``lru:n`` - this policy uses an LRU to identify "hot" objects and triggers prefetch if the object is not found. ``n`` is the size of the LRU
+* ``--fetch-count`` - how many objects to be prefetched.
+* ``--fetch-path-pattern`` - regex/capture pattern that would transform the **incoming** into the **next object** path.
+* ``--fetch-max`` - maximum concurrent fetches allowed, this would allow to throttle the prefetch activity if necessary
+* ``--replace-host`` - allows the prefetch requests to be forwarded to a different host or remap rule (replaces the host in the prefetch request)
+* ``--name-space`` - by default all plugin instances across all remap rules share a single background fetch state; this parameter allows specifying a separate state per remap rule or per group of remap rules.
+* ``--metrics-prefix`` - prefix for the metrics generated by the plugin.
+* ``--exact-match``
+ * if ``false`` (default) the fetch policy would use the **incoming** URL's cache key to find out if the **next object** should be prefetched or not,
+ * if ``true`` the fetch policy would use the **next** URL's cache key to find out if the **next object** should be prefetched or not
+* ``--log-name`` - specifies a custom log name (if not specified a log is not created)
+
+Metrics
+=======
+
+The plugin maintains the following metrics:
+
+* Prefetch request status related
+ * ``fetch.active`` - number of currently active prefetch requests (counter)
+ * ``fetch.completed`` - number of successfully completed prefetch requests (counter)
+ * ``fetch.errors`` - number of failed prefetch requests (counter)
+ * ``fetch.timeouts`` - number of timed-out prefetch requests (counter)
+ * ``fetch.throttled`` - number of throttled prefetch requests (counter), throttle limit defined by ``--fetch-max``
+ * ``fetch.total`` - total number of prefetch requests (counter).
+* Fetch policy related:
+ * all **incoming** request URIs are first matched against the next object pattern defined in ``--fetch-path-pattern``
+ * ``fetch.match.yes`` - number of requests matched the pattern (counter), eligible for triggering prefetch request
+ * ``fetch.match.no`` - number of requests not matching the pattern (counter), ignored by the plugin, will never trigger prefetch request
+ * prefetch policy related (i.e. ``--fetch-policy=lru:n``)
+ * ``fetch.policy.yes`` - number of times (counter) the policy allowed scheduling of the prefetch request (for ``lru:n`` policy cachekey **was not** found in the LRU)
+ * ``fetch.policy.no`` - number of times (counter) the policy disallowed scheduling of the prefetch request (for ``lru:n`` policy cachekey **was** found in the LRU)
+ * ``fetch.policy.maxsize`` - size of the prefetch policy (gauge, e.g. for ``lru:n`` policy the max size is ``n``)
+ * ``fetch.policy.size`` - current size of the prefetch policy (gauge, e.g. for ``lru:n`` policy the current size is a number <= ``n``)
+ * before sending any new prefetch request plugin makes sure the object is not currently being prefetched (unique).
+ * ``fetch.unique.yes`` - number of unique requests (counter), for which there are no current prefetch requests for the same object (cache key is used for this check).
+ * ``fetch.unique.no`` - number of not unique request (counter), for which there is currently prefetch running for the same object (cache key is used for this check).
+ * before sending any new prefetch request plugin makes sure the object is not already cached.
+ * ``fetch.already_cached`` - number of prefetch requests not sent (cancelled) because the object was already in cache (likely no prefetch needed)
+
+The exact metric name is defined by the following plugin parameters:
+
+* ``--metrics-prefix=<sample-prefix>``
+* ``--name-space=<sample-name-space>``
+
+For instance the final ``fetch.active`` metric will be called ``<sample-prefix>.<sample-name-space>.fetch.active``
diff --git a/doc/static/images/admin/prefetch_plugin_deployment.png b/doc/static/images/admin/prefetch_plugin_deployment.png
new file mode 100644
index 0000000..4b218de
Binary files /dev/null and b/doc/static/images/admin/prefetch_plugin_deployment.png differ
diff --git a/plugins/Makefile.am b/plugins/Makefile.am
index 05773e7..487978d 100644
--- a/plugins/Makefile.am
+++ b/plugins/Makefile.am
@@ -81,6 +81,7 @@ include experimental/stream_editor/Makefile.inc
include experimental/system_stats/Makefile.inc
include experimental/tls_bridge/Makefile.inc
include experimental/url_sig/Makefile.inc
+include experimental/prefetch/Makefile.inc
if BUILD_URI_SIGNING_PLUGIN
include experimental/uri_signing/Makefile.inc
diff --git a/plugins/experimental/prefetch/Makefile.inc b/plugins/experimental/prefetch/Makefile.inc
new file mode 100644
index 0000000..566ba82
--- /dev/null
+++ b/plugins/experimental/prefetch/Makefile.inc
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pkglib_LTLIBRARIES += experimental/prefetch/prefetch.la
+experimental_prefetch_prefetch_la_SOURCES = \
+ experimental/prefetch/plugin.cc \
+ experimental/prefetch/common.cc \
+ experimental/prefetch/configs.cc \
+ experimental/prefetch/fetch.cc \
+ experimental/prefetch/headers.cc \
+ experimental/prefetch/pattern.cc \
+ experimental/prefetch/fetch_policy.cc \
+ experimental/prefetch/fetch_policy_simple.cc \
+ experimental/prefetch/fetch_policy_lru.cc
+
diff --git a/plugins/experimental/prefetch/README.md b/plugins/experimental/prefetch/README.md
new file mode 100644
index 0000000..86e9b6a
--- /dev/null
+++ b/plugins/experimental/prefetch/README.md
@@ -0,0 +1,8 @@
+# Description
+
+The purpose of the plugin is to increase the cache-hit ratio for a sequence of
+objects whose URL paths follow a common pattern.
+
+# Documentation
+Details and examples can be found in [prefetch plugin documentation](../../../doc/admin-guide/plugins/prefetch.en.rst).
+
diff --git a/plugins/experimental/prefetch/common.cc b/plugins/experimental/prefetch/common.cc
new file mode 100644
index 0000000..fa2c9cf
--- /dev/null
+++ b/plugins/experimental/prefetch/common.cc
@@ -0,0 +1,59 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file common.cc
+ * @brief Common declarations and definitions.
+ * @see common.h
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "common.h"
+
+#ifdef PREFETCH_UNIT_TEST
+
+void
+PrintToStdErr(const char *fmt, ...)
+{
+  /* Unit-test replacement for the TS logging API: forward the printf-style arguments to stderr. */
+  va_list ap;
+
+  va_start(ap, fmt);
+  vfprintf(stderr, fmt, ap);
+  va_end(ap);
+}
+
+#endif /* PREFETCH_UNIT_TEST */
+
+/**
+ * @brief parses the leading base-10 unsigned integer of a string with strtoul().
+ * @param str input string (leading whitespace is skipped, parsing stops at the first non-digit or NUL)
+ * @return the parsed value, 0 when no digits could be converted
+ */
+size_t
+getValue(const String &str)
+{
+  /* std::string::c_str() is already NUL-terminated, so no (variable-length, non-standard) stack copy is needed;
+   * strtoul() stops at an embedded NUL exactly as it did on the former strncpy() copy. */
+  return static_cast<size_t>(strtoul(str.c_str(), nullptr, 10));
+}
+
+/**
+ * @brief parses the leading base-10 unsigned integer of a buffer that is not necessarily NUL-terminated.
+ * @param str buffer to parse; nullptr is treated as an empty buffer
+ * @param len number of bytes available in str
+ * @return the value strtoul() parses from at most len bytes (stopping early at an embedded NUL), 0 if there is none
+ */
+size_t
+getValue(const char *str, size_t len)
+{
+  if (nullptr == str) {
+    return 0; /* nothing to parse, same result strtoul() gives for an empty string */
+  }
+
+  /* Same effective length strncpy() would copy: stop after len bytes or at the first NUL, whichever comes first,
+   * so no byte beyond the first NUL is read. */
+  size_t n = 0;
+  while (n < len && '\0' != str[n]) {
+    ++n;
+  }
+
+  /* Heap-backed, NUL-terminated copy instead of a variable-length stack array (a non-standard extension whose size
+   * is caller-controlled and unbounded). */
+  const std::string buffer(str, n);
+  return static_cast<size_t>(strtoul(buffer.c_str(), nullptr, 10));
+}
diff --git a/plugins/experimental/prefetch/common.h b/plugins/experimental/prefetch/common.h
new file mode 100644
index 0000000..35bbab0
--- /dev/null
+++ b/plugins/experimental/prefetch/common.h
@@ -0,0 +1,68 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file common.h
+ * @brief Common declarations and definitions (header file).
+ */
+
+#pragma once
+
+/* Plugin name: used as the TSDebug() tag and as the prefix of error/unit-test log messages below. */
+#define PLUGIN_NAME "prefetch"
+
+#include <list>
+#include <set>
+#include <string>
+#include <vector>
+
+/* Short aliases for the standard string containers used throughout the plugin. */
+typedef std::string String;
+typedef std::set<std::string> StringSet;
+typedef std::list<std::string> StringList;
+typedef std::vector<std::string> StringVector;
+
+#ifdef PREFETCH_UNIT_TEST
+/* Unit-test build (no Traffic Server runtime): debug and error messages both go to stderr through PrintToStdErr(),
+ * prefixed with the plugin name and source location; assertions use the standard assert(). */
+#include <assert.h>
+#include <stdarg.h>
+#include <stdio.h>
+
+#define PrefetchDebug(fmt, ...) PrintToStdErr("(%s) %s:%d:%s() " fmt "\n", PLUGIN_NAME, __FILE__, __LINE__, __func__, ##__VA_ARGS__)
+#define PrefetchError(fmt, ...) PrintToStdErr("(%s) %s:%d:%s() " fmt "\n", PLUGIN_NAME, __FILE__, __LINE__, __func__, ##__VA_ARGS__)
+void PrintToStdErr(const char *fmt, ...);
+#define PrefetchAssert assert
+
+#else /* PREFETCH_UNIT_TEST */
+
+#include "ts/ts.h"
+
+/* Debug message under the PLUGIN_NAME debug tag, prefixed with file, line and function. */
+#define PrefetchDebug(fmt, ...) \
+ do { \
+ TSDebug(PLUGIN_NAME, "%s:%d:%s() " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__); \
+ } while (0)
+
+/* Error message: reported through TSError() (prefixed with the plugin name) and also emitted as a debug message
+ * carrying the source location. */
+#define PrefetchError(fmt, ...) \
+ do { \
+ TSError("(%s) " fmt, PLUGIN_NAME, ##__VA_ARGS__); \
+ TSDebug(PLUGIN_NAME, "%s:%d:%s() " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__); \
+ } while (0)
+
+#define PrefetchAssert TSAssert
+
+#endif /* PREFETCH_UNIT_TEST */
+
+/* Parse the leading base-10 unsigned integer of a string / buffer with strtoul() (see common.cc); 0 if there is none. */
+size_t getValue(const String &str);
+size_t getValue(const char *str, size_t len);
diff --git a/plugins/experimental/prefetch/configs.cc b/plugins/experimental/prefetch/configs.cc
new file mode 100644
index 0000000..721acd3
--- /dev/null
+++ b/plugins/experimental/prefetch/configs.cc
@@ -0,0 +1,172 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file configs.cc
+ * @brief Plugin configuration.
+ */
+
+#include <fstream> /* std::ifstream */
+#include <getopt.h> /* getopt_long() */
+#include <sstream> /* std::istringstream */
+#include <strings.h> /* strncasecmp() */
+
+#include "configs.h"
+
+/**
+ * @brief splits a comma-separated string and appends every token to the end of a container.
+ * @param c container receiving the tokens (anything with insert(position, value) and end())
+ * @param input comma-separated string; empty tokens between commas are kept, a trailing comma adds nothing
+ */
+template <typename ContainerType>
+static void
+commaSeparateString(ContainerType &c, const String &input)
+{
+  std::istringstream stream(input);
+
+  for (String item; std::getline(stream, item, ',');) {
+    c.insert(c.end(), item);
+  }
+}
+
+/**
+ * @brief interprets the value of a boolean plugin parameter.
+ * @param arg option value; nullptr when the option was given without "=value" (all options are optional_argument)
+ * @return true if arg is nullptr or starts (case-insensitively) with "true", "1" or "yes"; false otherwise
+ */
+static bool
+isTrue(const char *arg)
+{
+  /* A bare boolean flag (e.g. "--front") means "enabled"; strncasecmp() must never be handed a nullptr. */
+  if (nullptr == arg) {
+    return true;
+  }
+  return (0 == strncasecmp("true", arg, 4) || 0 == strncasecmp("1", arg, 1) || 0 == strncasecmp("yes", arg, 3));
+}
+
+/**
+ * @brief initializes plugin configuration.
+ * @param argc number of plugin parameters
+ * @param argv plugin parameters, the first two entries are the remap rule URLs
+ * @return true if successful, false if failure (currently the result of finalize()).
+ */
+bool
+PrefetchConfig::init(int argc, char *argv[])
+{
+  static const struct option longopt[] = {{const_cast<char *>("front"), optional_argument, 0, 'f'},
+                                          {const_cast<char *>("api-header"), optional_argument, 0, 'h'},
+                                          {const_cast<char *>("next-header"), optional_argument, 0, 'n'},
+                                          {const_cast<char *>("fetch-policy"), optional_argument, 0, 'p'},
+                                          {const_cast<char *>("fetch-count"), optional_argument, 0, 'c'},
+                                          {const_cast<char *>("fetch-path-pattern"), optional_argument, 0, 'e'},
+                                          {const_cast<char *>("fetch-max"), optional_argument, 0, 'x'},
+                                          {const_cast<char *>("replace-host"), optional_argument, 0, 'r'},
+                                          {const_cast<char *>("name-space"), optional_argument, 0, 's'},
+                                          {const_cast<char *>("metrics-prefix"), optional_argument, 0, 'm'},
+                                          {const_cast<char *>("exact-match"), optional_argument, 0, 'y'},
+                                          {const_cast<char *>("log-name"), optional_argument, 0, 'l'},
+                                          {0, 0, 0, 0}};
+
+  bool status = true;
+
+  /* getopt_long() keeps global state: reset it before parsing (with glibc, optind = 0 forces a full re-initialization). */
+  optind = 0;
+
+  /* argv contains the "to" and "from" URLs. Skip the first so that the second one poses as the program name. */
+  argc--;
+  argv++;
+
+  for (;;) {
+    int opt = getopt_long(argc, argv, "", longopt, nullptr);
+
+    if (opt == -1) {
+      break;
+    }
+
+    PrefetchDebug("processing %s", argv[optind - 1]);
+
+    if ('?' == opt) {
+      /* Unrecognized option: nothing to configure, ignore it. */
+      continue;
+    }
+
+    /* Every option is declared optional_argument, so optarg is nullptr when a parameter is given without "=value".
+     * A bare boolean option (--front, --exact-match) means "true"; every other option needs a value, so report and
+     * skip it instead of handing nullptr to the setters / Pattern::init(). */
+    if (nullptr == optarg && 'f' != opt && 'y' != opt) {
+      PrefetchError("plugin parameter %s requires a value, ignoring", argv[optind - 1]);
+      continue;
+    }
+
+    switch (opt) {
+    case 'f': /* --front */
+      _front = (nullptr == optarg) || ::isTrue(optarg);
+      break;
+
+    case 'h': /* --api-header */
+      setApiHeader(optarg);
+      break;
+
+    case 'n': /* --next-header */
+      setNextHeader(optarg);
+      break;
+
+    case 'p': /* --fetch-policy */
+      setFetchPolicy(optarg);
+      break;
+
+    case 'c': /* --fetch-count */
+      setFetchCount(optarg);
+      break;
+
+    case 'e': /* --fetch-path-pattern */ {
+      /* Plain operator new never returns nullptr (it throws or aborts), so no null check is needed. */
+      Pattern *pattern = new Pattern();
+      if (pattern->init(optarg)) {
+        _nextPaths.add(pattern); /* NOTE(review): ownership presumably passes to _nextPaths - confirm */
+      } else {
+        PrefetchError("failed to initialize next object pattern");
+        delete pattern;
+      }
+    } break;
+
+    case 'x': /* --fetch-max */
+      setFetchMax(optarg);
+      break;
+
+    case 'r': /* --replace-host */
+      setReplaceHost(optarg);
+      break;
+
+    case 's': /* --name-space */
+      setNameSpace(optarg);
+      break;
+
+    case 'm': /* --metrics-prefix */
+      setMetricsPrefix(optarg);
+      break;
+
+    case 'y': /* --exact-match */
+      _exactMatch = (nullptr == optarg) || ::isTrue(optarg);
+      break;
+
+    case 'l': /* --log-name */
+      setLogName(optarg);
+      break;
+    }
+  }
+
+  status &= finalize();
+
+  return status;
+}
+
+/**
+ * @brief post-processes the parsed plugin parameters; at the moment this only logs the effective configuration
+ * (it is the place to finalize or "cache" decisions derived from the parameters).
+ * @return true if successful, false if failure (the current implementation always succeeds).
+ */
+bool
+PrefetchConfig::finalize()
+{
+  auto asText = [](bool flag) -> const char * { return flag ? "true" : "false"; };
+
+  PrefetchDebug("front-end: %s", asText(_front));
+  PrefetchDebug("exact match: %s", asText(_exactMatch));
+  PrefetchDebug("API header name: %s", _apiHeader.c_str());
+  PrefetchDebug("next object header name: %s", _nextHeader.c_str());
+  PrefetchDebug("fetch policy parameters: %s", _fetchPolicy.c_str());
+  PrefetchDebug("fetch count: %d", _fetchCount);
+  PrefetchDebug("fetch concurrently max: %d", _fetchMax);
+  PrefetchDebug("replace host name: %s", _replaceHost.c_str());
+  PrefetchDebug("name space: %s", _namespace.c_str());
+  PrefetchDebug("log name: %s", _logName.c_str());
+
+  return true;
+}
diff --git a/plugins/experimental/prefetch/configs.h b/plugins/experimental/prefetch/configs.h
new file mode 100644
index 0000000..2552c36
--- /dev/null
+++ b/plugins/experimental/prefetch/configs.h
@@ -0,0 +1,202 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file configs.h
+ * @brief Plugin configuration (header file).
+ */
+
+#pragma once
+
+#include <string>
+
+#include "common.h"
+#include "pattern.h"
+
+/**
+ * @brief Prefetch configuration instance.
+ *
+ * Holds the plugin parameters parsed by init(). The defaults are given by the member initializers at the bottom
+ * of the class.
+ */
+class PrefetchConfig
+{
+public:
+  PrefetchConfig() = default;
+
+  /**
+   * @brief initializes plugin configuration.
+   * @param argc number of plugin parameters
+   * @param argv plugin parameters
+   * @return true if successful, false if failure.
+   */
+  bool init(int argc, char *argv[]);
+
+  /**
+   * @brief provides means for post-processing of the plugin parameters to finalize the configuration.
+   * @return true if successful, false if failure.
+   */
+  bool finalize();
+
+  /* ---- string parameters ---- */
+
+  void
+  setApiHeader(const char *value)
+  {
+    _apiHeader.assign(value);
+  }
+
+  const std::string &
+  getApiHeader() const
+  {
+    return _apiHeader;
+  }
+
+  void
+  setNextHeader(const char *value)
+  {
+    _nextHeader.assign(value);
+  }
+
+  const std::string &
+  getNextHeader() const
+  {
+    return _nextHeader;
+  }
+
+  void
+  setFetchPolicy(const char *value)
+  {
+    _fetchPolicy.assign(value);
+  }
+
+  const std::string &
+  getFetchPolicy() const
+  {
+    return _fetchPolicy;
+  }
+
+  void
+  setReplaceHost(const char *value)
+  {
+    _replaceHost.assign(value);
+  }
+
+  const std::string &
+  getReplaceHost() const
+  {
+    return _replaceHost;
+  }
+
+  void
+  setNameSpace(const char *value)
+  {
+    _namespace.assign(value);
+  }
+
+  const String &
+  getNameSpace() const
+  {
+    return _namespace;
+  }
+
+  void
+  setMetricsPrefix(const char *value)
+  {
+    _metricsPrefix.assign(value);
+  }
+
+  const String &
+  getMetricsPrefix() const
+  {
+    return _metricsPrefix;
+  }
+
+  void
+  setLogName(const char *value)
+  {
+    _logName.assign(value);
+  }
+
+  const String &
+  getLogName() const
+  {
+    return _logName;
+  }
+
+  /* ---- numeric parameters (converted by getValue()) ---- */
+
+  void
+  setFetchCount(const char *value)
+  {
+    _fetchCount = getValue(value);
+  }
+
+  unsigned
+  getFetchCount() const
+  {
+    return _fetchCount;
+  }
+
+  void
+  setFetchMax(const char *value)
+  {
+    _fetchMax = getValue(value);
+  }
+
+  unsigned
+  getFetchMax() const
+  {
+    return _fetchMax;
+  }
+
+  /* ---- flags ---- */
+
+  bool
+  isFront() const
+  {
+    return _front;
+  }
+
+  bool
+  isExactMatch() const
+  {
+    return _exactMatch;
+  }
+
+  /* ---- "next object" path patterns ---- */
+
+  MultiPattern &
+  getNextPath()
+  {
+    return _nextPaths;
+  }
+
+private:
+  std::string _apiHeader     = "X-AppleCDN-Prefetch";
+  std::string _nextHeader    = "X-AppleCDN-Prefetch-Next";
+  std::string _fetchPolicy;
+  std::string _replaceHost;
+  std::string _namespace     = "default";
+  std::string _metricsPrefix = "prefetch.stats";
+  std::string _logName;
+  unsigned _fetchCount = 1;
+  unsigned _fetchMax   = 0; /* 0 - don't throttle */
+  bool _front          = false;
+  bool _exactMatch     = false;
+  MultiPattern _nextPaths;
+};
diff --git a/plugins/experimental/prefetch/fetch.cc b/plugins/experimental/prefetch/fetch.cc
new file mode 100644
index 0000000..ca7daf0
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch.cc
@@ -0,0 +1,739 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file fetch.cc
+ * @brief Background fetch related classes (implementation).
+ */
+
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <inttypes.h>
+
+#include "ts/ts.h" /* ATS API */
+#include "fetch.h"
+#include "headers.h"
+
+/**
+ * @brief maps a PrefetchMetric value to the metric name suffix used when registering the stat.
+ * @param metric PrefetchMetric value
+ * @return the metric name, "unknown" for values outside of the enumeration.
+ */
+const char *
+getPrefetchMetricsNames(int metric)
+{
+  const char *name = "unknown";
+
+  switch (metric) {
+  case FETCH_ACTIVE:
+    name = "fetch.active";
+    break;
+  case FETCH_COMPLETED:
+    name = "fetch.completed";
+    break;
+  case FETCH_ERRORS:
+    name = "fetch.errors";
+    break;
+  case FETCH_TIMEOOUTS:
+    name = "fetch.timeouts";
+    break;
+  case FETCH_THROTTLED:
+    name = "fetch.throttled";
+    break;
+  case FETCH_ALREADY_CACHED:
+    name = "fetch.already_cached";
+    break;
+  case FETCH_TOTAL:
+    name = "fetch.total";
+    break;
+  case FETCH_UNIQUE_YES:
+    name = "fetch.unique.yes";
+    break;
+  case FETCH_UNIQUE_NO:
+    name = "fetch.unique.no";
+    break;
+  case FETCH_MATCH_YES:
+    name = "fetch.match.yes";
+    break;
+  case FETCH_MATCH_NO:
+    name = "fetch.match.no";
+    break;
+  case FETCH_POLICY_YES:
+    name = "fetch.policy.yes";
+    break;
+  case FETCH_POLICY_NO:
+    name = "fetch.policy.no";
+    break;
+  case FETCH_POLICY_SIZE:
+    name = "fetch.policy.size";
+    break;
+  case FETCH_POLICY_MAXSIZE:
+    name = "fetch.policy.maxsize";
+    break;
+  default:
+    break;
+  }
+
+  return name;
+}
+
+/**
+ * @brief looks up the stat named "<prefix>.<space>[.<module>].<statName>" and registers it if it does not exist yet.
+ * @param prefix metrics prefix
+ * @param space name space
+ * @param module optional module name, skipped when null
+ * @param statName metric name
+ * @param statType requested record type (NOTE(review): not forwarded, the stat is always created as
+ *        TS_RECORDDATATYPE_INT - confirm this is intended)
+ * @param statId [out] id of the found or newly created stat
+ * @return true if the stat was found or created, false if the registration failed.
+ */
+static bool
+createStat(const String &prefix, const String &space, const char *module, const char *statName, TSRecordDataType statType,
+           int &statId)
+{
+  String name(prefix);
+  name.append(".");
+  name.append(space);
+  if (module != nullptr) {
+    name.append(".");
+    name.append(module);
+  }
+  name.append(".");
+  name.append(statName);
+
+  if (TS_ERROR != TSStatFindName(name.c_str(), &statId)) {
+    /* already registered (e.g. by another plugin instance), reuse it */
+  } else {
+    statId = TSStatCreate(name.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM);
+    if (TS_ERROR == statId) {
+      PrefetchError("failed to register '%s'", name.c_str());
+      return false;
+    }
+    TSStatIntSet(statId, 0);
+  }
+
+  PrefetchDebug("created metric '%s (id:%d)'", name.c_str(), statId);
+  return true;
+}
+
+/**
+ * @brief creates the two mutexes protecting the policies; policies, metrics and log are set up later by init().
+ */
+BgFetchState::BgFetchState()
+  : _policy(nullptr),
+    _policyLock(TSMutexCreate()),
+    _unique(nullptr),
+    _lock(TSMutexCreate()),
+    _concurrentFetches(0),
+    _concurrentFetchesMax(0),
+    _log(nullptr)
+{
+  if (nullptr != _policyLock) {
+    PrefetchDebug("initialized lock");
+  } else {
+    PrefetchError("failed to initialize lock");
+  }
+
+  if (nullptr != _lock) {
+    PrefetchDebug("initialized lock");
+  } else {
+    PrefetchError("failed to initialize lock");
+  }
+}
+
+/**
+ * @brief deletes the policies under their locks, then destroys the locks and the (optional) log object.
+ */
+BgFetchState::~BgFetchState()
+{
+  TSMutexLock(_policyLock);
+  delete _policy;
+  TSMutexUnlock(_policyLock);
+
+  TSMutexLock(_lock);
+  delete _unique;
+  TSMutexUnlock(_lock);
+
+  TSMutexDestroy(_policyLock);
+  TSMutexDestroy(_lock);
+
+  /* The log object is only created when a log name is configured (see initializeLog()), it may be null here */
+  if (nullptr != _log) {
+    TSTextLogObjectFlush(_log);
+    TSTextLogObjectDestroy(_log);
+    _log = nullptr;
+  }
+}
+
+/**
+ * @brief creates the policy described by policyName unless one already exists.
+ * @param policy [in/out] policy pointer, set to the new instance when it was null
+ * @param policyName policy description passed to FetchPolicy::getInstance()
+ * @return false only if a new instance was needed and could not be created.
+ */
+static bool
+initializePolicy(FetchPolicy *&policy, const char *policyName)
+{
+  if (nullptr != policy) {
+    PrefetchDebug("state already initialized");
+    return true;
+  }
+
+  policy = FetchPolicy::getInstance(policyName);
+  if (nullptr != policy) {
+    return true;
+  }
+
+  PrefetchError("failed to initialize the %s policy", policyName);
+  return false;
+}
+
+/**
+ * @brief registers (or looks up) every fetch metric whose id is still unset (-1).
+ * @param metrics metric descriptors indexed by PrefetchMetric, the ids are updated in place
+ * @param config provides the metrics prefix and the name space used in the metric names
+ * @return true if all metrics are registered, false if at least one registration failed.
+ */
+bool
+initializeMetrics(PrefetchMetricInfo metrics[], const PrefetchConfig &config)
+{
+  bool status = true;
+  for (int i = FETCH_ACTIVE; i < FETCHES_MAX_METRICS; i++) {
+    if (-1 == metrics[i].id) {
+      /* keep a failure sticky, a later successful registration must not hide it */
+      if (!createStat(config.getMetricsPrefix(), config.getNameSpace(), nullptr, getPrefetchMetricsNames(i), metrics[i].type,
+                      metrics[i].id)) {
+        status = false;
+      }
+    } else {
+      PrefetchDebug("metric %s already initialized", getPrefetchMetricsNames(i));
+    }
+  }
+  return status;
+}
+
+/**
+ * @brief creates the plugin specific log object when a log name is configured and no log exists yet.
+ * @param log [in/out] log object, created when null
+ * @param config provides the log name (empty - no log)
+ * @return false only if the log object creation failed.
+ */
+bool
+initializeLog(TSTextLogObject &log, const PrefetchConfig &config)
+{
+  const String &logName = config.getLogName();
+
+  if (logName.empty()) {
+    PrefetchDebug("skip creating log file");
+    return true;
+  }
+
+  if (nullptr != log) {
+    PrefetchDebug("log file '%s' already initialized", logName.c_str());
+    return true;
+  }
+
+  if (TS_SUCCESS != TSTextLogObjectCreate(logName.c_str(), TS_LOG_MODE_ADD_TIMESTAMP, &log)) {
+    PrefetchError("failed to create log file");
+    return false;
+  }
+
+  PrefetchDebug("initialized log file '%s'", logName.c_str());
+  return true;
+}
+
+/**
+ * @brief initializes the state from the plugin configuration: the de-duplication ('simple') policy, the metrics,
+ * the log and the optional fetch policy. Parts which are already initialized are kept as they are.
+ * @param config plugin configuration
+ * @return true if successful, false if any part failed to initialize.
+ */
+bool
+BgFetchState::init(const PrefetchConfig &config)
+{
+  bool status = true;
+
+  /* Is throttling configured, 0 - don't throttle */
+  _concurrentFetchesMax = config.getFetchMax();
+
+  /* Initialize the state */
+  TSMutexLock(_lock);
+
+  /* Initialize 'simple' policy used to avoid concurrent fetches of the same object */
+  status &= initializePolicy(_unique, "simple");
+
+  /* Initialize the fetch metrics */
+  status &= initializeMetrics(_metrics, config);
+
+  /* Initialize the "pre-fetch" log */
+  status &= initializeLog(_log, config);
+
+  TSMutexUnlock(_lock);
+
+  /* Initialize fetching policy */
+  TSMutexLock(_policyLock);
+
+  if (!config.getFetchPolicy().empty() && 0 != config.getFetchPolicy().compare("simple")) {
+    status &= initializePolicy(_policy, config.getFetchPolicy().c_str());
+    if (nullptr != _policy) {
+      setMetric(FETCH_POLICY_MAXSIZE, _policy->getMaxSize());
+    }
+  } else {
+    PrefetchDebug("Policy not specified or 'simple' policy chosen (skipping)");
+  }
+
+  TSMutexUnlock(_policyLock);
+
+  return status;
+}
+
+/**
+ * @brief asks the fetch policy (if one is configured) whether url may be fetched.
+ * @param url key identifying the object
+ * @return true if permitted (always true without a fetch policy).
+ */
+bool
+BgFetchState::acquire(const String &url)
+{
+  bool permitted = true;
+  size_t size    = 0;
+
+  if (nullptr != _policy) {
+    TSMutexLock(_policyLock);
+    permitted = _policy->acquire(url);
+    /* read the size while still holding the lock, other fetches may modify the policy concurrently */
+    size = _policy->getSize();
+    TSMutexUnlock(_policyLock);
+  }
+
+  if (permitted) {
+    incrementMetric(FETCH_POLICY_YES);
+  } else {
+    incrementMetric(FETCH_POLICY_NO);
+  }
+
+  if (nullptr != _policy) {
+    setMetric(FETCH_POLICY_SIZE, size);
+  }
+
+  return permitted;
+}
+
+/**
+ * @brief returns a permission previously obtained by acquire() to the fetch policy (if one is configured).
+ * @param url key identifying the object
+ * @return the result of the policy release, true without a fetch policy.
+ */
+bool
+BgFetchState::release(const String &url)
+{
+  bool ret = true;
+
+  if (nullptr != _policy) {
+    TSMutexLock(_policyLock);
+    ret = _policy->release(url);
+    /* read the size while still holding the lock, other fetches may modify the policy concurrently */
+    size_t size = _policy->getSize();
+    TSMutexUnlock(_policyLock);
+
+    setMetric(FETCH_POLICY_SIZE, size);
+  }
+
+  return ret;
+}
+
+/**
+ * @brief de-duplicates and throttles fetches.
+ *
+ * The fetch is permitted only if the same object is not being fetched already and the number of concurrent fetches
+ * is below the configured maximum (0 - don't throttle).
+ * @param url key identifying the object
+ * @return true if permitted; only then must uniqueRelease() be called when the fetch is done.
+ */
+bool
+BgFetchState::uniqueAcquire(const String &url)
+{
+  bool permitted       = false;
+  bool throttled       = false;
+  size_t cachedCounter = 0;
+
+  TSMutexLock(_lock);
+  if (0 == _concurrentFetchesMax || _concurrentFetches < _concurrentFetchesMax) {
+    permitted = _unique->acquire(url);
+    if (permitted) {
+      cachedCounter = ++_concurrentFetches;
+    }
+  } else {
+    /* over the limit: deny the fetch, it is neither counted in _concurrentFetches nor registered in _unique,
+     * so reporting success here would make the matching uniqueRelease() underflow the counter */
+    throttled = true;
+  }
+  TSMutexUnlock(_lock);
+
+  /* Update the metrics, no need to lock? */
+  if (throttled) {
+    incrementMetric(FETCH_THROTTLED);
+  }
+
+  if (permitted) {
+    incrementMetric(FETCH_UNIQUE_YES);
+    incrementMetric(FETCH_TOTAL);
+    setMetric(FETCH_ACTIVE, cachedCounter);
+  } else {
+    incrementMetric(FETCH_UNIQUE_NO);
+  }
+
+  return permitted;
+}
+
+/**
+ * @brief counterpart of a successful uniqueAcquire(): decrements the concurrent fetches counter and releases url
+ * from the de-duplication policy.
+ * @param url key identifying the object
+ * @return the result of the de-duplication policy release.
+ */
+bool
+BgFetchState::uniqueRelease(const String &url)
+{
+  bool released        = true;
+  size_t cachedCounter = 0;
+
+  TSMutexLock(_lock);
+  /* every release pairs with a successful uniqueAcquire(), so the counter must be positive here */
+  TSAssert(_concurrentFetches > 0);
+  cachedCounter = --_concurrentFetches;
+  released      = _unique->release(url);
+  TSMutexUnlock(_lock);
+
+  /* Update the metrics, no need to lock? */
+  if (released) {
+    setMetric(FETCH_ACTIVE, cachedCounter);
+  }
+  return released;
+}
+
+/**
+ * @brief increments metric m by one; unregistered metrics (id -1) are ignored.
+ */
+void
+BgFetchState::incrementMetric(PrefetchMetric m)
+{
+  const int statId = _metrics[m].id;
+  if (-1 == statId) {
+    return;
+  }
+  TSStatIntIncrement(statId, 1);
+}
+
+/**
+ * @brief sets metric m to value; unregistered metrics (id -1) are ignored.
+ */
+void
+BgFetchState::setMetric(PrefetchMetric m, size_t value)
+{
+  const int statId = _metrics[m].id;
+  if (-1 == statId) {
+    return;
+  }
+  TSStatIntSet(statId, value);
+}
+
+/**
+ * @brief returns the plugin specific fetch log, nullptr if no log object was created.
+ *
+ * Deliberately not inline: the member is public and its declaration in the header has no definition, so an inline
+ * definition living only in this file could not be linked from other translation units.
+ */
+TSTextLogObject
+BgFetchState::getLog()
+{
+  return _log;
+}
+/* The singleton holding the states of all name spaces, created on first use by BgFetchStates::get() */
+BgFetchStates *BgFetchStates::_prefetchStates{nullptr};
+
+/**
+ * @brief creates the marshal buffer for the fetch request; everything else is set up by init() and schedule().
+ * @param state shared background fetch state
+ * @param config plugin configuration
+ * @param lock true - ask the fetch policies for permission before fetching
+ */
+BgFetch::BgFetch(BgFetchState *state, const PrefetchConfig &config, bool lock)
+  : _mbuf(TSMBufferCreate()),
+    _headerLoc(TS_NULL_MLOC),
+    _urlLoc(TS_NULL_MLOC),
+    vc(nullptr),
+    req_io_buf(nullptr),
+    resp_io_buf(nullptr),
+    req_io_buf_reader(nullptr),
+    resp_io_buf_reader(nullptr),
+    r_vio(nullptr),
+    w_vio(nullptr),
+    _bytes(0),
+    _cont(nullptr),
+    _state(state),
+    _config(config),
+    _askPermission(lock),
+    _startTime(0)
+{
+  /* the client address is filled in later by saveIp() */
+  memset(&client_ip, 0, sizeof(client_ip));
+}
+
+/**
+ * @brief releases the request handles and buffer; for scheduled fetches also returns the policy permissions and
+ * destroys the continuation and the IO buffers.
+ */
+BgFetch::~BgFetch()
+{
+  /* the header / URL handles belong to _mbuf, release them before destroying it */
+  TSHandleMLocRelease(_mbuf, TS_NULL_MLOC, _headerLoc);
+  TSHandleMLocRelease(_mbuf, TS_NULL_MLOC, _urlLoc);
+  TSMBufferDestroy(_mbuf);
+
+  if (nullptr != vc) {
+    PrefetchError("Destroyed BgFetch while VC was alive");
+    TSVConnClose(vc);
+    vc = nullptr;
+  }
+
+  /* the continuation and the IO buffers exist only if schedule() was called */
+  if (nullptr == _cont) {
+    return;
+  }
+
+  if (_askPermission) {
+    _state->release(_cachekey);
+    _state->uniqueRelease(_cachekey);
+  }
+
+  TSContDestroy(_cont);
+  _cont = nullptr;
+
+  TSIOBufferReaderFree(req_io_buf_reader);
+  TSIOBufferDestroy(req_io_buf);
+  TSIOBufferReaderFree(resp_io_buf_reader);
+  TSIOBufferDestroy(resp_io_buf);
+}
+
+/**
+ * @brief creates, initializes and schedules a background fetch.
+ * @return true if the fetch was scheduled, false if its initialization failed (the fetch is deleted then).
+ */
+bool
+BgFetch::schedule(BgFetchState *state, const PrefetchConfig &config, bool askPermission, TSMBuffer requestBuffer,
+                  TSMLoc requestHeaderLoc, TSHttpTxn txnp, const char *path, size_t pathLen, const String &cachekey)
+{
+  BgFetch *fetch = new BgFetch(state, config, askPermission);
+
+  if (!fetch->init(requestBuffer, requestHeaderLoc, txnp, path, pathLen, cachekey)) {
+    delete fetch;
+    return false;
+  }
+
+  /* from here on the continuation handler deletes the fetch when it is done */
+  fetch->schedule();
+  return true;
+}
+
+/**
+ * @brief copies the client address of txnp into client_ip; the address is later passed to TSHttpConnect().
+ * @param txnp client transaction
+ * @return true if an IPv4 / IPv6 address was saved, false otherwise.
+ */
+bool
+BgFetch::saveIp(TSHttpTxn txnp)
+{
+  struct sockaddr const *ip = TSHttpTxnClientAddrGet(txnp);
+  if (nullptr == ip) {
+    PrefetchError("failed to get client host info");
+    return false;
+  }
+
+  switch (ip->sa_family) {
+  case AF_INET:
+    memcpy(&client_ip, ip, sizeof(sockaddr_in));
+    break;
+  case AF_INET6:
+    memcpy(&client_ip, ip, sizeof(sockaddr_in6));
+    break;
+  default:
+    /* client_ip would stay zeroed and be useless for TSHttpConnect(), fail instead of reporting success */
+    PrefetchError("unknown address family %d", ip->sa_family);
+    return false;
+  }
+
+  return true;
+}
+
+/* accounts b more response bytes read by this fetch (reported by logAndMetricUpdate()) */
+inline void
+BgFetch::addBytes(int64_t b)
+{
+  _bytes = _bytes + b;
+}
+/**
+ * @brief Initialize the background fetch.
+ *
+ * Asks the fetch policies for permission (if required) and builds the fetch request from the client request
+ * headers and the pristine URL.
+ * @param reqBuffer client request marshal buffer
+ * @param reqHdrLoc client request header location
+ * @param txnp client transaction (client address and pristine URL are taken from it)
+ * @param fetchPath path of the object to fetch, the pristine path is kept when null or empty
+ * @param fetchPathLen length of fetchPath
+ * @param cachekey key used to ask the fetch policies for permission
+ * @return true if the fetch is ready to be scheduled. On failure, any permission acquired here is released again,
+ * because the destructor returns permissions only for scheduled fetches.
+ */
+bool
+BgFetch::init(TSMBuffer reqBuffer, TSMLoc reqHdrLoc, TSHttpTxn txnp, const char *fetchPath, size_t fetchPathLen,
+              const String &cachekey)
+{
+  TSAssert(TS_NULL_MLOC == _headerLoc);
+  TSAssert(TS_NULL_MLOC == _urlLoc);
+
+  if (_askPermission) {
+    if (!_state->acquire(cachekey)) {
+      PrefetchDebug("request is not fetchable");
+      return false;
+    }
+
+    if (!_state->uniqueAcquire(cachekey)) {
+      PrefetchDebug("already fetching the object");
+      _state->release(cachekey);
+      return false;
+    }
+  }
+
+  _cachekey.assign(cachekey);
+
+  /* The destructor releases the permissions only when the fetch was scheduled (_cont set), so every failure
+   * from here on has to give them back itself. */
+  auto fail = [this]() -> bool {
+    if (_askPermission) {
+      _state->release(_cachekey);
+      _state->uniqueRelease(_cachekey);
+    }
+    return false;
+  };
+
+  /* Save the IP info */
+  if (!saveIp(txnp)) {
+    return fail();
+  }
+
+  /* Create HTTP header */
+  _headerLoc = TSHttpHdrCreate(_mbuf);
+
+  /* Copy the headers to the new marshal buffer, without them the fetch request would be unusable */
+  if (TS_SUCCESS != TSHttpHdrCopy(_mbuf, _headerLoc, reqBuffer, reqHdrLoc)) {
+    PrefetchError("header copy failed");
+    return fail();
+  }
+
+  /* Copy the pristine request URL into fetch marshal buffer */
+  TSMBuffer pristineBuffer;
+  TSMLoc pristineUrlLoc;
+  if (TS_SUCCESS != TSHttpTxnPristineUrlGet(txnp, &pristineBuffer, &pristineUrlLoc)) {
+    PrefetchError("failed to get pristine URL");
+    return fail();
+  }
+  TSReturnCode cloned = TSUrlClone(_mbuf, pristineBuffer, pristineUrlLoc, &_urlLoc);
+  TSHandleMLocRelease(pristineBuffer, TS_NULL_MLOC, pristineUrlLoc);
+  if (TS_SUCCESS != cloned) {
+    PrefetchError("failed to clone URL");
+    return fail();
+  }
+
+  /* Save the path before changing */
+  int pathLen;
+  const char *path = TSUrlPathGet(_mbuf, _urlLoc, &pathLen);
+  if (nullptr == path) {
+    PrefetchError("failed to get a URL path");
+    return fail();
+  }
+
+  /* Now set or remove the prefetch API header */
+  const String &header = _config.getApiHeader();
+  if (_config.isFront()) {
+    /* the header value is the original (pristine) path, log that one */
+    if (setHeader(_mbuf, _headerLoc, header.c_str(), (int)header.length(), path, pathLen)) {
+      PrefetchDebug("set header '%.*s: %.*s'", (int)header.length(), header.c_str(), pathLen, path);
+    }
+  } else {
+    if (removeHeader(_mbuf, _headerLoc, header.c_str(), header.length())) {
+      PrefetchDebug("remove header '%.*s'", (int)header.length(), header.c_str());
+    }
+  }
+
+  /* Make sure we remove the RANGE header to avoid 416 "Request Range Not Satisfiable" response when
+   * the current request is a RANGE request and its range turns out invalid for the "next" object */
+  if (removeHeader(_mbuf, _headerLoc, TS_MIME_FIELD_RANGE, TS_MIME_LEN_RANGE)) {
+    PrefetchDebug("remove header '%.*s'", TS_MIME_LEN_RANGE, TS_MIME_FIELD_RANGE);
+  }
+
+  /* Overwrite the path if required */
+  if (nullptr != fetchPath && 0 != fetchPathLen) {
+    if (TS_SUCCESS == TSUrlPathSet(_mbuf, _urlLoc, fetchPath, fetchPathLen)) {
+      PrefetchDebug("setting URL path to %.*s", (int)fetchPathLen, fetchPath);
+    } else {
+      PrefetchError("failed to set a URL path %.*s", (int)fetchPathLen, fetchPath);
+    }
+  }
+
+  /* Come up with the host name to be used in the fetch request. Keep a copy: a pointer returned by TSUrlHostGet()
+   * refers to the URL which TSUrlHostSet() below modifies, and the name is used again for the Host header. */
+  String host;
+  if (_config.getReplaceHost().empty()) {
+    int urlHostLen          = 0;
+    const char *urlHostName = TSUrlHostGet(_mbuf, _urlLoc, &urlHostLen);
+    if (nullptr != urlHostName) {
+      host.assign(urlHostName, urlHostLen);
+    }
+  } else {
+    host = _config.getReplaceHost();
+  }
+  const char *hostName = host.c_str();
+  int hostNameLen      = (int)host.length();
+
+  /* Set the URI host */
+  if (TS_SUCCESS == TSUrlHostSet(_mbuf, _urlLoc, hostName, hostNameLen)) {
+    PrefetchDebug("setting URL host: %.*s", hostNameLen, hostName);
+  } else {
+    PrefetchError("failed to set URL host: %.*s", hostNameLen, hostName);
+  }
+
+  /* Set the host header */
+  if (setHeader(_mbuf, _headerLoc, TS_MIME_FIELD_HOST, TS_MIME_LEN_HOST, hostName, hostNameLen)) {
+    PrefetchDebug("setting Host header: %.*s", hostNameLen, hostName);
+  } else {
+    PrefetchError("failed to set Host header: %.*s", hostNameLen, hostName);
+  }
+
+  /* Save the URL to be fetched with this fetch for debugging purposes, expensive TSUrlStringGet()
+   * but really helpful when debugging multi-remap / host-replacement use cases */
+  int urlLen = 0;
+  char *url  = TSUrlStringGet(_mbuf, _urlLoc, &urlLen);
+  if (nullptr != url) {
+    _url.assign(url, urlLen);
+    TSfree(static_cast<void *>(url));
+  }
+
+  /* TODO: TBD is this the right place? */
+  if (TS_SUCCESS != TSHttpHdrUrlSet(_mbuf, _headerLoc, _urlLoc)) {
+    PrefetchError("failed to set the fetch request URL");
+    return fail();
+  }
+
+  /* Initialization is success */
+  return true;
+}
+
+/**
+ * @brief Create, setup and schedule the background fetch continuation.
+ *
+ * The continuation runs BgFetch::handler with this object as its data and is scheduled without delay on the
+ * net thread pool.
+ */
+void
+BgFetch::schedule()
+{
+  TSAssert(nullptr == _cont);
+
+  /* request buffer (written to the VC) and response buffer (read from the VC) */
+  req_io_buf         = TSIOBufferCreate();
+  req_io_buf_reader  = TSIOBufferReaderAlloc(req_io_buf);
+  resp_io_buf        = TSIOBufferCreate();
+  resp_io_buf_reader = TSIOBufferReaderAlloc(resp_io_buf);
+
+  /* continuation driving the fetch, with a mutex of its own */
+  _cont = TSContCreate(handler, TSMutexCreate());
+  TSContDataSet(_cont, static_cast<void *>(this));
+
+  PrefetchDebug("schedule fetch: %s", _url.c_str());
+  _startTime = TShrtime();
+  TSContSchedule(_cont, 0, TS_THREAD_POOL_NET);
+}
+
+/**
+ * @brief updates the fetch outcome metrics for the final event of the fetch.
+ *
+ * When the PLUGIN_NAME "_log" debug tag is set, it also reports the fetch to the debug log and to the plugin log
+ * (if one exists) as: ns=<name-space> bytes=<bytes> time=<elapsed> status=<status> url=<url> key=<cache key>
+ */
+void
+BgFetch::logAndMetricUpdate(TSEvent event) const
+{
+  const char *status = "UNKNOWN";
+
+  if (TS_EVENT_VCONN_EOS == event) {
+    status = "EOS";
+    _state->incrementMetric(FETCH_COMPLETED);
+  } else if (TS_EVENT_VCONN_INACTIVITY_TIMEOUT == event) {
+    status = "TIMEOUT";
+    _state->incrementMetric(FETCH_TIMEOOUTS);
+  } else if (TS_EVENT_ERROR == event) {
+    _state->incrementMetric(FETCH_ERRORS);
+    status = "ERROR";
+  } else if (TS_EVENT_VCONN_READ_COMPLETE == event) {
+    _state->incrementMetric(FETCH_COMPLETED);
+    status = "READ_COMP";
+  }
+
+  if (!TSIsDebugTagSet(PLUGIN_NAME "_log")) {
+    return;
+  }
+
+  const double elapsed = static_cast<double>(TShrtime() - _startTime) / 1000000.0;
+  const char *ns       = _config.getNameSpace().c_str();
+
+  PrefetchDebug("ns=%s bytes=%" PRId64 " time=%1.3lf status=%s url=%s key=%s", ns, _bytes, elapsed, status, _url.c_str(),
+                _cachekey.c_str());
+
+  TSTextLogObject log = _state->getLog();
+  if (log) {
+    TSTextLogObjectWrite(log, "ns=%s bytes=%" PRId64 " time=%1.3lf status=%s url=%s key=%s", ns, _bytes, elapsed, status,
+                         _url.c_str(), _cachekey.c_str());
+  }
+}
+
+/**
+ * @brief Continuation to perform a background fill of a URL.
+ *
+ * Drives a single BgFetch (the continuation data):
+ * - on the first (IMMEDIATE / TIMEOUT) event it connects with TSHttpConnect() using the saved client address,
+ *   writes the request header and starts reading the response;
+ * - response data is counted (addBytes()) and discarded;
+ * - on the final event (READ_COMPLETE, EOS, INACTIVITY_TIMEOUT, ERROR) it closes / aborts the VC, updates the
+ *   metrics and the log and deletes the BgFetch.
+ *
+ * This is pretty expensive (memory allocations etc.)
+ * @return always 0.
+ */
+int
+BgFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */)
+{
+ BgFetch *fetch = static_cast<BgFetch *>(TSContDataGet(contp));
+ int64_t avail;
+
+ PrefetchDebug("event: %s (%d)", TSHttpEventNameLookup(event), event);
+
+ switch (event) {
+ /* first event after BgFetch::schedule() */
+ case TS_EVENT_IMMEDIATE:
+ case TS_EVENT_TIMEOUT:
+ // Debug info for this particular bg fetch (put all debug in here please)
+ if (TSIsDebugTagSet(PLUGIN_NAME)) {
+ char buf[INET6_ADDRSTRLEN];
+ const sockaddr *sockaddress = (const sockaddr *)&fetch->client_ip;
+
+ switch (sockaddress->sa_family) {
+ case AF_INET:
+ inet_ntop(AF_INET, &(((struct sockaddr_in *)sockaddress)->sin_addr), buf, INET_ADDRSTRLEN);
+ PrefetchDebug("client IPv4 = %s", buf);
+ break;
+ case AF_INET6:
+ inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sockaddress)->sin6_addr), buf, INET6_ADDRSTRLEN);
+ PrefetchDebug("client IPv6 = %s", buf);
+ break;
+ default:
+ /* NOTE(review): uses TSError() directly while the rest of the plugin uses PrefetchError() */
+ TSError("[%s] Unknown address family %d", PLUGIN_NAME, sockaddress->sa_family);
+ break;
+ }
+ PrefetchDebug("Starting background fetch.");
+ dumpHeaders(fetch->_mbuf, fetch->_headerLoc);
+ }
+
+ // Setup the NetVC for background fetch
+ TSAssert(nullptr == fetch->vc);
+ if ((fetch->vc = TSHttpConnect((sockaddr *)&fetch->client_ip)) != nullptr) {
+ /* serialize the prepared request header into the request buffer */
+ TSHttpHdrPrint(fetch->_mbuf, fetch->_headerLoc, fetch->req_io_buf);
+ // We never send a body with the request. ToDo: Do we ever need to support that ?
+ TSIOBufferWrite(fetch->req_io_buf, "\r\n", 2);
+
+ /* read the whole response (INT64_MAX bytes), write exactly what is in the request buffer */
+ fetch->r_vio = TSVConnRead(fetch->vc, contp, fetch->resp_io_buf, INT64_MAX);
+ fetch->w_vio = TSVConnWrite(fetch->vc, contp, fetch->req_io_buf_reader, TSIOBufferReaderAvail(fetch->req_io_buf_reader));
+ } else {
+ /* NOTE(review): the destructor destroys this continuation while its handler is running - confirm this is safe */
+ delete fetch;
+ PrefetchError("Failed to connect to internal process, major malfunction");
+ }
+ break;
+
+ case TS_EVENT_VCONN_WRITE_COMPLETE:
+ // TSVConnShutdown(data->vc, 0, 1);
+ // TSVIOReenable(data->w_vio);
+ PrefetchDebug("write complete");
+ break;
+
+ /* response data is only counted and discarded, the point of the fetch is to fill the cache */
+ case TS_EVENT_VCONN_READ_READY:
+ avail = TSIOBufferReaderAvail(fetch->resp_io_buf_reader);
+ fetch->addBytes(avail);
+ TSIOBufferReaderConsume(fetch->resp_io_buf_reader, avail);
+ TSVIONDoneSet(fetch->r_vio, TSVIONDoneGet(fetch->r_vio) + avail);
+ TSVIOReenable(fetch->r_vio);
+ break;
+
+ /* final events: close the VC, account the remaining bytes, report and delete the fetch */
+ case TS_EVENT_VCONN_READ_COMPLETE:
+ case TS_EVENT_VCONN_EOS:
+ case TS_EVENT_VCONN_INACTIVITY_TIMEOUT:
+ case TS_EVENT_ERROR:
+ if (event == TS_EVENT_VCONN_INACTIVITY_TIMEOUT) {
+ PrefetchDebug("encountered Inactivity Timeout");
+ TSVConnAbort(fetch->vc, TS_VC_CLOSE_ABORT);
+ } else {
+ TSVConnClose(fetch->vc);
+ }
+
+ /* NOTE(review): the response reader and r_vio are still used after the VC was closed / aborted above - confirm */
+ PrefetchDebug("closing background transaction");
+ avail = TSIOBufferReaderAvail(fetch->resp_io_buf_reader);
+ fetch->addBytes(avail);
+ TSIOBufferReaderConsume(fetch->resp_io_buf_reader, avail);
+ TSVIONDoneSet(fetch->r_vio, TSVIONDoneGet(fetch->r_vio) + avail);
+ fetch->logAndMetricUpdate(event);
+
+ /* Close, release and cleanup */
+ /* vc was already closed above, clear it so the destructor does not close it again */
+ fetch->vc = nullptr;
+ delete fetch;
+ break;
+
+ default:
+ PrefetchDebug("unhandled event");
+ break;
+ }
+
+ return 0;
+}
diff --git a/plugins/experimental/prefetch/fetch.h b/plugins/experimental/prefetch/fetch.h
new file mode 100644
index 0000000..4d73a7b
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch.h
@@ -0,0 +1,202 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file fetch.h
+ * @brief Background fetch related classes (header file).
+ */
+
+#pragma once
+
+#include <list>
+#include <map>
+#include <unordered_map>
+#include <vector>
+
+#include "ts/ts.h"
+#include "ts/experimental.h"
+#include "common.h"
+#include "configs.h"
+#include "fetch_policy.h"
+
+/**
+ * @brief Fetch metrics. The values index BgFetchState::_metrics and are mapped to metric names by
+ * getPrefetchMetricsNames() (e.g. FETCH_ACTIVE -> "fetch.active").
+ */
+enum PrefetchMetric {
+ FETCH_ACTIVE = 0, /* current value of the concurrent fetches counter (see uniqueAcquire() / uniqueRelease()) */
+ FETCH_COMPLETED, /* fetches which ended with EOS or READ_COMPLETE */
+ FETCH_ERRORS, /* fetches which ended with TS_EVENT_ERROR */
+ FETCH_TIMEOOUTS, /* fetches which ended with an inactivity timeout (misspelled identifier, referenced as-is elsewhere) */
+ FETCH_THROTTLED, /* fetch requests over the max concurrent fetches limit */
+ FETCH_ALREADY_CACHED, /* metric id for counting how many times fetch was not scheduled because of cache-hit */
+ FETCH_TOTAL, /* fetches permitted by uniqueAcquire() */
+ FETCH_UNIQUE_YES, /* fetches permitted by uniqueAcquire() */
+ FETCH_UNIQUE_NO, /* fetches not permitted by uniqueAcquire() (includes throttled ones) */
+ FETCH_MATCH_YES, /* metric id for URL path pattern match successes */
+ FETCH_MATCH_NO, /* metric id for URL path pattern match failures */
+ FETCH_POLICY_YES, /* metric id for counting fetch policy successes */
+ FETCH_POLICY_NO, /* metric id for counting fetch policy failures */
+ FETCH_POLICY_SIZE, /* current size of the fetch policy (FetchPolicy::getSize()) */
+ FETCH_POLICY_MAXSIZE, /* maximum size of the fetch policy (FetchPolicy::getMaxSize()) */
+ FETCHES_MAX_METRICS, /* number of metrics, must stay last */
+};
+
+/**
+ * @brief describes one plugin metric (see BgFetchState::_metrics); the member order is relied upon by the brace
+ * initialization of that array.
+ */
+struct PrefetchMetricInfo {
+ PrefetchMetric index; /* metric described by this entry (NOTE(review): not read by the code visible here) */
+ TSRecordDataType type; /* requested record type (NOTE(review): createStat() does not forward it to TSStatCreate()) */
+ int id; /* stat id from TSStatFindName() / TSStatCreate(), -1 until registered */
+};
+
+/**
+ * @brief to store background fetch state, metrics, logs etc (shared between all scheduled fetches).
+ *
+ * _policyLock protects the fetch policy. _lock protects the de-duplication policy and the concurrent fetches counter.
+ *
+ * @todo: reconsider the locks (tried to be granular but it feels too crowded, remove unnecessary locks)
+ */
+class BgFetchState
+{
+public:
+  BgFetchState();
+  virtual ~BgFetchState();
+
+  /* the state is shared by pointer and owns policies, locks and the log: never copy it */
+  BgFetchState(BgFetchState const &) = delete;
+  BgFetchState &operator=(BgFetchState const &) = delete;
+
+  /**
+   * @brief initializes policies, metrics and log from the plugin configuration.
+   * @return true if successful, false if failure.
+   */
+  bool init(const PrefetchConfig &config);
+
+  /* Fetch policy */
+  bool acquire(const String &url);
+  bool release(const String &url);
+
+  /* De-duplication of requests */
+  bool uniqueAcquire(const String &url);
+  bool uniqueRelease(const String &url);
+
+  /* Metrics and logs */
+  void incrementMetric(PrefetchMetric m);
+  void setMetric(PrefetchMetric m, size_t value);
+  TSTextLogObject getLog();
+
+private:
+  /* Fetch policy related */
+  FetchPolicy *_policy; /* fetch policy */
+  TSMutex _policyLock;  /* protects the policy object only */
+
+  /* Mechanisms to avoid concurrent fetches and applying limits */
+  FetchPolicy *_unique;         /* make sure we never download same object multiple times at the same time */
+  TSMutex _lock;                /* protects the deduplication object and the concurrent fetches counter */
+  size_t _concurrentFetches;    /* number of fetches currently permitted by uniqueAcquire() */
+  size_t _concurrentFetchesMax; /* max concurrent fetches, 0 - don't throttle */
+  PrefetchMetricInfo _metrics[FETCHES_MAX_METRICS] = {
+    {FETCH_ACTIVE, TS_RECORDDATATYPE_INT, -1},          {FETCH_COMPLETED, TS_RECORDDATATYPE_COUNTER, -1},
+    {FETCH_ERRORS, TS_RECORDDATATYPE_COUNTER, -1},      {FETCH_TIMEOOUTS, TS_RECORDDATATYPE_COUNTER, -1},
+    {FETCH_THROTTLED, TS_RECORDDATATYPE_COUNTER, -1},   {FETCH_ALREADY_CACHED, TS_RECORDDATATYPE_COUNTER, -1},
+    {FETCH_TOTAL, TS_RECORDDATATYPE_COUNTER, -1},       {FETCH_UNIQUE_YES, TS_RECORDDATATYPE_COUNTER, -1},
+    {FETCH_UNIQUE_NO, TS_RECORDDATATYPE_COUNTER, -1},   {FETCH_MATCH_YES, TS_RECORDDATATYPE_COUNTER, -1},
+    {FETCH_MATCH_NO, TS_RECORDDATATYPE_COUNTER, -1},    {FETCH_POLICY_YES, TS_RECORDDATATYPE_COUNTER, -1},
+    {FETCH_POLICY_NO, TS_RECORDDATATYPE_COUNTER, -1},   {FETCH_POLICY_SIZE, TS_RECORDDATATYPE_INT, -1},
+    {FETCH_POLICY_MAXSIZE, TS_RECORDDATATYPE_INT, -1}};
+
+  /* plugin specific fetch logging */
+  TSTextLogObject _log;
+};
+
+/**
+ * @brief Contains all background states to be shared between different plugin instances (grouped in namespaces)
+ *
+ * A single instance exists. It is created by the first get() call and is not destroyed by any code visible here.
+ */
+class BgFetchStates
+{
+public:
+  /* Initialize on first use (NOTE(review): the lazy creation itself is not synchronized) */
+  static BgFetchStates *
+  get()
+  {
+    if (_prefetchStates == nullptr) {
+      _prefetchStates = new BgFetchStates();
+    }
+    return _prefetchStates;
+  }
+
+  /**
+   * @brief returns the state of name space space, creating it on first request.
+   */
+  BgFetchState *
+  getStateByName(const String &space)
+  {
+    TSMutexLock(_prefetchStates->_lock);
+
+    BgFetchState *&slot = _prefetchStates->_states[space];
+    if (nullptr == slot) {
+      slot = new BgFetchState();
+    }
+    BgFetchState *state = slot;
+
+    TSMutexUnlock(_prefetchStates->_lock);
+    return state;
+  }
+
+private:
+  BgFetchStates() : _lock(TSMutexCreate()) {}
+  ~BgFetchStates() { TSMutexDestroy(_lock); }
+  static BgFetchStates *_prefetchStates;
+
+  std::map<String, BgFetchState *> _states; /* stores pointers to states per namespace */
+  TSMutex _lock;                            /* protects _states */
+};
+
+/**
+ * @brief Represents a single background fetch.
+ *
+ * Instances are created only by the static schedule(). Once scheduled, the fetch continuation handler deletes the
+ * object when the fetch is done.
+ */
+class BgFetch
+{
+public:
+  static bool schedule(BgFetchState *state, const PrefetchConfig &config, bool askPermission, TSMBuffer requestBuffer,
+                       TSMLoc requestHeaderLoc, TSHttpTxn txnp, const char *path, size_t pathLen, const String &cachekey);
+
+  /* owns TS handles (marshal buffer, IO buffers, continuation): a copy would release them twice */
+  BgFetch(const BgFetch &) = delete;
+  BgFetch &operator=(const BgFetch &) = delete;
+
+private:
+  BgFetch(BgFetchState *state, const PrefetchConfig &config, bool lock);
+  ~BgFetch();
+  bool init(TSMBuffer requestBuffer, TSMLoc requestHeaderLoc, TSHttpTxn txnp, const char *fetchPath, size_t fetchPathLen,
+            const String &cachekey);
+  void schedule();
+  static int handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */);
+  bool saveIp(TSHttpTxn txnp);
+  void addBytes(int64_t b);
+  void logAndMetricUpdate(TSEvent event) const;
+
+  /* Request related */
+  TSMBuffer _mbuf;                   /* marshal buffer holding the fetch request header and URL */
+  TSMLoc _headerLoc;                 /* fetch request header */
+  TSMLoc _urlLoc;                    /* fetch request URL */
+  struct sockaddr_storage client_ip; /* client address, passed to TSHttpConnect() */
+
+  /* This is for the actual background fetch / NetVC */
+  TSVConn vc;
+  TSIOBuffer req_io_buf, resp_io_buf;
+  TSIOBufferReader req_io_buf_reader, resp_io_buf_reader;
+  TSVIO r_vio, w_vio;
+  int64_t _bytes; /* response bytes read so far */
+
+  /* Background fetch continuation */
+  TSCont _cont;
+
+  /* Pointers and cache */
+  String _cachekey;              /* saving the cache key for later use */
+  String _url;                   /* saving the URL for later use */
+  BgFetchState *_state;          /* pointer for access to the plugin state */
+  const PrefetchConfig &_config; /* reference for access to the configuration */
+
+  bool _askPermission; /* true - check with the fetch policies if we should schedule the fetch */
+
+  TSHRTime _startTime; /* for calculation of downloadTime for this fetch */
+};
diff --git a/plugins/experimental/prefetch/fetch_policy.cc b/plugins/experimental/prefetch/fetch_policy.cc
new file mode 100644
index 0000000..083257f
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch_policy.cc
@@ -0,0 +1,57 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file fetch_policy.cc
+ * @brief Fetch policy interface.
+ */
+
+#include "fetch_policy.h"
+
+#include <string.h>
+
+#include "common.h"
+#include "fetch_policy_lru.h"
+#include "fetch_policy_simple.h"
+
+/**
+ * @brief creates a fetch policy from a "<name>[:<parameters>]" description, where name is "simple" or "lru".
+ * @param parameters policy description
+ * @return new initialized policy (the caller owns it), nullptr if the description is null, the name is unknown or
+ * the policy failed to initialize.
+ */
+FetchPolicy *
+FetchPolicy::getInstance(const char *parameters)
+{
+  if (nullptr == parameters) {
+    PrefetchError("fetch policy not specified");
+    return nullptr;
+  }
+
+  const char *name   = parameters;
+  const char *delim  = strchr(parameters, ':');
+  size_t len         = (nullptr == delim ? strlen(name) : delim - name);
+  const char *params = (nullptr == delim ? nullptr : delim + 1);
+
+  /* params is null when no ':' was given, never pass a null pointer for %s */
+  PrefetchDebug("getting '%.*s' policy instance, params: %s", (int)len, name, (nullptr == params ? "" : params));
+  FetchPolicy *p = nullptr;
+  if (6 == len && 0 == strncmp(name, "simple", 6)) {
+    p = new FetchPolicySimple();
+  } else if (3 == len && 0 == strncmp(name, "lru", 3)) {
+    p = new FetchPolicyLru();
+  } else {
+    PrefetchError("unrecognized fetch policy type: %.*s", (int)len, name);
+    return nullptr;
+  }
+
+  /* the policy itself decides how to handle missing (null) parameters */
+  if (p->init(params)) {
+    return p;
+  }
+  delete p;
+
+  return nullptr;
+}
diff --git a/plugins/experimental/prefetch/fetch_policy.h b/plugins/experimental/prefetch/fetch_policy.h
new file mode 100644
index 0000000..8594759
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch_policy.h
@@ -0,0 +1,66 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file fetch_policy.h
+ * @brief Fetch policy interface (header file).
+ */
+
+#pragma once
+
+#include <list>
+#include <openssl/sha.h>
+#include <string.h>
+#include <string>
+#include <unordered_map>
+
+#include "common.h"
+
+class FetchPolicy;
+/* NOTE(review): SimplePolicy and Prescaler look like leftovers from the cache_promote plugin, nothing in
+ * this header uses them - confirm no other file needs them and remove. */
+class SimplePolicy;
+class Prescaler;
+
+/**
+ * @brief Fetch policy interface.
+ *
+ * acquire() tells whether a background fetch for a URL may be triggered and release() hands the URL
+ * back. Instances are created by getInstance() from a "<name>[:<parameters>]" string.
+ */
+class FetchPolicy
+{
+public:
+  static FetchPolicy *getInstance(const char *parameters);
+  virtual ~FetchPolicy() = default;
+
+  /* Non-copyable: explicitly deleted instead of the pre-C++11 "private and never defined" idiom. */
+  FetchPolicy(const FetchPolicy &) = delete;
+  FetchPolicy &operator=(const FetchPolicy &) = delete;
+
+  virtual bool init(const char *parameters)    = 0;
+  virtual bool acquire(const std::string &url) = 0;
+  virtual bool release(const std::string &url) = 0;
+  virtual const char *name()                   = 0;
+  virtual size_t getSize()                     = 0;
+  virtual size_t getMaxSize()                  = 0;
+
+protected:
+  FetchPolicy() = default;
+
+  /**
+   * @brief Debug-logs the outcome of a policy operation.
+   * @param msg operation name, e.g. "acquire"
+   * @param url the URL, only its first 100 characters are printed ("..." marks the truncation)
+   * @param ret result of the operation
+   */
+  void
+  log(const char *msg, const String &url, bool ret)
+  {
+    PrefetchDebug("%s::%s('%.*s%s'): %s", name(), msg, (int)(url.length() > 100 ? 100 : url.length()), url.c_str(),
+                  url.length() > 100 ? "..." : "", ret ? "true" : "false");
+  }
+};
diff --git a/plugins/experimental/prefetch/fetch_policy_lru.cc b/plugins/experimental/prefetch/fetch_policy_lru.cc
new file mode 100644
index 0000000..c0b0902
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch_policy_lru.cc
@@ -0,0 +1,141 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file fetch_policy_lru.cc
+ * @brief LRU fetch policy.
+ */
+
+#include "fetch_policy_lru.h"
+#include "common.h"
+
+/**
+ * @brief Returns the policy name ("lru"), used in debug messages.
+ *
+ * Deliberately not 'inline': the function is virtual and declared non-inline in the class, so an inline
+ * definition visible in this translation unit only would violate the one-definition rule.
+ */
+const char *
+FetchPolicyLru::name()
+{
+  return "lru";
+}
+
+/**
+ * @brief Initializes the LRU policy.
+ *
+ * @param parameters "<size>[,<ignored>]": the first comma separated value is the LRU size, nullptr keeps
+ *        the defaults. Sizes below the default are rejected (the default is also the minimum) and sizes
+ *        above std::list::max_size() are capped.
+ * @return always true.
+ */
+bool
+FetchPolicyLru::init(const char *parameters)
+{
+  if (nullptr == parameters) {
+    /* Leave defaults */
+    return true;
+  }
+
+  /* The size is the first value, anything after the first ',' is ignored. */
+  const char *sizeStr = parameters;
+  const char *delim   = strchr(parameters, ',');
+  size_t size         = 0;
+  if (nullptr == delim) {
+    size = getValue(sizeStr, strlen(sizeStr));
+  } else {
+    size = getValue(sizeStr, delim - sizeStr);
+  }
+
+  /* Defaults are considered minimum */
+  static const char *defaultStr = " (default)";
+  bool useDefault               = false;
+
+  /* Make sure size is not larger than what std::list is physically able to hold */
+  LruList::size_type realMax = _list.max_size();
+  if (size > realMax) {
+    /* %zu is the portable conversion for size_t (%lu is only right where size_t is unsigned long). */
+    PrefetchDebug("size: %zu is not feasible, cutting to %zu", size, static_cast<size_t>(realMax));
+    size = realMax;
+  }
+
+  /* Guarantee the minimum value; a size equal to the minimum is valid and must not be reported as an error. */
+  if (size >= _maxSize) {
+    _maxSize = size;
+  } else {
+    useDefault = true;
+    PrefetchError("size: %zu is not a good value", size);
+  }
+
+  PrefetchDebug("initialized %s fetch policy: size: %zu%s", name(), static_cast<size_t>(_maxSize), (useDefault ? defaultStr : ""));
+  return true;
+}
+
+/**
+ * @brief Returns the maximum number of LRU entries.
+ *
+ * Not 'inline': an inline definition of a virtual function visible only in this file violates the ODR.
+ */
+size_t
+FetchPolicyLru::getMaxSize()
+{
+  return _maxSize;
+}
+
+/**
+ * @brief Returns the current number of LRU entries.
+ *
+ * Not 'inline': an inline definition of a virtual function visible only in this file violates the ODR.
+ */
+size_t
+FetchPolicyLru::getSize()
+{
+  return _size;
+}
+
+/**
+ * @brief Decides whether a fetch for the URL should be triggered.
+ *
+ * A URL already among the most recently used ones is moved to the front and no fetch is triggered.
+ * Otherwise the URL digest becomes the front entry: a new list element while the list is below the
+ * maximum size, else the least recently used element is recycled. Then a fetch is triggered.
+ *
+ * @param url the URL
+ * @return true - trigger the fetch, false - the URL was used recently.
+ */
+bool
+FetchPolicyLru::acquire(const std::string &url)
+{
+  LruHash key;
+  key.init(url.c_str(), url.length());
+
+  bool fetch           = true;
+  LruMapIterator found = _map.find(&key);
+
+  if (found != _map.end()) {
+    PrefetchDebug("recently used LRU entry, moving to front");
+    PrefetchAssert(_list.size() > 0);
+
+    /* Recently used: refresh its position and skip the fetch. */
+    _list.splice(_list.begin(), _list, found->second);
+    fetch = false;
+  } else {
+    if (_size < _maxSize) {
+      /* Elements are never removed from the list, so this grows it at most _maxSize times. */
+      _list.push_front(NULL_LRU_ENTRY);
+      _size++;
+      PrefetchDebug("created a new LRU entry, size=%d", (int)_size);
+    } else {
+      /* Recycle the least recently used element: move it to the front and forget its old digest. */
+      _list.splice(_list.begin(), _list, --_list.end());
+      _map.erase(&(*_list.begin()));
+      PrefetchDebug("reused the least recently used LRU entry");
+    }
+
+    /* The map keys point into the list elements, so register the front element after updating it. */
+    LruList::iterator front = _list.begin();
+    *front                  = key;
+    _map[&(*front)]         = front;
+  }
+
+  log("acquire", url, fetch);
+  return fetch;
+}
+
+/**
+ * @brief Releases the URL. The LRU policy keeps no per-fetch state, so this only logs.
+ * @param url the URL
+ * @return always true.
+ */
+bool
+FetchPolicyLru::release(const std::string &url)
+{
+  const bool released = true;
+  log("release", url, released);
+  return released;
+}
diff --git a/plugins/experimental/prefetch/fetch_policy_lru.h b/plugins/experimental/prefetch/fetch_policy_lru.h
new file mode 100644
index 0000000..2699647
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch_policy_lru.h
@@ -0,0 +1,105 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file fetch_policy_lru.h
+ * @brief LRU fetch policy (header file).
+ */
+
+#pragma once
+
+#include "fetch_policy.h"
+
+/* Here reusing some of the classes used in cache_promote plugin.
+ * @todo: this was done in interest of time, see if LRU is what we really need, can we do it differently / better? */
+
+/**
+ * @brief SHA-1 digest of a URL, used as the LRU key instead of the URL itself.
+ *
+ * The implicit copy operations are sufficient (the digest is a plain byte array).
+ */
+class LruHash
+{
+  friend struct LruHashHasher;
+
+public:
+  /* Start with an all-zero digest so that "empty" entries are well defined. */
+  LruHash() : _hash() {}
+
+  /**
+   * @brief Computes the SHA-1 digest of the data.
+   * @param data pointer to the data (e.g. the URL)
+   * @param len length of the data in bytes (size_t, so long strings are not truncated)
+   */
+  void
+  init(const char *data, size_t len)
+  {
+    /* One-shot SHA1() gives the same digest as SHA1_Init/Update/Final, which are deprecated since OpenSSL 3.0. */
+    SHA1(reinterpret_cast<const unsigned char *>(data), len, _hash);
+  }
+
+private:
+  unsigned char _hash[SHA_DIGEST_LENGTH];
+};
+
+/**
+ * @brief Hash and equality functor for LruHash pointers (used by LruMap).
+ *
+ * Both operations look at the digests the pointers refer to, not at the pointer values.
+ */
+struct LruHashHasher {
+  /** @brief Equality: true if both digests are identical. */
+  bool
+  operator()(const LruHash *s1, const LruHash *s2) const
+  {
+    return 0 == memcmp(s1->_hash, s2->_hash, sizeof(s2->_hash));
+  }
+
+  /**
+   * @brief Hash: XOR of two machine words taken from the digest (at offsets 0 and 9).
+   *
+   * The words are copied out with memcpy(): reading them through a size_t pointer at offset 9 is a
+   * misaligned access that also violates strict aliasing (undefined behavior).
+   */
+  size_t
+  operator()(const LruHash *s) const
+  {
+    static_assert(sizeof(s->_hash) >= 9 + sizeof(size_t), "digest too short for the hash function");
+
+    size_t first  = 0;
+    size_t second = 0;
+    memcpy(&first, s->_hash, sizeof(first));
+    memcpy(&second, s->_hash + 9, sizeof(second));
+    return first ^ second;
+  }
+};
+
+/* LRU containers: the list keeps the digests ordered from the most to the least recently used, the map
+ * finds a digest's list position. The map keys are pointers to the list elements, hashed and compared by
+ * the digest they point to (see LruHashHasher). */
+using LruEntry       = LruHash;
+using LruList        = std::list<LruEntry>;
+using LruMap         = std::unordered_map<const LruHash *, LruList::iterator, LruHashHasher, LruHashHasher>;
+using LruMapIterator = LruMap::iterator;
+
+/* Used to create an "empty" new LRUEntry. NOTE(review): 'static' in a header gives each translation unit its own copy. */
+static LruEntry NULL_LRU_ENTRY;
+
+/**
+ * @brief Fetch policy that allows fetches only for not-"hot" objects.
+ *
+ * Trying to identify "hot" object by keeping track of most recently used objects and
+ * allows fetches only when a URL is not found in the most recently used set.
+ */
+class FetchPolicyLru : public FetchPolicy
+{
+public:
+  /* Default size values are also considered minimum. TODO: find out if this works OK. */
+  FetchPolicyLru() : _maxSize(10), _size(0) {}
+  ~FetchPolicyLru() override = default;
+
+  /* Fetch policy interface methods ('override' lets the compiler check the signatures) */
+  bool init(const char *parameters) override;
+  bool acquire(const std::string &url) override;
+  bool release(const std::string &url) override;
+  const char *name() override;
+  size_t getMaxSize() override;
+  size_t getSize() override;
+
+protected:
+  LruMap _map;                 /**< @brief digest -> position in _list, keys point into the _list elements */
+  LruList _list;               /**< @brief digests ordered from the most to the least recently used */
+  LruList::size_type _maxSize; /**< @brief maximum number of entries */
+  LruList::size_type _size;    /**< @brief current number of entries */
+};
diff --git a/plugins/experimental/prefetch/fetch_policy_simple.cc b/plugins/experimental/prefetch/fetch_policy_simple.cc
new file mode 100644
index 0000000..aefd07f
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch_policy_simple.cc
@@ -0,0 +1,80 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file fetch_policy_simple.cc
+ * @brief Simple fetch policy.
+ */
+
+#include "fetch_policy_simple.h"
+
+/**
+ * @brief Initializes the simple policy; it takes no parameters.
+ * @return always true.
+ */
+bool
+FetchPolicySimple::init(const char * /* parameters - unused */)
+{
+  const bool initialized = true;
+  PrefetchDebug("initialized %s fetch policy", name());
+  return initialized;
+}
+
+/**
+ * @brief Acquires the URL unless it is already acquired (de-duplication).
+ * @param url the URL
+ * @return true - the URL was not acquired before and now is, false - already acquired.
+ */
+bool
+FetchPolicySimple::acquire(const std::string &url)
+{
+  /* emplace() does the lookup and the insertion in a single hash lookup and does nothing if the URL is there. */
+  bool ret = _urls.emplace(url, true).second;
+
+  log("acquire", url, ret);
+  return ret;
+}
+
+/**
+ * @brief Releases a previously acquired URL.
+ * @param url the URL
+ * @return true - the URL was acquired and is now released, false - it was not acquired.
+ */
+bool
+FetchPolicySimple::release(const std::string &url)
+{
+  /* erase() returns the number of removed elements, no separate find() is needed. */
+  bool ret = _urls.erase(url) > 0;
+
+  log("release", url, ret);
+  return ret;
+}
+
+/**
+ * @brief Returns the policy name ("simple"), used in debug messages.
+ *
+ * Not 'inline': an inline definition of a virtual function visible only in this file violates the ODR.
+ */
+const char *
+FetchPolicySimple::name()
+{
+  return "simple";
+}
+
+/**
+ * @brief Returns the number of currently acquired URLs.
+ *
+ * Not 'inline': an inline definition of a virtual function visible only in this file violates the ODR.
+ */
+size_t
+FetchPolicySimple::getSize()
+{
+  return _urls.size();
+}
+
+/**
+ * @brief Returns the maximum size, 0 meaning unlimited.
+ *
+ * Not 'inline': an inline definition of a virtual function visible only in this file violates the ODR.
+ */
+size_t
+FetchPolicySimple::getMaxSize()
+{
+  /* Unlimited */
+  return 0;
+}
diff --git a/plugins/experimental/prefetch/fetch_policy_simple.h b/plugins/experimental/prefetch/fetch_policy_simple.h
new file mode 100644
index 0000000..be04d86
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch_policy_simple.h
@@ -0,0 +1,46 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file fetch_policy_simple.h
+ * @brief Simple fetch policy (header file).
+ */
+
+#pragma once
+
+#include "fetch_policy.h"
+
+/**
+ * @brief Simple de-duplication fetch policy, used to make sure only one background fetch per URL is running at a time.
+ */
+class FetchPolicySimple : public FetchPolicy
+{
+public:
+  FetchPolicySimple() {}
+  ~FetchPolicySimple() override = default;
+
+  /* Fetch policy interface methods ('override' lets the compiler check the signatures) */
+  bool init(const char *parameters) override;
+  bool acquire(const std::string &url) override;
+  bool release(const std::string &url) override;
+  const char *name() override;
+  size_t getSize() override;
+  size_t getMaxSize() override;
+
+private:
+  std::unordered_map<std::string, bool> _urls; /**< @brief currently acquired URLs */
+};
diff --git a/plugins/experimental/prefetch/headers.cc b/plugins/experimental/prefetch/headers.cc
new file mode 100644
index 0000000..8233dc5
--- /dev/null
+++ b/plugins/experimental/prefetch/headers.cc
@@ -0,0 +1,213 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file headers.cc
+ * @brief HTTP headers manipulation.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "configs.h"
+#include "headers.h"
+
+/**
+ * @brief Removes every field (including all duplicates) with the given name from a header.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ * @param header header name
+ * @param headerlen header name length
+ * @return the number of fields removed.
+ */
+int
+removeHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen)
+{
+  int removed  = 0;
+  TSMLoc field = TSMimeHdrFieldFind(bufp, hdrLoc, header, headerlen);
+
+  while (field) {
+    /* Get the next duplicate before this field is destroyed. */
+    TSMLoc nextDup = TSMimeHdrFieldNextDup(bufp, hdrLoc, field);
+
+    TSMimeHdrFieldDestroy(bufp, hdrLoc, field);
+    TSHandleMLocRelease(bufp, hdrLoc, field);
+    removed++;
+
+    field = nextDup;
+  }
+
+  return removed;
+}
+
+/**
+ * @brief Checks whether a field with the given name exists in a header.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ * @param header header name
+ * @param headerlen header name length
+ * @return true - exists, false - does not exist
+ */
+bool
+headerExist(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen)
+{
+  TSMLoc field = TSMimeHdrFieldFind(bufp, hdrLoc, header, headerlen);
+  if (TS_NULL_MLOC == field) {
+    return false;
+  }
+
+  TSHandleMLocRelease(bufp, hdrLoc, field);
+  return true;
+}
+
+/**
+ * @brief Gets the values of a header, all values of all duplicates joined with ", ".
+ *
+ * Values which do not fit in the remaining buffer space are skipped. The result is not NUL-terminated.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ * @param header header name
+ * @param headerlen header name length
+ * @param value buffer for the value
+ * @param valuelen in: size of the buffer, out: length of the value written to the buffer
+ * @return pointer to the buffer with the value (i.e. 'value').
+ */
+char *
+getHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen, char *value, int *valuelen)
+{
+  char *out    = value;
+  TSMLoc field = TSMimeHdrFieldFind(bufp, hdrLoc, header, headerlen);
+
+  while (field) {
+    TSMLoc nextDup = TSMimeHdrFieldNextDup(bufp, hdrLoc, field);
+    int valueCount = TSMimeHdrFieldValuesCount(bufp, hdrLoc, field);
+
+    for (int idx = 0; idx < valueCount; ++idx) {
+      int len       = 0;
+      const char *v = TSMimeHdrFieldValueStringGet(bufp, hdrLoc, field, idx, &len);
+      if (nullptr == v || 0 == len) {
+        continue;
+      }
+
+      /* Every value but the first one is preceded by ", "; append only if it fits. */
+      int used      = out - value;
+      int separator = (0 == used ? 0 : 2);
+      if (used + separator + len < *valuelen) {
+        if (separator) {
+          memcpy(out, ", ", 2);
+          out += 2;
+        }
+        memcpy(out, v, len);
+        out += len;
+      }
+    }
+
+    TSHandleMLocRelease(bufp, hdrLoc, field);
+    field = nextDup;
+  }
+
+  *valuelen = out - value;
+  return value;
+}
+
+/**
+ * @brief Set a header to a specific value.
+ *
+ * This avoids going through a remove / add sequence for an existing header: the first field gets the
+ * new value and all its duplicates are removed. A missing header is created.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ * @param header header name
+ * @param headerlen header name len
+ * @param value the new value
+ * @param valuelen length of the value
+ * @return true - OK, false - failed
+ */
+bool
+setHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen, const char *value, int valuelen)
+{
+  if (!bufp || !hdrLoc || !header || headerlen <= 0 || !value || valuelen <= 0) {
+    return false;
+  }
+
+  bool ret        = false;
+  TSMLoc fieldLoc = TSMimeHdrFieldFind(bufp, hdrLoc, header, headerlen);
+
+  if (!fieldLoc) {
+    // No existing header, so create one
+    if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdrLoc, header, headerlen, &fieldLoc)) {
+      if (TS_SUCCESS == TSMimeHdrFieldValueStringSet(bufp, hdrLoc, fieldLoc, -1, value, valuelen)) {
+        TSMimeHdrFieldAppend(bufp, hdrLoc, fieldLoc);
+        ret = true;
+      }
+      TSHandleMLocRelease(bufp, hdrLoc, fieldLoc);
+    }
+  } else {
+    bool first = true;
+
+    while (fieldLoc) {
+      /* Look up the next duplicate BEFORE a possible destroy: a destroyed field must not be used
+       * to walk the duplicates list any more. */
+      TSMLoc next = TSMimeHdrFieldNextDup(bufp, hdrLoc, fieldLoc);
+
+      if (first) {
+        first = false;
+        if (TS_SUCCESS == TSMimeHdrFieldValueStringSet(bufp, hdrLoc, fieldLoc, -1, value, valuelen)) {
+          ret = true;
+        }
+      } else {
+        TSMimeHdrFieldDestroy(bufp, hdrLoc, fieldLoc);
+      }
+      TSHandleMLocRelease(bufp, hdrLoc, fieldLoc);
+      fieldLoc = next;
+    }
+  }
+
+  return ret;
+}
+
+/**
+ * @brief Logs the MIME fields of a header through PrefetchDebug().
+ *
+ * Only the MIME fields are printed, not the HTTP request line. The printed header may span several
+ * IO buffer blocks, each of them is logged separately.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ */
+void
+dumpHeaders(TSMBuffer bufp, TSMLoc hdrLoc)
+{
+  TSIOBuffer buffer       = TSIOBufferCreate();
+  TSIOBufferReader reader = TSIOBufferReaderAlloc(buffer);
+
+  TSMimeHdrPrint(bufp, hdrLoc, buffer);
+
+  TSIOBufferBlock block = TSIOBufferReaderStart(reader);
+  for (;;) {
+    int64_t avail     = 0;
+    const char *start = TSIOBufferBlockReadStart(block, reader, &avail);
+    if (avail > 0) {
+      PrefetchDebug("Headers are:\n%.*s", static_cast<int>(avail), start);
+    }
+    TSIOBufferReaderConsume(reader, avail);
+
+    block = TSIOBufferReaderStart(reader);
+    if (nullptr == block || 0 == avail) {
+      break;
+    }
+  }
+
+  /* Release the buffer used for printing the header. */
+  TSIOBufferReaderFree(reader);
+  TSIOBufferDestroy(buffer);
+}
diff --git a/plugins/experimental/prefetch/headers.h b/plugins/experimental/prefetch/headers.h
new file mode 100644
index 0000000..b6e0e1b
--- /dev/null
+++ b/plugins/experimental/prefetch/headers.h
@@ -0,0 +1,31 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file headers.h
+ * @brief HTTP headers manipulation (header file).
+ */
+
+#pragma once
+
+/* NOTE(review): these declarations use TSMBuffer / TSMLoc but this header does not include the TS API header
+ * itself, so it presumably relies on being included after it - consider making it self-contained. */
+
+/** @brief Removes all fields (including duplicates) named 'header', returns the number of removed fields. */
+int removeHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen);
+/** @brief Returns true if at least one field named 'header' exists. */
+bool headerExist(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen);
+/** @brief Copies the ", "-joined values of 'header' into 'value' (capacity *valuelen), sets *valuelen to the length. */
+char *getHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen, char *value, int *valuelen);
+
+/** @brief Sets 'header' to 'value', creating it if missing and removing its duplicates. */
+bool setHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen, const char *value, int valuelen);
+/** @brief Logs the MIME fields of the header through the plugin debug log. */
+void dumpHeaders(TSMBuffer bufp, TSMLoc hdrLoc);
diff --git a/plugins/experimental/prefetch/pattern.cc b/plugins/experimental/prefetch/pattern.cc
new file mode 100644
index 0000000..20e3b64
--- /dev/null
+++ b/plugins/experimental/prefetch/pattern.cc
@@ -0,0 +1,463 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file pattern.cc
+ * @brief PCRE related classes.
+ * @see pattern.h
+ */
+
+#include "pattern.h"
+
+/**
+ * @brief Replaces, in place, every occurrence of 'from' in 'str' with 'to'.
+ *
+ * Searching resumes right after the inserted text, so a 'to' containing 'from' cannot loop forever.
+ * An empty 'from' leaves 'str' untouched.
+ */
+static void
+replaceString(String &str, const String &from, const String &to)
+{
+  if (!from.empty()) {
+    for (String::size_type pos = str.find(from); String::npos != pos; pos = str.find(from, pos + to.length())) {
+      str.replace(pos, from.length(), to);
+    }
+  }
+}
+
+/**
+ * @brief Constructs a pattern with nothing compiled yet, use init() to set it up.
+ */
+Pattern::Pattern()
+  : _re(nullptr), _extra(nullptr), _pattern(""), _replacement(""), _tokenCount(0)
+{
+}
+
+/**
+ * @brief Initializes the PCRE pattern from separate pattern and replacement strings.
+ *
+ * Anything compiled by a previous init() is released first.
+ *
+ * @param pattern PCRE pattern, a string containing PCRE patterns, capturing groups.
+ * @param replacement PCRE replacement, a string where $0 ... $9 will be replaced with the corresponding capturing groups
+ * @return true if successful, false if failure
+ */
+bool
+Pattern::init(const String &pattern, const String &replacement)
+{
+  pcreFree();
+
+  _pattern.assign(pattern);
+  _replacement.assign(replacement);
+  _tokenCount = 0;
+
+  if (compile()) {
+    return true;
+  }
+
+  PrefetchDebug("failed to initialize pattern:'%s', replacement:'%s'", pattern.c_str(), replacement.c_str());
+  pcreFree();
+  return false;
+}
+
+/**
+ * @brief Initializes the PCRE pattern from a single configuration string.
+ *
+ * A string starting with '/' is parsed as /<pattern>/<replacement>/, where '/' may be escaped as "\/"
+ * inside the pattern and the replacement. Any other string is used as a pattern without replacement.
+ *
+ * @see init()
+ * @param config PCRE pattern <pattern> or PCRE pattern + replacement in format /<pattern>/<replacement>/
+ * @return true if successful, false if failure
+ */
+bool
+Pattern::init(const String &config)
+{
+  if (config[0] != '/') {
+    return this->init(config, "");
+  }
+
+  /* Returns the position of the first '/' after 'after' which is not escaped by a preceding '\'. */
+  auto nextSeparator = [&config](size_t after) -> size_t {
+    size_t pos = after;
+    do {
+      pos = config.find_first_of("/", pos + 1);
+    } while (pos != String::npos && '\\' == config[pos - 1]);
+    return pos;
+  };
+
+  /* NOTE(review): the search for the closing '/' starts at index 2, so an empty pattern ("//...") is not recognized. */
+  size_t patternEnd = nextSeparator(1);
+  if (String::npos == patternEnd) {
+    /* Error, no closing '/' */
+    PrefetchError("failed to parse the pattern in '%s'", config.c_str());
+    return false;
+  }
+  String pattern = config.substr(1, patternEnd - 1);
+
+  size_t replacementEnd = nextSeparator(patternEnd);
+  if (String::npos == replacementEnd) {
+    /* Error, no closing '/' */
+    PrefetchError("failed to parse the replacement in '%s'", config.c_str());
+    return false;
+  }
+  String replacement = config.substr(patternEnd + 1, replacementEnd - patternEnd - 1);
+
+  // Remove '\' which escaped '/' inside the pattern and replacement strings.
+  ::replaceString(pattern, "\\/", "/");
+  ::replaceString(replacement, "\\/", "/");
+
+  return this->init(pattern, replacement);
+}
+
+/**
+ * @brief Tells whether the pattern lacks a usable compiled regex.
+ * @return true if no pattern string was set or nothing is compiled, false otherwise.
+ */
+bool
+Pattern::empty() const
+{
+  const bool compiled = (nullptr != _re);
+  return !compiled || _pattern.empty();
+}
+
+/**
+ * @brief Frees PCRE library related resources (compiled regex and study data), safe to call repeatedly.
+ */
+void
+Pattern::pcreFree()
+{
+  if (_re) {
+    pcre_free(_re);
+    _re = nullptr;
+  }
+
+  if (_extra) {
+    /* Study data must be released with pcre_free_study(): since PCRE 8.20 it may hold JIT code that
+     * pcre_free() would not release. */
+    pcre_free_study(_extra);
+    _extra = nullptr;
+  }
+}
+
+/**
+ * @brief Destructor, frees PCRE related resources.
+ */
+Pattern::~Pattern()
+{
+  this->pcreFree();
+}
+
+/**
+ * @brief Capture or capture-and-replace depending on whether a replacement string is specified.
+ *
+ * With a replacement, the single replaced string is appended to 'result'. Without one, the capturing
+ * groups are appended; group zero (the whole match) only when it is the only group.
+ *
+ * @see replace()
+ * @see capture()
+ * @param subject PCRE subject string
+ * @param result vector of strings where the result of captures or the replacements will be appended.
+ * @return true if there was a match and capture or replacement succeeded, false if failure.
+ */
+bool
+Pattern::process(const String &subject, StringVector &result)
+{
+  if (!_replacement.empty()) {
+    /* Replacement pattern was provided in the configuration - capture and replace. */
+    String element;
+    if (!replace(subject, element)) {
+      return false;
+    }
+    result.push_back(element);
+    return true;
+  }
+
+  /* Replacement was not provided so return all capturing groups except the group zero. */
+  StringVector captures;
+  if (!capture(subject, captures)) {
+    return false;
+  }
+
+  /* Guard against an empty capture list: "begin() + 1" on an empty vector would be undefined behavior. */
+  StringVector::iterator first = (captures.size() <= 1) ? captures.begin() : captures.begin() + 1;
+  result.insert(result.end(), first, captures.end());
+  return true;
+}
+
+/**
+ * @brief Matches the subject string against the regex pattern.
+ * @param subject PCRE subject
+ * @return true - matched, false - did not match, nothing compiled or matching error (errors are logged).
+ */
+bool
+Pattern::match(const String &subject)
+{
+  PrefetchDebug("matching '%s' to '%s'", _pattern.c_str(), subject.c_str());
+
+  if (nullptr == _re) {
+    return false;
+  }
+
+  int rc = pcre_exec(_re, _extra, subject.c_str(), subject.length(), 0, PCRE_NOTEMPTY, nullptr, 0);
+  if (rc >= 0) {
+    return true;
+  }
+
+  if (PCRE_ERROR_NOMATCH != rc) {
+    PrefetchError("matching error %d", rc);
+  }
+  return false;
+}
+
+/**
+ * @brief Return all PCRE capture groups that matched in the subject string
+ *
+ * Group zero (the whole match) comes first. Groups that did not take part in the match are returned as
+ * empty strings.
+ *
+ * @param subject PCRE subject string
+ * @param result reference to vector of strings where all capture groups are appended
+ * @return true - matched, false - did not match, nothing compiled or matching error.
+ */
+bool
+Pattern::capture(const String &subject, StringVector &result)
+{
+  int ovector[OVECOUNT];
+
+  PrefetchDebug("matching '%s' to '%s'", _pattern.c_str(), subject.c_str());
+
+  if (!_re) {
+    return false;
+  }
+
+  /* Use the study data computed by compile(), like match() does. */
+  int matchCount = pcre_exec(_re, _extra, subject.c_str(), subject.length(), 0, PCRE_NOTEMPTY, ovector, OVECOUNT);
+  if (matchCount < 0) {
+    if (matchCount != PCRE_ERROR_NOMATCH) {
+      PrefetchError("matching error %d", matchCount);
+    }
+    return false;
+  }
+  if (0 == matchCount) {
+    /* ovector was too small: pcre filled as many offset pairs as fit (OVECOUNT / 3, the rest is workspace). */
+    matchCount = OVECOUNT / 3;
+  }
+
+  for (int i = 0; i < matchCount; i++) {
+    int start = ovector[2 * i];
+    int end   = ovector[2 * i + 1];
+
+    /* A group that did not take part in the match has offsets of -1; building a substring at such an
+     * offset would throw std::out_of_range, capture it as an empty string instead. */
+    String dst;
+    if (start >= 0 && end >= start) {
+      dst.assign(subject, start, end - start);
+    }
+
+    PrefetchDebug("capturing '%s' %d[%d,%d]", dst.c_str(), i, ovector[2 * i], ovector[2 * i + 1]);
+    result.push_back(dst);
+  }
+
+  return true;
+}
+
+/**
+ * @brief Replaces all $0..$9 tokens found in the replacement string with what matched in the PCRE capturing groups.
+ *
+ * Tokens referring to groups that did not take part in the match are replaced with nothing.
+ *
+ * @param subject PCRE subject string
+ * @param result reference to a string where the result of the replacement will be appended
+ * @return true - success, false - nothing matched or failure.
+ */
+bool
+Pattern::replace(const String &subject, String &result)
+{
+  int matchCount;
+  int ovector[OVECOUNT];
+
+  PrefetchDebug("matching '%s' to '%s'", _pattern.c_str(), subject.c_str());
+
+  if (!_re) {
+    return false;
+  }
+
+  /* Use the study data computed by compile(), like match() does. */
+  matchCount = pcre_exec(_re, _extra, subject.c_str(), subject.length(), 0, PCRE_NOTEMPTY, ovector, OVECOUNT);
+  if (matchCount < 0) {
+    if (matchCount != PCRE_ERROR_NOMATCH) {
+      PrefetchError("matching error %d", matchCount);
+    }
+    return false;
+  }
+  if (0 == matchCount) {
+    /* ovector was too small: pcre filled as many offset pairs as fit (OVECOUNT / 3, the rest is workspace). */
+    matchCount = OVECOUNT / 3;
+  }
+
+  /* Verify the replacement has the right number of matching groups */
+  for (int i = 0; i < _tokenCount; i++) {
+    if (_tokens[i] >= matchCount) {
+      PrefetchError("invalid reference in replacement string: $%d", _tokens[i]);
+      return false;
+    }
+  }
+
+  int previous = 0;
+  for (int i = 0; i < _tokenCount; i++) {
+    int replIndex = _tokens[i];
+    int start     = ovector[2 * replIndex];
+    int end       = ovector[2 * replIndex + 1];
+
+    String src(_replacement, _tokenOffset[i], 2);
+
+    /* A group that did not take part in the match has offsets of -1; building a substring at such an
+     * offset would throw std::out_of_range, substitute it with nothing instead. */
+    String dst;
+    if (start >= 0 && end >= start) {
+      dst.assign(subject, start, end - start);
+    }
+
+    PrefetchDebug("replacing '%s' with '%s'", src.c_str(), dst.c_str());
+
+    result.append(_replacement, previous, _tokenOffset[i] - previous);
+    result.append(dst);
+
+    previous = _tokenOffset[i] + 2; /* 2 is the size of $0 or $1 or $2, ... or $9 */
+  }
+
+  result.append(_replacement, previous, _replacement.length() - previous);
+
+  PrefetchDebug("replacing '%s' resulted in '%s'", _replacement.c_str(), result.c_str());
+
+  return true;
+}
+
+/**
+ * @brief Compiles and studies the regex and parses the $0..$9 tokens of the replacement string.
+ *
+ * Called only during initialization. On failure the compiled PCRE data is released.
+ *
+ * @return true if successful, false if not.
+ */
+bool
+Pattern::compile()
+{
+  const char *errPtr = nullptr; /* PCRE error */
+  int errOffset      = 0;       /* PCRE error offset */
+
+  PrefetchDebug("compiling pattern:'%s', replacement:'%s'", _pattern.c_str(), _replacement.c_str());
+
+  _re = pcre_compile(_pattern.c_str(), 0 /* options */, &errPtr, &errOffset, nullptr /* default character tables */);
+  if (nullptr == _re) {
+    PrefetchError("compile of regex '%s' at char %d: %s", _pattern.c_str(), errOffset, errPtr);
+    return false;
+  }
+
+  /* pcre_study() returns nullptr without an error message when there is nothing to optimize, that is fine. */
+  _extra           = pcre_study(_re, 0, &errPtr);
+  bool studyFailed = (nullptr == _extra) && (nullptr != errPtr) && ('\0' != *errPtr);
+  if (studyFailed) {
+    PrefetchError("failed to study regex '%s': %s", _pattern.c_str(), errPtr);
+
+    pcre_free(_re);
+    _re = nullptr;
+    return false;
+  }
+
+  if (_replacement.empty()) {
+    /* No replacement necessary - we are done. */
+    return true;
+  }
+
+  /* Record the group index and the offset of every $N token in the replacement string. */
+  _tokenCount  = 0;
+  bool success = true;
+  unsigned pos = 0;
+  while (success && pos < _replacement.length()) {
+    if ('$' != _replacement[pos]) {
+      pos++;
+      continue;
+    }
+
+    char digit = _replacement[pos + 1];
+    if (_tokenCount >= TOKENCOUNT) {
+      PrefetchError("too many tokens in replacement string: %s", _replacement.c_str());
+      success = false;
+    } else if (digit < '0' || digit > '9') {
+      PrefetchError("invalid replacement token $%c in %s: should be $0 - $9", digit, _replacement.c_str());
+      success = false;
+    } else {
+      _tokens[_tokenCount]      = digit - '0'; /* Convert '0' to 0 */
+      _tokenOffset[_tokenCount] = pos;
+      _tokenCount++;
+      pos += 2; /* Skip the '$' and the digit */
+    }
+  }
+
+  if (!success) {
+    pcreFree();
+  }
+
+  return success;
+}
+
+/**
+ * @brief Destructor, deletes all patterns added with add().
+ */
+MultiPattern::~MultiPattern()
+{
+  for (Pattern *pattern : _list) {
+    delete pattern;
+  }
+}
+
+/**
+ * @brief Check if empty.
+ * @return true if no patterns were added, false otherwise.
+ */
+bool
+MultiPattern::empty() const
+{
+  return this->_list.empty();
+}
+
+/**
+ * @brief Appends a pattern to the multi-pattern.
+ *
+ * The order of addition matters when matching. The multi-pattern's destructor deletes the pattern.
+ *
+ * @param pattern pattern pointer
+ */
+void
+MultiPattern::add(Pattern *pattern)
+{
+  _list.push_back(pattern);
+}
+
+/**
+ * @brief Matches the subject string against the patterns, in the order they were added.
+ * @param subject subject string.
+ * @return true if any pattern matches, false if nothing matches (null entries are skipped).
+ */
+bool
+MultiPattern::match(const String &subject) const
+{
+  for (Pattern *pattern : _list) {
+    if (nullptr == pattern) {
+      continue;
+    }
+    if (pattern->match(subject)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * @brief Calls Pattern::replace() on the patterns one by one, in the order added, until the first success.
+ * @param subject subject string.
+ * @param result string where the replacement result is appended.
+ * @return true if any pattern matched and replaced, false otherwise.
+ */
+bool
+MultiPattern::replace(const String &subject, String &result) const
+{
+  for (Pattern *pattern : _list) {
+    bool replaced = (nullptr != pattern) && pattern->replace(subject, result);
+    if (replaced) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * @brief Returns the name of the multi-pattern (set during the instantiation only).
+ * @return reference to the name, valid as long as this object lives.
+ */
+const String &
+MultiPattern::name() const
+{
+  return this->_name;
+}
diff --git a/plugins/experimental/prefetch/pattern.h b/plugins/experimental/prefetch/pattern.h
new file mode 100644
index 0000000..2db7665
--- /dev/null
+++ b/plugins/experimental/prefetch/pattern.h
@@ -0,0 +1,92 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file pattern.h
+ * @brief PCRE related classes (header file).
+ */
+
+#pragma once
+
+#ifdef HAVE_PCRE_PCRE_H
+#include <pcre/pcre.h>
+#else
+#include <pcre.h>
+#endif
+
+#include "common.h"
+
+/**
+ * @brief PCRE matching, capturing and replacing
+ */
+class Pattern
+{
+public:
+ static const int TOKENCOUNT = 10; /**< @brief Capturing groups $0..$9 */
+ static const int OVECOUNT = TOKENCOUNT * 3; /**< @brief pcre_exec() array count, handle 10 capture groups */
+
+ Pattern();
+ virtual ~Pattern();
+
+ bool init(const String &pattern, const String &replacenemt);
+ bool init(const String &config);
+ bool empty() const;
+ bool match(const String &subject);
+ bool capture(const String &subject, StringVector &result);
+ bool replace(const String &subject, String &result);
+ bool process(const String &subject, StringVector &result);
+
+private:
+ bool compile();
+ bool failed(const String &subject) const;
+ void pcreFree();
+
+ pcre *_re; /**< @brief PCRE compiled info structure, computed during initialization */
+ pcre_extra *_extra; /**< @brief PCRE study data block, computed during initialization */
+
+ String _pattern; /**< @brief PCRE pattern string, containing PCRE patterns and capturing groups. */
+ String _replacement; /**< @brief PCRE replacement string, containing $0..$9 to be replaced with content of the capturing groups */
+
+ int _tokenCount; /**< @brief number of replacements $0..$9 found in the replacement string if not empty */
+ int _tokens[TOKENCOUNT]; /**< @brief replacement index 0..9, since they can be used in the replacement string in any order */
+ int _tokenOffset[TOKENCOUNT]; /**< @brief replacement offset inside the replacement string */
+};
+
+/**
+ * @brief Named list of regular expressions.
+ */
+class MultiPattern
+{
+public:
+ MultiPattern(const String name = "") : _name(name) {}
+ virtual ~MultiPattern();
+
+ bool empty() const;
+ void add(Pattern *pattern);
+ virtual bool match(const String &subject) const;
+ virtual bool replace(const String &subject, String &result) const;
+ const String &name() const;
+
+protected:
+ std::vector<Pattern *> _list; /**< @brief vector which dictates the order of the pattern evaluation. */
+ String _name; /**< @brief multi-pattern name */
+
+private:
+ MultiPattern(const MultiPattern &); // disallow
+ MultiPattern &operator=(const MultiPattern &); // disallow
+};
diff --git a/plugins/experimental/prefetch/plugin.cc b/plugins/experimental/prefetch/plugin.cc
new file mode 100644
index 0000000..1c16ae0
--- /dev/null
+++ b/plugins/experimental/prefetch/plugin.cc
@@ -0,0 +1,751 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * @file plugin.cc
+ * @brief traffic server plugin entry points.
+ */
+
+#include <sstream>
+#include <iomanip>
+
+#include "ts/ts.h" /* ATS API */
+
+#include "ts/remap.h" /* TSRemapInterface, TSRemapStatus, apiInfo */
+
+#include "common.h"
+#include "configs.h"
+#include "fetch.h"
+#include "fetch_policy.h"
+#include "headers.h"
+
+static const char *
+getEventName(TSEvent event)
+{
+ switch (event) {
+ case TS_EVENT_HTTP_CONTINUE:
+ return "TS_EVENT_HTTP_CONTINUE";
+ case TS_EVENT_HTTP_ERROR:
+ return "TS_EVENT_HTTP_ERROR";
+ case TS_EVENT_HTTP_READ_REQUEST_HDR:
+ return "TS_EVENT_HTTP_READ_REQUEST_HDR";
+ case TS_EVENT_HTTP_OS_DNS:
+ return "TS_EVENT_HTTP_OS_DNS";
+ case TS_EVENT_HTTP_SEND_REQUEST_HDR:
+ return "TS_EVENT_HTTP_SEND_REQUEST_HDR";
+ case TS_EVENT_HTTP_READ_CACHE_HDR:
+ return "TS_EVENT_HTTP_READ_CACHE_HDR";
+ case TS_EVENT_HTTP_READ_RESPONSE_HDR:
+ return "TS_EVENT_HTTP_READ_RESPONSE_HDR";
+ case TS_EVENT_HTTP_SEND_RESPONSE_HDR:
+ return "TS_EVENT_HTTP_SEND_RESPONSE_HDR";
+ case TS_EVENT_HTTP_REQUEST_TRANSFORM:
+ return "TS_EVENT_HTTP_REQUEST_TRANSFORM";
+ case TS_EVENT_HTTP_RESPONSE_TRANSFORM:
+ return "TS_EVENT_HTTP_RESPONSE_TRANSFORM";
+ case TS_EVENT_HTTP_SELECT_ALT:
+ return "TS_EVENT_HTTP_SELECT_ALT";
+ case TS_EVENT_HTTP_TXN_START:
+ return "TS_EVENT_HTTP_TXN_START";
+ case TS_EVENT_HTTP_TXN_CLOSE:
+ return "TS_EVENT_HTTP_TXN_CLOSE";
+ case TS_EVENT_HTTP_SSN_START:
+ return "TS_EVENT_HTTP_SSN_START";
+ case TS_EVENT_HTTP_SSN_CLOSE:
+ return "TS_EVENT_HTTP_SSN_CLOSE";
+ case TS_EVENT_HTTP_CACHE_LOOKUP_COMPLETE:
+ return "TS_EVENT_HTTP_CACHE_LOOKUP_COMPLETE";
+ case TS_EVENT_HTTP_PRE_REMAP:
+ return "TS_EVENT_HTTP_PRE_REMAP";
+ case TS_EVENT_HTTP_POST_REMAP:
+ return "TS_EVENT_HTTP_POST_REMAP";
+ default:
+ return "UNHANDLED";
+ }
+ return "UNHANDLED";
+}
+
+static const char *
+getCacheLookupResultName(TSCacheLookupResult result)
+{
+ switch (result) {
+ case TS_CACHE_LOOKUP_MISS:
+ return "TS_CACHE_LOOKUP_MISS";
+ break;
+ case TS_CACHE_LOOKUP_HIT_STALE:
+ return "TS_CACHE_LOOKUP_HIT_STALE";
+ break;
+ case TS_CACHE_LOOKUP_HIT_FRESH:
+ return "TS_CACHE_LOOKUP_HIT_FRESH";
+ break;
+ case TS_CACHE_LOOKUP_SKIPPED:
+ return "TS_CACHE_LOOKUP_SKIPPED";
+ break;
+ default:
+ return "UNKNOWN_CACHE_LOOKUP_EVENT";
+ break;
+ }
+ return "UNKNOWN_CACHE_LOOKUP_EVENT";
+}
+
+/**
+ * @brief Plugin initialization.
+ * @param apiInfo remap interface info pointer
+ * @param errBuf error message buffer
+ * @param errBufSize error message buffer size
+ * @return always TS_SUCCESS.
+ */
+TSReturnCode
+TSRemapInit(TSRemapInterface *apiInfo, char *errBuf, int erroBufSize)
+{
+ return TS_SUCCESS;
+}
+
+/**
+ * @brief Plugin instance data.
+ */
+
+struct PrefetchInstance {
+ PrefetchInstance() : _state(nullptr){};
+
+private:
+ PrefetchInstance(PrefetchInstance const &);
+ void operator=(BgFetchState const &);
+
+public:
+ PrefetchConfig _config;
+ BgFetchState *_state;
+};
+
+/**
+ * brief Plugin transaction data.
+ */
+class PrefetchTxnData
+{
+public:
+ PrefetchTxnData(PrefetchInstance *inst)
+ : _inst(inst), _front(true), _firstPass(true), _fetchable(false), _status(TS_HTTP_STATUS_OK)
+ {
+ }
+
+ bool
+ firstPass() const
+ {
+ return _firstPass;
+ }
+
+ bool
+ secondPass() const
+ {
+ return !_firstPass;
+ }
+
+ bool
+ frontend() const
+ {
+ return _front;
+ }
+
+ bool
+ backend() const
+ {
+ return !_front;
+ }
+
+ PrefetchInstance *_inst; /* Pointer to the plugin instance */
+
+ bool _front; /* front-end vs back-end */
+ bool _firstPass; /* first vs second pass */
+
+ /* saves state between hooks */
+ String _cachekey; /* cache key */
+ bool _fetchable; /* saves the result of the attempt to fetch */
+ TSHttpStatus _status; /* status to return to the UA */
+ String _body; /* body to return to the UA */
+};
+
+/**
+ * @brief Evaluate a math addition or subtraction expression.
+ *
+ * @param v string containing an expression, i.e. "3 + 4"
+ * @return string containing the result, i.e. "7"
+ */
+static String
+evaluate(const String &v)
+{
+ if (v.empty()) {
+ return String("");
+ }
+
+ /* Find out if width is specified (hence leading zeros are required if the width is bigger then the result width) */
+ String stmt;
+ size_t len = 0;
+ size_t pos = v.find_first_of(":");
+ if (String::npos != pos) {
+ stmt.assign(v.substr(0, pos));
+ len = getValue(v.substr(pos + 1));
+ } else {
+ stmt.assign(v);
+ }
+ PrefetchDebug("statement: '%s', formating length: %zu", stmt.c_str(), len);
+
+ int result = 0;
+ pos = stmt.find_first_of("+-");
+
+ if (String::npos == pos) {
+ result = getValue(stmt);
+ } else {
+ unsigned a = getValue(stmt.substr(0, pos));
+ unsigned b = getValue(stmt.substr(pos + 1));
+
+ if ('+' == stmt[pos]) {
+ result = a + b;
+ } else {
+ result = a - b;
+ }
+ }
+
+ std::ostringstream convert;
+ convert << std::setw(len) << std::setfill('0') << result;
+ PrefetchDebug("evaluation of '%s' resulted in '%s'", v.c_str(), convert.str().c_str());
+ return convert.str();
+}
+
+/**
+ * @brief Expand+evaluate (in place) an expression surrounded with "{" and "}" and uses evaluate() to evaluate the math expression.
+ *
+ * @param s string containing an expression, i.e. "{3 + 4}"
+ * @return void
+ */
+static void
+expand(String &s)
+{
+ size_t cur = 0;
+ while (String::npos != cur) {
+ size_t start = s.find_first_of("{", cur);
+ size_t stop = s.find_first_of("}", start);
+
+ if (String::npos != start && String::npos != stop) {
+ s.replace(start, stop - start + 1, evaluate(s.substr(start + 1, stop - start - 1)));
+ cur = stop + 1;
+ } else {
+ cur = stop;
+ }
+ }
+}
+
+/**
+ * @brief Get the cachekey used for the particular object in this transaction.
+ *
+ * @param txnp HTTP transaction structure
+ * @param reqBuffer request TSMBuffer
+ * @param destination string reference to where the result is to be appended.
+ * @return true if success or false if failure
+ */
+bool
+appendCacheKey(const TSHttpTxn txnp, const TSMBuffer reqBuffer, String &key)
+{
+ bool ret = false;
+ TSMLoc keyLoc = TS_NULL_MLOC;
+ if (TS_SUCCESS == TSUrlCreate(reqBuffer, &keyLoc)) {
+ if (TS_SUCCESS == TSHttpTxnCacheLookupUrlGet(txnp, reqBuffer, keyLoc)) {
+ int urlLen = 0;
+ char *url = TSUrlStringGet(reqBuffer, keyLoc, &urlLen);
+ if (nullptr != url) {
+ key.append(url, urlLen);
+ PrefetchDebug("cache key: %s", key.c_str());
+ TSfree(static_cast<void *>(url));
+ ret = true;
+ }
+ }
+ TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, keyLoc);
+ }
+
+ if (!ret) {
+ PrefetchError("failed to get cache key");
+ }
+ return ret;
+}
+
+/**
+ * @brief Find out if the object was found fresh in cache.
+ *
+ * This function finaly controls if the pre-fetch should be scheduled or not.
+ * @param txnp HTTP transaction structure
+ * @return true - hit fresh, false - miss/stale/skipped or error
+ */
+static bool
+foundFresh(TSHttpTxn txnp)
+{
+ bool fresh = false;
+ int lookupStatus;
+ if (TS_SUCCESS == TSHttpTxnCacheLookupStatusGet(txnp, &lookupStatus)) {
+ PrefetchDebug("lookup status: %s", getCacheLookupResultName((TSCacheLookupResult)lookupStatus));
+ if (TS_CACHE_LOOKUP_HIT_FRESH == lookupStatus) {
+ fresh = true;
+ }
+ } else {
+ /* Failed to get the lookup status, likely a previous plugin already prepared the client response w/o a cache lookup,
+ * we don't really know if the cache has a fresh object, so just don't trigger pre-fetch */
+ PrefetchDebug("failed to check cache-ability");
+ }
+ return fresh;
+}
+
+/**
+ * @brief Check if the response from origin for N-th object is success (200 and 206)
+ *
+ * and only then schedule a pre-fetch for the next
+ *
+ * @param txnp HTTP transaction structure
+ * @return true - yes, false - no
+ */
+bool
+isResponseGood(TSHttpTxn txnp)
+{
+ bool good = false;
+ TSMBuffer respBuffer;
+ TSMLoc respHdrLoc;
+ if (TS_SUCCESS == TSHttpTxnServerRespGet(txnp, &respBuffer, &respHdrLoc)) {
+ TSHttpStatus status = TSHttpHdrStatusGet(respBuffer, respHdrLoc);
+ PrefetchDebug("origin response code: %d", status);
+ if (TS_HTTP_STATUS_PARTIAL_CONTENT == status || TS_HTTP_STATUS_OK == status) {
+ good = true;
+ }
+ /* Release the response MLoc */
+ TSHandleMLocRelease(respBuffer, TS_NULL_MLOC, respHdrLoc);
+ } else {
+ /* Failed to get the origin response, possible cause could be a origin connection problems or timeouts or
+ * a previous plugin could have already prepared the client response w/o going to origin server */
+ PrefetchDebug("failed to get origin response");
+ }
+ return good;
+}
+
+/**
+ * @brief get the pristin URL path
+ *
+ * @param txnp HTTP transaction structure
+ * @return pristine URL path
+ */
+static String
+getPristineUrlPath(TSHttpTxn txnp)
+{
+ String pristinePath;
+ TSMLoc pristineUrlLoc;
+ TSMBuffer reqBuffer;
+
+ if (TS_SUCCESS == TSHttpTxnPristineUrlGet(txnp, &reqBuffer, &pristineUrlLoc)) {
+ int pathLen = 0;
+ const char *path = TSUrlPathGet(reqBuffer, pristineUrlLoc, &pathLen);
+ if (nullptr != path) {
+ PrefetchDebug("path: '%.*s'", pathLen, path);
+ pristinePath.assign(path, pathLen);
+ } else {
+ PrefetchError("failed to get pristine URL path");
+ }
+ TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, pristineUrlLoc);
+ } else {
+ PrefetchError("failed to get pristine URL");
+ }
+ return pristinePath;
+}
+
+/**
+ * @brief short-cut to set the response .
+ */
+TSEvent
+shortcutResponse(PrefetchTxnData *data, TSHttpStatus status, const char *body, TSEvent event)
+{
+ data->_status = status;
+ data->_body.assign(body);
+ return event;
+}
+
+/**
+ * @brief Checks if we are still supposed to schedule a background fetch based on whether the object is in the cache.
+ * It is 'fetchable' only if not a fresh hit.
+ *
+ * @param txnp HTTP transaction structure
+ * @param data transaction data
+ * @return true if fetchable and false if not.
+ */
+static bool
+isFetchable(TSHttpTxn txnp, PrefetchTxnData *data)
+{
+ bool fetchable = false;
+ BgFetchState *state = data->_inst->_state;
+ if (!foundFresh(txnp)) {
+ /* Schedule fetch only if not in cache */
+ PrefetchDebug("object to be fetched");
+ fetchable = true;
+ } else {
+ PrefetchDebug("object already in cache or to be skipped");
+ state->incrementMetric(FETCH_ALREADY_CACHED);
+ state->incrementMetric(FETCH_TOTAL);
+ }
+ return fetchable;
+}
+
+/**
+ * @brief Find out if the current response to trigger a background prefetch.
+ *
+ * Pre-fetch only on HTTP codes 200 and 206 or object found in cache (previous good response)
+ *
+ * @param txnp HTTP transaction structure
+ * @return true - trigger prefetch, false - don't trigger.
+ */
+static bool
+respToTriggerPrefetch(TSHttpTxn txnp)
+{
+ bool trigger = false;
+ if (foundFresh(txnp)) {
+ /* If found in cache and fresh trigger next (same as good response from origin) */
+ PrefetchDebug("trigger background fetch (cached)");
+ trigger = true;
+ } else if (isResponseGood(txnp)) {
+ /* Trigger all necessary background fetches based on the next path pattern */
+ PrefetchDebug("trigger background fetch (good origin response)");
+ trigger = true;
+ } else {
+ PrefetchDebug("don't trigger background fetch");
+ }
+ return trigger;
+}
+
+/**
+ * @brief Callback function that handles necessary foreground / background fetch operations.
+ *
+ * @param contp continuation associated with this function.
+ * @param event corresponding event triggered at different hooks.
+ * @param edata HTTP transaction structures.
+ * @return always 0
+ */
+int
+contHandleFetch(const TSCont contp, TSEvent event, void *edata)
+{
+ PrefetchTxnData *data = static_cast<PrefetchTxnData *>(TSContDataGet(contp));
+ TSHttpTxn txnp = static_cast<TSHttpTxn>(edata);
+ PrefetchConfig &config = data->_inst->_config;
+ BgFetchState *state = data->_inst->_state;
+ TSMBuffer reqBuffer;
+ TSMLoc reqHdrLoc;
+
+ PrefetchDebug("event: %s (%d)", getEventName(event), event);
+
+ TSEvent retEvent = TS_EVENT_HTTP_CONTINUE;
+
+ if (TS_SUCCESS != TSHttpTxnClientReqGet(txnp, &reqBuffer, &reqHdrLoc)) {
+ PrefetchError("failed to get client request");
+ TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+ return 0;
+ }
+
+ switch (event) {
+ case TS_EVENT_HTTP_POST_REMAP: {
+ /* Use the cache key since this has better lookup behavior when using plugins like the cachekey plugin,
+ * for example multiple URIs can match a single cache key */
+ if (data->frontend() && data->secondPass()) {
+ /* Create a separate cache key name space to be used only for front-end and second-pass fetch policy checks. */
+ data->_cachekey.assign("/prefetch");
+ }
+ if (!appendCacheKey(txnp, reqBuffer, data->_cachekey)) {
+ PrefetchError("failed to get the cache key");
+ TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+ return 0;
+ }
+
+ if (data->frontend()) {
+ /* front-end instance */
+ if (data->firstPass()) {
+ /* first-pass */
+ if (!config.isExactMatch()) {
+ data->_fetchable = state->acquire(data->_cachekey);
+ PrefetchDebug("request is%sfetchable", data->_fetchable ? " " : " not ");
+ }
+ }
+ }
+ } break;
+
+ case TS_EVENT_HTTP_CACHE_LOOKUP_COMPLETE: {
+ if (data->frontend()) {
+ /* front-end instance */
+ if (data->secondPass()) {
+ /* second-pass */
+ data->_fetchable = state->acquire(data->_cachekey);
+ data->_fetchable = data->_fetchable && state->uniqueAcquire(data->_cachekey);
+ PrefetchDebug("request is%sfetchable", data->_fetchable ? " " : " not ");
+
+ if (isFetchable(txnp, data)) {
+ if (!data->_fetchable) {
+ /* Cancel the requested fetch */
+ retEvent = shortcutResponse(data, TS_HTTP_STATUS_ALREADY_REPORTED, "fetch not scheduled\n", TS_EVENT_HTTP_ERROR);
+ } else {
+ /* Fetch */
+ }
+ } else {
+ retEvent = shortcutResponse(data, TS_HTTP_STATUS_ALREADY_REPORTED, "fetch not scheduled\n", TS_EVENT_HTTP_ERROR);
+ }
+ }
+ } else {
+ /* back-end instance */
+ if (data->firstPass()) {
+ if (isFetchable(txnp, data)) {
+ if (BgFetch::schedule(state, config, /* askPermission */ true, reqBuffer, reqHdrLoc, txnp, nullptr, 0, data->_cachekey)) {
+ retEvent = shortcutResponse(data, TS_HTTP_STATUS_OK, "fetch scheduled\n", TS_EVENT_HTTP_ERROR);
+ } else {
+ retEvent = shortcutResponse(data, TS_HTTP_STATUS_ALREADY_REPORTED, "fetch not scheduled\n", TS_EVENT_HTTP_ERROR);
+ }
+ } else {
+ retEvent = shortcutResponse(data, TS_HTTP_STATUS_ALREADY_REPORTED, "fetch not scheduled\n", TS_EVENT_HTTP_ERROR);
+ }
+ }
+ }
+ } break;
+
+ case TS_EVENT_HTTP_SEND_RESPONSE_HDR: {
+ if (data->frontend()) {
+ /* front-end instance */
+
+ if (data->firstPass() && data->_fetchable && !config.getNextPath().empty() && respToTriggerPrefetch(txnp)) {
+ /* Trigger all necessary background fetches based on the next path pattern */
+
+ String currentPath = getPristineUrlPath(txnp);
+ if (!currentPath.empty()) {
+ unsigned total = config.getFetchCount();
+ for (unsigned i = 0; i < total; ++i) {
+ PrefetchDebug("generating prefetch request %d/%d", i + 1, total);
+ String expandedPath;
+
+ if (config.getNextPath().replace(currentPath, expandedPath)) {
+ PrefetchDebug("replaced: %s", expandedPath.c_str());
+ expand(expandedPath);
+ PrefetchDebug("expanded: %s", expandedPath.c_str());
+
+ BgFetch::schedule(state, config, /* askPermission */ false, reqBuffer, reqHdrLoc, txnp, expandedPath.c_str(),
+ expandedPath.length(), data->_cachekey);
+ } else {
+ /* We should be here only if the pattern replacement fails (match already checked) */
+ PrefetchError("failed to process the pattern");
+
+ /* If the first or any matches fails there must be something wrong so don't continue */
+ break;
+ }
+ currentPath.assign(expandedPath);
+ }
+ } else {
+ PrefetchDebug("failed to get current path");
+ }
+ }
+ }
+
+ if ((data->backend() && data->firstPass()) || (data->frontend() && data->secondPass() && !data->_body.empty())) {
+ TSMBuffer bufp;
+ TSMLoc hdrLoc;
+
+ if (TS_SUCCESS == TSHttpTxnClientRespGet(txnp, &bufp, &hdrLoc)) {
+ const char *reason = TSHttpHdrReasonLookup(data->_status);
+ int reasonLen = strlen(reason);
+ TSHttpHdrStatusSet(bufp, hdrLoc, data->_status);
+ TSHttpHdrReasonSet(bufp, hdrLoc, reason, reasonLen);
+ PrefetchDebug("set response: %d %.*s '%s'", data->_status, reasonLen, reason, data->_body.c_str());
+
+ char *buf = (char *)TSmalloc(data->_body.length() + 1);
+ sprintf(buf, "%s", data->_body.c_str());
+ TSHttpTxnErrorBodySet(txnp, buf, strlen(buf), nullptr);
+
+ setHeader(bufp, hdrLoc, TS_MIME_FIELD_CACHE_CONTROL, TS_MIME_LEN_CACHE_CONTROL, TS_HTTP_VALUE_NO_STORE,
+ TS_HTTP_LEN_NO_STORE);
+
+ TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdrLoc);
+ } else {
+ PrefetchError("failed to retrieve client response header");
+ }
+ }
+ } break;
+
+ case TS_EVENT_HTTP_TXN_CLOSE: {
+ if (data->_fetchable) {
+ if (data->frontend()) {
+ /* front-end */
+ if (data->firstPass()) {
+ /* first-pass */
+ if (!config.isExactMatch()) {
+ state->release(data->_cachekey);
+ }
+ } else {
+ /* second-pass */
+ state->uniqueRelease(data->_cachekey);
+ state->release(data->_cachekey);
+ }
+ }
+ }
+
+ /* Destroy the txn continuation and its data */
+ delete data;
+ TSContDestroy(contp);
+ } break;
+
+ default: {
+ PrefetchError("unhandled event: %s(%d)", getEventName(event), event);
+ } break;
+ }
+
+ /* Release the request MLoc */
+ TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, reqHdrLoc);
+
+ /* Reenable and continue with the state machine. */
+ TSHttpTxnReenable(txnp, retEvent);
+ return 0;
+}
+
+/**
+ * @brief Plugin new instance entry point.
+ *
+ * Processes the configuration and initializes the plugin instance.
+ * @param argc plugin arguments number
+ * @param argv plugin arguments
+ * @param instance new plugin instance pointer (initialized in this function)
+ * @param errBuf error message buffer
+ * @param errBufSize error message buffer size
+ * @return TS_SUCCES if success or TS_ERROR if failure
+ */
+TSReturnCode
+TSRemapNewInstance(int argc, char *argv[], void **instance, char *errBuf, int errBufSize)
+{
+ bool failed = true;
+
+ PrefetchInstance *inst = new PrefetchInstance();
+ if (nullptr != inst) {
+ if (inst->_config.init(argc, argv)) {
+ inst->_state = BgFetchStates::get()->getStateByName(inst->_config.getNameSpace());
+ if (nullptr != inst->_state) {
+ failed = !inst->_state->init(inst->_config);
+ }
+ }
+ }
+
+ if (failed) {
+ PrefetchError("failed to initialize the plugin");
+ delete inst;
+ *instance = nullptr;
+ return TS_ERROR;
+ }
+
+ *instance = inst;
+ return TS_SUCCESS;
+}
+
+/**
+ * @brief Plugin instance deletion clean-up entry point.
+ * @param plugin instance pointer.
+ */
+void
+TSRemapDeleteInstance(void *instance)
+{
+ PrefetchInstance *inst = (PrefetchInstance *)instance;
+ delete inst;
+}
+
+/**
+ * @brief Organizes the background fetch by registering necessary hooks, by identifying front-end vs back-end, first vs second
+ * pass.
+ *
+ * Remap is never done, continue with next in chain.
+ * @param instance plugin instance pointer
+ * @param txnp transaction handle
+ * @param rri remap request info pointer
+ * @return always TSREMAP_NO_REMAP
+ */
+TSRemapStatus
+TSRemapDoRemap(void *instance, TSHttpTxn txnp, TSRemapRequestInfo *rri)
+{
+ PrefetchInstance *inst = (PrefetchInstance *)instance;
+
+ if (nullptr != inst) {
+ PrefetchConfig &config = inst->_config;
+
+ int methodLen = 0;
+ const char *method = TSHttpHdrMethodGet(rri->requestBufp, rri->requestHdrp, &methodLen);
+ const String &header = config.getApiHeader();
+ if (nullptr != method && methodLen == TS_HTTP_LEN_GET && 0 == memcmp(TS_HTTP_METHOD_GET, method, TS_HTTP_LEN_GET)) {
+ bool front = config.isFront();
+ bool firstPass = false;
+ if (headerExist(rri->requestBufp, rri->requestHdrp, header.c_str(), header.length())) {
+ PrefetchDebug("%s: found %.*s", front ? "front-end" : "back-end", (int)header.length(), header.c_str());
+ /* On front-end: presence of header means second-pass, on back-end means first-pass. */
+ firstPass = !front;
+ } else {
+ /* On front-end: lack of header means first-pass, on back-end means second-pass. */
+ firstPass = front;
+ }
+
+ /* Make sure we handle only URLs that match the path pattern on the front-end + first-pass, cancel otherwise */
+ bool handleFetch = true;
+ if (front && firstPass) {
+ /* Front-end plug-in instance + first pass. */
+ if (config.getNextPath().empty()) {
+ /* No next path pattern specified then pass this request untouched. */
+ PrefetchDebug("next object pattern not specified, skip");
+ handleFetch = false;
+ } else {
+ /* Next path pattern specified hence try to match. */
+ String pristinePath = getPristineUrlPath(txnp);
+ if (!pristinePath.empty()) {
+ if (config.getNextPath().match(pristinePath)) {
+ /* Matched - handle the request */
+ PrefetchDebug("matched next object pattern");
+ inst->_state->incrementMetric(FETCH_MATCH_YES);
+ } else {
+ /* Next path pattern specified but did not match. */
+ PrefetchDebug("failed to match next object pattern, skip");
+ inst->_state->incrementMetric(FETCH_MATCH_NO);
+ handleFetch = false;
+ }
+ } else {
+ PrefetchDebug("failed to get path to (pre)match");
+ }
+ }
+ }
+
+ if (handleFetch) {
+ PrefetchTxnData *data = new PrefetchTxnData(inst);
+ if (nullptr != data) {
+ data->_front = front;
+ data->_firstPass = firstPass;
+
+ TSCont cont = TSContCreate(contHandleFetch, TSMutexCreate());
+ TSContDataSet(cont, static_cast<void *>(data));
+
+ TSHttpTxnHookAdd(txnp, TS_HTTP_POST_REMAP_HOOK, cont);
+ TSHttpTxnHookAdd(txnp, TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK, cont);
+ TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, cont);
+ TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, cont);
+ } else {
+ PrefetchError("failed to allocate transaction data object");
+ }
+ }
+ } else {
+ PrefetchDebug("not a GET method (%.*s), skipping", methodLen, method);
+ }
+ } else {
+ PrefetchError("could not get prefetch instance");
+ }
+
+ return TSREMAP_NO_REMAP;
+}