You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ni...@apache.org on 2015/07/03 23:43:39 UTC
trafficserver git commit: Introduce experimental "stream-editor"
plugin to enable rewriting of incoming/outgoing data on the fly.
Repository: trafficserver
Updated Branches:
refs/heads/master f71d06853 -> 164d507a9
Introduce experimental "stream-editor" plugin to enable rewriting of
incoming/outgoing data on the fly.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/164d507a
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/164d507a
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/164d507a
Branch: refs/heads/master
Commit: 164d507a96a33c16c0f4afee7a15b12af9b796bf
Parents: f71d068
Author: Nick Kew <nk...@qualys.com>
Authored: Fri Jul 3 22:40:34 2015 +0100
Committer: Nick Kew <nk...@qualys.com>
Committed: Fri Jul 3 22:40:34 2015 +0100
----------------------------------------------------------------------
plugins/experimental/Makefile.am | 3 +-
plugins/experimental/stream-editor/Makefile.am | 22 +
.../stream-editor/stream-editor.c++ | 831 +++++++++++++++++++
3 files changed, 855 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/164d507a/plugins/experimental/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/Makefile.am b/plugins/experimental/Makefile.am
index 7fdd7c2..75d9b25 100644
--- a/plugins/experimental/Makefile.am
+++ b/plugins/experimental/Makefile.am
@@ -40,7 +40,8 @@ SUBDIRS = \
stale_while_revalidate \
url_sig \
xdebug \
- mp4
+ mp4 \
+ stream-editor
if HAS_MYSQL
SUBDIRS += mysql_remap
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/164d507a/plugins/experimental/stream-editor/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/stream-editor/Makefile.am b/plugins/experimental/stream-editor/Makefile.am
new file mode 100644
index 0000000..040c525
--- /dev/null
+++ b/plugins/experimental/stream-editor/Makefile.am
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+include $(top_srcdir)/build/plugins.mk
+
pkglib_LTLIBRARIES = stream-editor.la
# Automake canonicalizes the library name "stream-editor.la" to the
# variable prefix "stream_editor_la"; the original "epic_la_*" names
# (copy/pasted from another plugin) were silently ignored, so no
# sources were attached to the library.  The source file added by this
# commit is stream-editor.c++, not stream-editor.cc.
stream_editor_la_SOURCES = stream-editor.c++
stream_editor_la_LDFLAGS = $(TS_PLUGIN_LDFLAGS)
+
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/164d507a/plugins/experimental/stream-editor/stream-editor.c++
----------------------------------------------------------------------
diff --git a/plugins/experimental/stream-editor/stream-editor.c++ b/plugins/experimental/stream-editor/stream-editor.c++
new file mode 100644
index 0000000..c6fb4a2
--- /dev/null
+++ b/plugins/experimental/stream-editor/stream-editor.c++
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* stream-editor: apply string and/or regexp search-and-replace to
+ * HTTP request and response bodies.
+ *
+ * Load from plugin.config, with one or more filenames as args.
+ * These are config files, and all config files are equal.
+ *
+ * Each line in a config file and conforming to config syntax specifies a
+ * rule for rewriting input or output.
+ *
+ * A line starting with [out] is an output rule.
+ * One starting with [in] is an input rule.
+ * Any other line is ignored, so blank lines and comments are fine.
+ *
+ * Each line must have a from: field and a to: field specifying what it
+ * rewrites from and to. Other fields are optional. The full list:
+ * from:flags:value
+ * to:value
+ * scope:flags:value
+ * prio:value
+ * len:value
+ *
+ * Fields are separated by whitespace. from: and to: fields may contain
+ * whitespace if they are quoted. Quoting may use any non-alphanumeric
+ * matched-pair delimiter, though the delimiter may not then appear
+ * (even escaped) within the value string.
+ *
+ * Flags are:
+ * i - case-independent matching
+ * r - regexp match
+ * u (applies only to scope) - apply scope match to full URI
+ * starting with "http://" (the default is to match the path
+ * only, as in for example a <Location> in HTTPD).
+ *
+ * A from: value is a string or a regexp, according to flags.
+ * A to: string is a replacement, and may reference regexp memory $1 - $9.
+ *
+ * A scope: value is likewise a string or (memory-less) regexp and
+ * determines the scope of URLs over which the rule applies.
+ *
+ * A prio: value is a single digit, and determines the priority of the
+ * rule. That is to say, two or more rules generate overlapping matches,
+ * the priority value will determine which rule prevails. A lower
+ * priority value prevails over a higher one.
+ *
+ * A len: value is an integer, and applies only to a regexp from:
+ * It should be an estimate of the largest match size expected from
+ * the from: pattern. It is used internally to determine the size of
+ * a continuity buffer, that avoids missing a match that spans more
+ * than one incoming data chunk arriving at the stream-editor filter.
+ * The default is 20.
+ *
+ * Performance tips:
+ * - A high len: value on any rule can severely impact on performance,
+ * especially if mixed with short matches that match frequently.
+ * - Specify high-precedence rules (low prio: values) first in your
+ * configuration to avoid reshuffling edits while processing data.
+ *
+ * Example: a trivial ruleset to escape HTML entities:
+ * [out] scope::/html-escape/ from::"&" to:"&amp;"
+ * [out] scope::/html-escape/ from::< to:&lt;
+ * [out] scope::/html-escape/ from::> to:&gt;
+ * [out] scope::/html-escape/ from::/"/ to:/&quot;/
+ * Note, the first & has to be quoted, as the two ampersands in the line
+ * would otherwise be mis-parsed as a matching pair of delimiters.
+ * Quoting the &, and the " line with //, are optional (and quoting
+ * is not applicable to the scope: field).
+ * The double-colons delimit flags, of which none are used in this example.
+ */
+#define MAX_CONFIG_LINE 1024
+#define MAX_RX_MATCH 10
+
+#define __STDC_LIMIT_MACROS
+#include <stdint.h>
+
+#include <vector>
+#include <set>
+#include <regex.h>
+#include <ctype.h>
+#include <assert.h>
+#include <string.h>
+#include <string>
+#include <stdio.h>
+#include "ts/ts.h"
+
struct edit_t;
typedef std::set<edit_t> editset_t;
typedef editset_t::const_iterator edit_p;

/* A single pending edit: replace [start, start+bytes) in the current
 * buffer with repl.  Edits live in an editset_t ordered by start
 * offset.  Overlap detection is piggy-backed on the set's comparator:
 * operator< throws the conflicting element, and saveto() catches it
 * and resolves the collision by priority.
 */
struct edit_t {
  const size_t start;       /* byte offset of the match in the buffer */
  const size_t bytes;       /* number of bytes deleted at start */
  const std::string repl;   /* replacement text inserted in their place */
  const int priority;       /* lower value prevails in a conflict */
  edit_t(size_t s, size_t b, const std::string& r, int p) :
    start(s), bytes(b), repl(r), priority(p) {;}
  bool operator!=(const edit_t& x) const {
    return start != x.start
        || bytes != x.bytes
        || repl != x.repl
        || priority != x.priority ;
  }
  /* Ordering comparator with a side channel: when *this and x overlap
   * (same start, or either range straddles the other's start) and they
   * are not the very same edit, throw x so the caller of insert()/
   * erase() can see which incumbent edit clashes.
   * NOTE(review): throwing from a std::set comparator is unusual; it
   * relies on insert() comparing against the overlapping neighbour,
   * which holds for ordered trees in practice.
   */
  bool operator<(const edit_t& x) const {
    if ((start == x.start)
     || (start < x.start && start+bytes > x.start)
     || (x.start < start && x.start+x.bytes > start)) {

      /* conflicting edits. Throw back to resolve conflict */
      /* Problem: we get called from erase() within conflict resolution,
       * and comparing to ourself then re-throws.
       * Need to exclude that case.
       */
      if (*this != x) throw x;
    }
    return start < x.start;
  }
  /* Try to add this edit to the set.  On an overlap the edit with the
   * lower priority value wins: either we oust the incumbent and retry,
   * or we give up.  Returns true if the edit was inserted.
   */
  bool saveto(editset_t &edits) const {
    /* loop to try until inserted or we lose a conflict */
    for(;;) {
      try {
        edits.insert(*this);
        return true;
      }
      catch (const edit_t& conflicted) {
        TSDebug("stream-editor",
                "Conflicting edits [%ld-%ld] vs [%ld-%ld]",
                start,start+bytes,
                conflicted.start,conflicted.start+conflicted.bytes);
        if (priority < conflicted.priority) {
          /* we win conflict and oust our enemy */
          edits.erase(conflicted);
        }
        else {
          /* we lose the conflict - give up */
          return false;
        }
      }
    }
  }
};
+
+
+
+class scope_t {
+ virtual bool match(const char *) const = 0;
+ const bool uri;
+public:
+ bool in_scope(TSHttpTxn tx) const {
+ /* Get the URL from tx, and feed it to match() */
+ bool ret = false;
+ TSMBuffer bufp;
+ TSMLoc offset;
+ int length;
+ TSReturnCode rc = TSHttpTxnPristineUrlGet(tx, &bufp, &offset);
+ if (rc != TS_SUCCESS) {
+ TSError("Error getting URL of current Txn");
+ return ret;
+ }
+ char *url = TSUrlStringGet(bufp, offset, &length);
+
+ if (!strncasecmp(url, "https://", 8)) {
+ /* No use trying to edit https data */
+ ret = false;
+ }
+ else {
+ char *p = url;
+ if (uri) {
+ /* match against path component, discard earlier components */
+ if (!strncasecmp(url, "http://", 7)) {
+ p += 7;
+ while (*p != '/') {
+ assert(*p != '\0');
+ ++p;
+ }
+ }
+ }
+ ret = match(p);
+ }
+ TSfree(url);
+ TSHandleMLocRelease(bufp, TS_NULL_MLOC, offset);
+ //TSMBufferDestroy(bufp);
+ return ret;
+ }
+ scope_t(const bool u) : uri(u) {;}
+ virtual ~scope_t() {}
+};
+class rxscope : public scope_t {
+private:
+ regex_t rx;
+ virtual bool match(const char *str) const {
+ return (regexec(&rx, str, 0, NULL, 0) == 0) ? true : false;
+ }
+public:
+ rxscope(const bool u, const bool i, const char *pattern, int len)
+ : scope_t(u) {
+ int flags = REG_NOSUB | REG_EXTENDED | (i ? REG_ICASE : 0);
+ char *str = TSstrndup(pattern, len);
+ int error = regcomp(&rx, str, flags);
+ if (error) {
+ TSError("stream-editor: can't compile regexp [%s]", str);
+ TSfree(str);
+ throw;
+ }
+ TSfree(str);
+ }
+ virtual ~rxscope() {
+ regfree(&rx);
+ }
+};
+class strscope : public scope_t {
+private:
+ const bool icase;
+ char *str;
+ virtual bool match(const char *p) const {
+ return ((icase ? strncasecmp : strncmp)(str, p, strlen(str)) == 0)
+ ? true : false;
+ }
+public:
+ strscope(const bool u, const bool i, const char *pattern, int len)
+ : scope_t(u), icase(i) {
+ str = TSstrndup(pattern, len);
+ }
+ virtual ~strscope() {
+ if (str) TSfree(str);
+ }
+};
+
/* Abstract "from:" matcher interface.
 * find(buf, len, found, found_len, to, repl): scan buf (len bytes) for
 * the next match; on success set found (offset into buf), found_len
 * (length of the matched span) and fill repl with the replacement text
 * derived from the "to" template.  Returns true if a usable match was
 * found within len bytes.
 * cont_size(): how many trailing bytes must be carried over between
 * input chunks so a match spanning a chunk boundary is not missed.
 */
class match_t {
public:
  virtual bool find(const char *, size_t, size_t&, size_t&,
                    const char *, std::string&) const = 0;
  virtual size_t cont_size() const = 0;
  virtual ~match_t() {}
};
/* Literal string matcher. */
class strmatch : public match_t {
  const bool icase;   /* case-independent search when set */
  char *str;          /* NUL-terminated copy of the search string */
  const size_t slen;  /* its length */
public:
  /* Scan buf for str.  The replacement is simply "to" verbatim.
   * Returns false when the match would extend past len, i.e. into the
   * region being held back as continuity buffer (found/found_len/repl
   * are still set in that case, but the caller treats it as no-match
   * and retries when more data arrives).
   * NOTE(review): strstr/strcasestr require buf to be NUL-terminated;
   * when process_block passes raw IOBuffer data rather than
   * contbuf.c_str() that is not obviously guaranteed - TODO confirm.
   */
  virtual bool find(const char *buf, size_t len, size_t& found,
                    size_t& found_len, const char *to,
                    std::string& repl) const {
    const char *match = icase ? strcasestr(buf, str) : strstr(buf, str);
    if (match) {
      found = match - buf;
      found_len = slen;
      repl = to;
      return (found+slen > len) ? false : true;
    }
    else {
      return false;
    }
  }
  strmatch(const bool i, const char *pattern, int len) : icase(i), slen(len) {
    str = TSstrndup(pattern, len);
  }
  virtual ~strmatch() {
    if (str) TSfree(str);
  }
  /* A literal match can straddle a boundary by at most its own length. */
  virtual size_t cont_size() const {
    return slen;
  }
};
+class rxmatch : public match_t {
+ size_t match_len;
+ regex_t rx;
+public:
+ virtual bool find(const char *buf, size_t len, size_t& found,
+ size_t& found_len, const char *tmpl,
+ std::string& repl) const {
+ regmatch_t pmatch[MAX_RX_MATCH];
+ if (regexec(&rx, buf, MAX_RX_MATCH, pmatch, REG_NOTEOL) == 0) {
+ char c;
+ int n;
+ found = pmatch[0].rm_so;
+ found_len = pmatch[0].rm_eo - found;
+ while (c = *tmpl++, c != '\0') {
+ switch (c) {
+ case '\\':
+ if (*tmpl != '\0') {
+ repl.push_back(*tmpl++);
+ }
+ break;
+ case '$':
+ if (isdigit(*tmpl)) {
+ n = *tmpl - '0';
+ }
+ else {
+ n = MAX_RX_MATCH;
+ }
+ if (n > 0 && n < MAX_RX_MATCH) {
+ repl.append(buf+pmatch[n].rm_so,
+ pmatch[n].rm_eo - pmatch[n].rm_so);
+ tmpl++; /* we've consumed one more character */
+ }
+ else {
+ repl.push_back(c);
+ }
+ break;
+ default:
+ repl.push_back(c);
+ break;
+ }
+ }
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ virtual size_t cont_size() const {
+ return match_len;
+ }
+ rxmatch(bool i, const char* pattern, size_t sz, size_t match_max)
+ : match_len(match_max) {
+ char *str = TSstrndup(pattern, sz);
+ int flags = REG_EXTENDED | (i ? REG_ICASE : 0);
+ int error = regcomp(&rx, str, flags);
+ if (error) {
+ TSError("stream-editor: can't compile regexp [%s]", str);
+ TSfree(str);
+ throw;
+ }
+ TSfree(str);
+ }
+ virtual ~rxmatch() {
+ regfree(&rx);
+ }
+};
+
/* Advance x past false positives: a field spec is only valid at the
 * start of the line or immediately after whitespace (so e.g. the
 * "to:" embedded in "into:" is not taken as a to: field).
 * The original never used its "line" argument and unconditionally
 * dereferenced *(x-1), reading before the buffer whenever the spec
 * begins the line.
 */
#define PARSE_VERIFY(line, x, str)              \
  while (x)                                     \
    if ((x) > (line) && !isspace(*((x)-1)))     \
      x = strcasestr((x)+1, str);               \
    else                                        \
      break
+
/* One rewrite rule, parsed from a single config line.
 * Holds an optional scope, a from: matcher, a to: replacement template
 * and a priority.  rule_t objects are copied per-request into
 * contdata_t, and the copies share the heap members (see the FIXME on
 * the commented-out destructor below).
 */
class rule_t {
private:
  scope_t *scope;          /* optional URL scope; NULL = applies everywhere */
  unsigned int priority;   /* single digit, lower wins conflicts; default 5 */
  match_t *from;           /* what to search for */
  char *to;                /* replacement template (may use $1-$9 with regexps) */
  size_t to_len;
public:
  /* Parse one "[in]"/"[out]" config line.  Throws (a string) on a
   * malformed rule or an uncompilable regexp; read_conf catches this
   * and discards the rule.
   */
  rule_t(const char *line) : scope(NULL), priority(5), from(NULL), to(NULL) {
    const char *scope_spec = strcasestr(line, "scope:");
    const char *from_spec = strcasestr(line, "from:");
    const char *to_spec = strcasestr(line, "to:");
    const char *prio_spec = strcasestr(line, "prio:");
    const char *len_spec = strcasestr(line, "len:");
    bool icase = false;
    bool rx = false;
    bool uri;
    size_t len, match_len;
    char delim;

    /* reject hits that are not at a word boundary (e.g. the "to:"
     * inside "scope:" keeps searching for a real to: field) */
    PARSE_VERIFY(line, scope_spec, "scope:");
    PARSE_VERIFY(line, from_spec, "from:");
    PARSE_VERIFY(line, to_spec, "to:");
    PARSE_VERIFY(line, prio_spec, "prio:");
    PARSE_VERIFY(line, len_spec, "len:");

    if (!from_spec || !to_spec) {
      throw "Incomplete stream edit spec";
    }

    /* len: - decimal estimate of the longest regexp match (drives the
     * size of the continuity buffer); default 20 */
    if (len_spec) {
      match_len = 0;
      len_spec += 4;
      while (isdigit(*len_spec)) {
        match_len = 10*match_len + (*len_spec++ - '0');
      }
    }
    else {
      match_len = 20; // default
    }

    /* parse From: now, as failure could abort constructor */
    /* collect single-letter flags between the two colons of
     * "from:flags:value".
     * NOTE(review): if the second ':' is missing this loop scans on
     * past the field (and potentially past the line) - assumes
     * well-formed config; TODO confirm.
     */
    for (from_spec += 5; *from_spec != ':'; ++from_spec) {
      switch (*from_spec) {
      case 'i': icase = true; break;
      case 'r': rx = true; break;
      }
    }
    /* value may be quoted by any non-alphanumeric matched pair; if the
     * closing delimiter never appears we treat the first char as data
     * and fall back to whitespace-terminated parsing */
    delim = *++from_spec;
    if (isalnum(delim)) {
      len = strcspn(from_spec, " \t\r\n");
    }
    else {
      const char *end = strchr(++from_spec, delim);
      if (end) {
        len = end - from_spec;
      }
      else {
        /* it wasn't a delimiter after all */
        len = strcspn(--from_spec, " \t\r\n");
      }
    }
    if (rx) {
      from = new rxmatch(icase, from_spec, len, match_len);
    }
    else {
      from = new strmatch(icase, from_spec, len);
    }

    if (scope_spec) {
      icase = false;
      rx = false;
      /* uri == true means "match path only" in scope_t; the 'u' flag
       * switches to matching the full URI */
      uri = true;
      for (scope_spec += 6; *scope_spec != ':'; ++scope_spec) {
        switch (*scope_spec) {
        case 'i': icase = true; break;
        case 'r': rx = true; break;
        case 'u': uri = false; break;
        }
      }
      ++scope_spec;
      /* scope: values are not quotable; terminated by a space */
      len = strcspn(scope_spec, " ");
      if (rx) {
        scope = new rxscope(uri, icase, scope_spec, len);
      }
      else {
        scope = new strscope(uri, icase, scope_spec, len);
      }
    }

    /* prio: - a single digit */
    if (prio_spec) {
      prio_spec += 5;
      if (isdigit(*prio_spec)) {
        priority = *prio_spec - '0';
      }
    }

    /* to: - same optional-delimiter quoting as from: */
    to_spec += 3;
    delim = *to_spec;
    if (isalnum(delim)) {
      to_len = strcspn(to_spec, " \t\r\n");
    }
    else {
      const char *end = strchr(++to_spec, delim);
      if (end) {
        to_len = end - to_spec;
      }
      else {
        /* it wasn't a delimiter after all */
        to_len = strcspn(--to_spec, " \t\r\n");
      }
    }
    to = TSstrndup(to_spec, to_len);
  }
  /* shallow copy: the heap members are shared with the original */
  rule_t(const rule_t& r) : scope(r.scope), priority(r.priority),
                            from(r.from), to(r.to), to_len(r.to_len) { }
/* FIXME - since rules get copied per-request, we can't delete these.
   But we can leave these to leak 'cos they're only ever created
   as a one-off at startup. Would be cleaner to refcount or to
   use subclasses with and without destructor for original vs copy.
  ~rule_t() {
    if (scope) delete scope;
    if (from) delete from;
    if (to) TSfree(to);
  }
*/
  /* if no scope is specified then everything is in-scope */
  bool in_scope(TSHttpTxn tx) const {
    return scope ? scope->in_scope(tx) : true;
  }
  /* continuity-buffer requirement of this rule's matcher */
  size_t cont_size() const {
    return from->cont_size();
  }
  /* Find every non-overlapping match in buf and record a corresponding
   * edit; resumes searching just past each accepted match.
   */
  void apply(const char *buf, size_t len, editset_t& edits) const {
    size_t found;
    size_t found_len;
    size_t offs = 0;
    while (offs < len) {
      std::string repl;
      if (from->find(buf+offs, len-offs, found, found_len,
                     to, repl)) {
        found += offs;
        edit_t(found, found_len, repl, priority).saveto(edits);
        offs = found + found_len;
      }
      else {
        break;
      }
    }
  }
};
typedef std::vector<rule_t> ruleset_t;
typedef ruleset_t::const_iterator rule_p;
+
/* Per-transaction filter state, attached to the transform continuation.
 * Allocated with new in streamedit_setup; deleted in streamedit_filter
 * when the VConn closes, so the destructor performs all cleanup.
 */
typedef struct contdata_t {
  TSCont cont;              /* the transform continuation that owns us */
  TSIOBuffer out_buf;       /* edited output buffer */
  TSIOBufferReader out_rd;  /* reader handed to the downstream vconn */
  TSVIO out_vio;            /* downstream write VIO */
  ruleset_t rules;          /* the rules in scope for this transaction */
  std::string contbuf;      /* continuity buffer: tail kept between chunks */
  size_t contbuf_sz;        /* how many trailing bytes to hold back */
  int64_t bytes_in;         /* input bytes accounted so far */
  int64_t bytes_out;        /* output bytes written so far */
/* Use new/delete so destructor does cleanup for us */
  contdata_t() : cont(NULL), out_buf(NULL), out_rd(NULL), out_vio(NULL),
                 contbuf_sz(0), bytes_in(0), bytes_out(0) {}
  ~contdata_t() {
    if (out_rd) TSIOBufferReaderFree(out_rd);
    if (out_buf) TSIOBufferDestroy(out_buf);
    if (cont) TSContDestroy(cont);
  }
  /* Grow the hold-back to cover a rule whose match may span sz bytes:
   * keeping 2*sz - 1 bytes guarantees any sz-byte match overlapping a
   * chunk boundary is fully visible in contbuf + new data.
   */
  void set_cont_size(size_t sz) {
    if (contbuf_sz < 2*sz)
      contbuf_sz = 2*sz - 1;
  }
} contdata_t;
+
+static int64_t process_block(contdata_t *contdata, TSIOBufferReader reader) {
+ int64_t nbytes, start;
+ size_t n = 0;
+ size_t buflen;
+ size_t keep;
+ const char *buf;
+ TSIOBufferBlock block;
+
+ if (reader == NULL) { // We're just flushing anything we have buffered
+ keep = 0;
+ buf = contdata->contbuf.c_str();
+ buflen = contdata->contbuf.length();
+ }
+ else {
+
+ block = TSIOBufferReaderStart(reader);
+ buf = TSIOBufferBlockReadStart(block, reader, &nbytes);
+
+ if (contdata->contbuf.empty()) {
+ /* Use the data as-is */
+ buflen = nbytes;
+ }
+ else {
+ contdata->contbuf.append(buf, nbytes);
+ buf = contdata->contbuf.c_str();
+ buflen = contdata->contbuf.length();
+ }
+ keep = contdata->contbuf_sz;
+ }
+ size_t bytes_read = 0;
+
+ editset_t edits;
+
+ for (rule_p r = contdata->rules.begin();
+ r != contdata->rules.end(); ++r) {
+ r->apply(buf, buflen, edits);
+ }
+
+ for (edit_p p = edits.begin(); p != edits.end(); ++p) {
+ /* Preserve continuity buffer */
+ if (p->start >= buflen - keep)
+ break;
+
+ /* pass through bytes before edit */
+ start = p->start - bytes_read;
+
+ while (start > 0) {
+ // FIXME: would this be quicker if we managed a TSIOBuffer
+ // so we could use TSIOBufferCopy ?
+ n = TSIOBufferWrite(contdata->out_buf, buf+bytes_read, start);
+ assert (n > 0); // FIXME - handle error
+ bytes_read += n;
+ contdata->bytes_out += n;
+ start -= n;
+ }
+
+ /* omit deleted bytes */
+ bytes_read += p->bytes;
+
+ /* insert replacement bytes */
+ n = TSIOBufferWrite(contdata->out_buf, p->repl.c_str(), p->repl.length());
+ assert(n == p->repl.length()); // FIXME (if this ever happens)!
+ contdata->bytes_out += n;
+
+ /* increment counts - done */
+ }
+ contdata->bytes_in += bytes_read;
+
+ /* data after the last edit */
+ if (bytes_read < buflen - keep) {
+ n = TSIOBufferWrite(contdata->out_buf, buf+bytes_read,
+ buflen - bytes_read - keep);
+ contdata->bytes_in += n;
+ contdata->bytes_out += n;
+ bytes_read += n;
+ }
+ /* reset buf to what we've not processed */
+ contdata->contbuf = buf+bytes_read;
+
+ return nbytes;
+}
/* Drain the input VIO through process_block(), then signal upstream
 * and downstream appropriately.  Called for every event that is not
 * an error, close or write-complete.
 */
static void streamedit_process(TSCont contp)
{
  // Read the data available to us
  // Concatenate with anything we have buffered
  // Loop over rules, and apply them to build our edit set
  // Loop over edits, and apply them to the stream
  // Retain buffered data at the end
  int64_t ntodo, nbytes;

  contdata_t *contdata = (contdata_t*) TSContDataGet(contp);
  TSVIO input_vio = TSVConnWriteVIOGet(contp);
  TSIOBufferReader input_rd = TSVIOReaderGet(input_vio);

  /* First call: set up our output buffer and start the write to the
   * downstream vconn (size unknown up front, hence INT64_MAX). */
  if (contdata->out_buf == NULL) {
    contdata->out_buf = TSIOBufferCreate();
    contdata->out_rd = TSIOBufferReaderAlloc(contdata->out_buf);
    contdata->out_vio = TSVConnWrite(TSTransformOutputVConnGet(contp),
                                     contp, contdata->out_rd, INT64_MAX);
  }

  TSIOBuffer in_buf = TSVIOBufferGet(input_vio);
  /* Test for EOS: a NULL input buffer means upstream is done.  Flush
   * the continuity buffer and fix up the output VIO's final size. */
  if (in_buf == NULL) {
    process_block(contdata, NULL); // flush any buffered data
    TSVIONBytesSet(contdata->out_vio, contdata->bytes_out);
    TSVIOReenable(contdata->out_vio);
    return;
  }

  /* Test for EOS */
  ntodo = TSVIONTodoGet(input_vio);
  if (ntodo == 0) {
    /* Call back the input VIO continuation to let it know that we
     * have completed the write operation.
     */
    TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_COMPLETE,
               input_vio);
    TSVIOReenable(contdata->out_vio);
    return;
  }

  /* now parse & process buffered data. We can set some aside
   * as a continuity buffer to deal with the problem of matches
   * that span input chunks.
   */
  while (ntodo = TSIOBufferReaderAvail(input_rd), ntodo > 0) {
    nbytes = process_block(contdata, input_rd);
    TSIOBufferReaderConsume(input_rd, nbytes);
    TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + nbytes);
  }

  /* Tell upstream whether we are finished or want more data. */
  ntodo = TSVIONTodoGet(input_vio);
  if (ntodo == 0) {
    /* Call back the input VIO continuation to let it know that we
     * have completed the write operation.
     */
    TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_COMPLETE, input_vio);
  }
  else {
    /* Call back the input VIO continuation to let it know that we
     * are ready for more data.
     */
    TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_READY, input_vio);
  }
  TSVIOReenable(contdata->out_vio);
}
+static int streamedit_filter(TSCont contp, TSEvent event, void *edata)
+{
+ /* Our main function that does the work.
+ * Called as a continuation for filtering.
+ * *** if necessary, add call at TXN_CLOSE for cleanup.
+ */
+ TSVIO input_vio;
+
+ if (TSVConnClosedGet(contp)) {
+ contdata_t *contdata = (contdata_t*) TSContDataGet(contp);
+ delete contdata;
+ return TS_SUCCESS;
+ }
+
+ switch (event) {
+ case TS_EVENT_ERROR:
+ input_vio = TSVConnWriteVIOGet(contp);
+ TSContCall(TSVIOContGet(input_vio), TS_EVENT_ERROR, input_vio);
+ break;
+ case TS_EVENT_VCONN_WRITE_COMPLETE:
+ TSVConnShutdown(TSTransformOutputVConnGet(contp), 0, 1);
+ break;
+ default:
+ streamedit_process(contp);
+ break;
+ }
+ return TS_SUCCESS;
+}
+
+static int streamedit_setup(TSCont contp, TSEvent event, void *edata)
+{
+ TSHttpTxn txn = (TSHttpTxn) edata;
+ ruleset_t *rules_in = (ruleset_t*) TSContDataGet(contp);
+ contdata_t *contdata = NULL;
+
+ assert((event == TS_EVENT_HTTP_READ_RESPONSE_HDR)
+ || (event == TS_EVENT_HTTP_READ_REQUEST_HDR));
+
+ /* make a new list comprising those rules that are in scope */
+ for (rule_p r = rules_in->begin(); r != rules_in->end(); ++r) {
+ if (r->in_scope(txn)) {
+ if (contdata == NULL) {
+ contdata = new contdata_t();
+ }
+ contdata->rules.push_back(*r);
+ contdata->set_cont_size(r->cont_size());
+ }
+ }
+
+ if (contdata == NULL) {
+ /* Nothing to do */
+ return TS_SUCCESS;
+ }
+
+ /* we have a job to do, so insert filter */
+ contdata->cont = TSTransformCreate(streamedit_filter, txn);
+ TSContDataSet(contdata->cont, contdata);
+
+ if (event == TS_EVENT_HTTP_READ_REQUEST_HDR) {
+ TSHttpTxnHookAdd(txn, TS_HTTP_REQUEST_TRANSFORM_HOOK, contdata->cont);
+ }
+ else {
+ TSHttpTxnHookAdd(txn, TS_HTTP_RESPONSE_TRANSFORM_HOOK, contdata->cont);
+ }
+
+ TSHttpTxnReenable(txn, TS_EVENT_HTTP_CONTINUE);
+
+ return TS_SUCCESS;
+}
+
+static void read_conf(const char *filename, ruleset_t *&in, ruleset_t *&out)
+{
+ char buf[MAX_CONFIG_LINE];
+ FILE *file = fopen(filename, "r");
+ if (file == NULL) {
+ TSError("[stream-editor] Failed to open %s", filename);
+ return;
+ }
+ while (fgets(buf, MAX_CONFIG_LINE, file) != NULL) {
+ try {
+ if (!strncasecmp(buf, "[in]", 4)) {
+ if (in == NULL) {
+ in = new ruleset_t();
+ }
+ in->push_back(rule_t(buf));
+ }
+ else if (!strncasecmp(buf, "[out]", 5)) {
+ if (out == NULL) {
+ out = new ruleset_t();
+ }
+ out->push_back(rule_t(buf));
+ }
+ }
+ catch(...) {
+ TSError("stream-editor: failed to parse rule %s", buf);
+ }
+ }
+ fclose(file);
+}
+
+extern "C" void TSPluginInit(int argc, const char *argv[])
+{
+ TSPluginRegistrationInfo info;
+ TSCont inputcont, outputcont;
+ ruleset_t *rewrites_in = NULL;
+ ruleset_t *rewrites_out = NULL;
+
+ info.plugin_name = (char *)"stream-editor";
+ info.vendor_name = (char *)"Apache Software Foundation";
+ info.support_email = (char *)"users@trafficserver.apache.org";
+
+ if (TSPluginRegister(TS_SDK_VERSION_3_0, &info) != TS_SUCCESS) {
+ TSError("[stream-editor] Plugin registration failed.");
+ return;
+ }
+
+ /* Allow different config files */
+ while (--argc) {
+ read_conf(*++argv, rewrites_in, rewrites_out);
+ }
+
+ if (rewrites_in != NULL) {
+ TSDebug("[stream-editor]", "initialising input filtering");
+ inputcont = TSContCreate(streamedit_setup, NULL);
+ if (inputcont == NULL) {
+ TSError("[stream-editor] failed to initialise input filtering!");
+ }
+ else {
+ TSContDataSet(inputcont, rewrites_in);
+ TSHttpHookAdd(TS_HTTP_READ_REQUEST_HDR_HOOK, inputcont);
+ }
+ }
+ else {
+ TSDebug("[stream-editor]", "no input filter rules, skipping filter");
+ }
+
+ if (rewrites_out != NULL) {
+ TSDebug("[stream-editor]", "initialising output filtering");
+ outputcont = TSContCreate(streamedit_setup, NULL);
+ if (outputcont == NULL) {
+ TSError("[stream-editor] failed to initialise output filtering!");
+ }
+ else {
+ TSContDataSet(outputcont, rewrites_out);
+ TSHttpHookAdd(TS_HTTP_READ_RESPONSE_HDR_HOOK, outputcont);
+ }
+ }
+ else {
+ TSDebug("[stream-editor]", "no output filter rules, skipping filter");
+ }
+}