You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by so...@apache.org on 2012/12/01 22:56:28 UTC
[1/2] git commit: TS-1596 - Added channel_stats plugin to
experimental.
Updated Branches:
refs/heads/master c55e396bb -> 0c7c43dca
TS-1596 - Added channel_stats plugin to experimental.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/de35ad0a
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/de35ad0a
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/de35ad0a
Branch: refs/heads/master
Commit: de35ad0abdf579a52ea765eaceba898dcc1e0b73
Parents: c55e396
Author: Phil Sorber <so...@apache.org>
Authored: Sat Dec 1 16:13:40 2012 -0500
Committer: Phil Sorber <so...@apache.org>
Committed: Sat Dec 1 16:53:46 2012 -0500
----------------------------------------------------------------------
CHANGES | 2 +
configure.ac | 1 +
plugins/experimental/Makefile.am | 3 +-
plugins/experimental/channel_stats/Makefile.am | 18 +
plugins/experimental/channel_stats/Makefile.tsxs | 24 +
plugins/experimental/channel_stats/README | 90 ++
.../experimental/channel_stats/channel_stats.cc | 806 +++++++++++++++
plugins/experimental/channel_stats/debug_macros.h | 77 ++
8 files changed, 1020 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0a8a6ca..2af471c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,8 @@
-*- coding: utf-8 -*-
Changes with Apache Traffic Server 3.3.1
+ *) [TS-1596] Added channel_stats plugin to experimental.
+
*) [TS-1607] decouple SSL certificate lookup
*) [TS-1506] %<cquuh> log symbol will crash TS when requesting a SSL url.
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 29dff32..a5455d5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1614,6 +1614,7 @@ AC_CONFIG_FILES([plugins/experimental/header_rewrite/Makefile])
AC_CONFIG_FILES([plugins/experimental/metalink/Makefile])
AC_CONFIG_FILES([plugins/experimental/gzip/Makefile])
AC_CONFIG_FILES([plugins/experimental/spdy/Makefile])
+AC_CONFIG_FILES([plugins/experimental/channel_stats/Makefile])
# various tools
AC_CONFIG_FILES([tools/Makefile])
# example plugins
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/Makefile.am b/plugins/experimental/Makefile.am
index 025ebce..fae28a7 100644
--- a/plugins/experimental/Makefile.am
+++ b/plugins/experimental/Makefile.am
@@ -24,5 +24,6 @@ SUBDIRS = \
header_rewrite \
metalink \
gzip \
- spdy
+ spdy \
+ channel_stats
endif
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/Makefile.am b/plugins/experimental/channel_stats/Makefile.am
new file mode 100644
index 0000000..e2e6872
--- /dev/null
+++ b/plugins/experimental/channel_stats/Makefile.am
@@ -0,0 +1,18 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Look for the Traffic Server API headers in both the build and source trees
+AM_CPPFLAGS = -I$(top_builddir)/proxy/api -I$(top_srcdir)/proxy/api
+
+# Redirect pkglibdir so the plugin module is installed under pkglibexecdir
+pkglibdir = ${pkglibexecdir}
+pkglib_LTLIBRARIES = channel_stats.la
+channel_stats_la_SOURCES = channel_stats.cc
+# libtool flags: -module, -avoid-version, -shared (presumably: loadable module,
+# no version suffix, shared object only -- confirm against the libtool manual)
+channel_stats_la_LDFLAGS = -module -avoid-version -shared
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/Makefile.tsxs
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/Makefile.tsxs b/plugins/experimental/channel_stats/Makefile.tsxs
new file mode 100644
index 0000000..aa84569
--- /dev/null
+++ b/plugins/experimental/channel_stats/Makefile.tsxs
@@ -0,0 +1,24 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# tsxs executable to use; override on the command line with
+# TSXS=/path/to/ts/bin/tsxs when it is not in PATH
+TSXS?=tsxs
+
+# Pattern rule: build a .so from the .cc of the same name through tsxs
+%.so: %.cc
+ $(TSXS) -C $< -o $@
+
+# Default target: build the plugin
+all: channel_stats.so
+
+# Build if needed, then let tsxs install channel_stats.so
+install: all
+ $(TSXS) -i -o channel_stats.so
+
+# Remove build products
+clean:
+ rm -f *.lo *.so
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/README
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/README b/plugins/experimental/channel_stats/README
new file mode 100644
index 0000000..873e151
--- /dev/null
+++ b/plugins/experimental/channel_stats/README
@@ -0,0 +1,90 @@
+channel_stats plugin for Apache Traffic Server 3.0.0+
+
+About channel_stats plugin
+==========================
+The channel_stats plugin collects runtime statistics (speed, request count and
+more in the future) for each channel. The stats are exposed through an HTTP
+interface in JSON. (The interface code comes from the 'stats_over_http' plugin.)
+
+In general, the plugin is meant for a reverse proxy with a fixed number of
+remap rules. It is not designed for a proxy that serves an unlimited number of
+channels (e.g. an open-relay forward proxy).
+
+
+Installation
+==========================
+ make -f Makefile.tsxs
+ sudo make install -f Makefile.tsxs
+(if 'tsxs' is not in your PATH, run make by appending TSXS=/path/to/ts/bin/tsxs)
+
+Add 'channel_stats.so' to plugin.config. By default, the path of the HTTP
+interface is '_cstats'. For safety, you should change it by adding a parameter
+after 'channel_stats.so'. Example: 'channel_stats.so _my_cstats'.
+
+Restart Traffic Server: sudo traffic_line -L or sudo trafficserver restart
+
+
+Viewing Stats
+==========================
+Visit http://local_IP:port/_cstats (or as you configured in plugin.config)
+Output will be in json:
+
+{
+ "channel": {
+ "www.example.com": {
+ "response.bytes.content": "3486995502046",
+ "response.count.2xx.get": "64040675",
+ "speed.ua.bytes_per_sec_64k": "3972287"
+ },
+ "www.test.com": {
+ "response.bytes.content": "3349404916760",
+ "response.count.2xx.get": "64038172",
+ "speed.ua.bytes_per_sec_64k": "3989255"
+ }
+ },
+ "global": {
+ "response.count.2xx.get": "268516715",
+ "server": "3.0.4"
+ }
+}
+
+Available stats:
+ - response.bytes.content: transferred content length (not including header)
+ - response.count.2xx.get: transaction count
+ - speed.ua.bytes_per_sec_64k: count of transactions whose speed is < 64KBps
+
+Additional parameters:
+ - topn: only output top N channels order by response count
+ - channel: only output the channels which contain specific string
+ - global: also display TS internal stats as 'stats_over_http' plugin
+ Example:
+ - http://127.0.0.1/_cstats?global
+ - http://127.0.0.1/_cstats?topn=5
+ - http://127.0.0.1/_cstats?channel=test.com
+ - http://127.0.0.1/_cstats?channel=test.com&topn=5&global
+If you have a large number of channels (e.g. more than 10k), these parameters
+should not be used heavily because of the extra overhead they add.
+
+
+Warning
+==========================
+Security
+ - As mentioned above, you should change the default path of the HTTP interface
+ to make it harder for other people to access your channel stats.
+ - For IPv4, the plugin makes sure the visitor comes from a private network.
+ (http://en.wikipedia.org/wiki/Private_network#Private_IPv4_address_spaces)
+ In addition, you should deny requests to the HTTP interface on your L7 front
+ end, because your L7 server and TS are probably in the same local network.
+ The plugin currently doesn't check IPv6 addresses. (should disable IPv6 by default?)
+ - The number of channels is limited to 100000.
+
+
+DEV
+==========================
+
+
+ChangeLog
+==========================
+
+Version 0.1 (11/26/12)
+ - Initial release
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/channel_stats.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/channel_stats.cc b/plugins/experimental/channel_stats/channel_stats.cc
new file mode 100644
index 0000000..7cca4e7
--- /dev/null
+++ b/plugins/experimental/channel_stats/channel_stats.cc
@@ -0,0 +1,806 @@
+/*
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+// get INT64_MAX
+#define __STDC_LIMIT_MACROS
+
+#include <cstdio>
+#include <cstring>
+#include <cctype>
+#include <string>
+#include <map> // may optimize by using hash_map, but mind compiler portability
+#include <vector>
+#include <algorithm>
+#include <sstream>
+#include <arpa/inet.h>
+
+#include <ts/ts.h>
+#if (TS_VERSION_NUMBER < 3003001)
+// get TSHttpTxnStartTimeGet
+#include <ts/experimental.h>
+#endif
+
+#include "debug_macros.h"
+
+#define PLUGIN_NAME "channel_stats"
+#define PLUGIN_VERSION "0.1"
+
+#define MAX_SPEED 999999999
+
+/* limit the number of channels (items) to avoid potential attack,
+ regex_map rule can also generate infinite channels (hosts) */
+#define MAX_MAP_SIZE 100000
+
+static std::string api_path("_cstats");
+
+// global stats
+static uint64_t global_response_count_2xx_get = 0; // 2XX GET response count
+static uint64_t global_response_bytes_content = 0; // transferred bytes (2xx.get)
+
+// channel stats
+struct channel_stat {
+  // Per-channel counters; they are only ever modified through increment().
+  uint64_t response_bytes_content;      // body bytes of counted responses
+  uint64_t response_count_2xx;          // number of counted responses
+  uint64_t speed_ua_bytes_per_sec_64k;  // responses flagged as slow by the caller
+
+  // All counters start at zero.
+  channel_stat()
+    : response_bytes_content(0),
+      response_count_2xx(0),
+      speed_ua_bytes_per_sec_64k(0)
+  {
+  }
+
+  // Atomically add the given deltas. The byte counter is always updated;
+  // the two event counters are skipped when their delta is zero.
+  inline void increment(uint64_t rbc, uint64_t rc2, uint64_t sbps6) {
+    __sync_fetch_and_add(&response_bytes_content, rbc);
+    if (rc2 != 0) {
+      __sync_fetch_and_add(&response_count_2xx, rc2);
+    }
+    if (sbps6 != 0) {
+      __sync_fetch_and_add(&speed_ua_bytes_per_sec_64k, sbps6);
+    }
+  }
+
+  // Dump the current counter values to the plugin debug log.
+  inline void debug_channel() {
+    debug("response.bytes.content: %" PRIu64 "", response_bytes_content);
+    debug("response.count.2xx: %" PRIu64 "", response_count_2xx);
+    debug("speed.ua.bytes_per_sec_64k: %" PRIu64 "", speed_ua_bytes_per_sec_64k);
+  }
+};
+
+typedef std::map<std::string, channel_stat *> stats_map_type;
+typedef stats_map_type::iterator smap_iterator;
+
+static stats_map_type channel_stats;
+static TSMutex stats_map_mutex;
+
+// api Intercept Data
+// Per-request state of the stats API intercept. It is allocated and zeroed in
+// handle_read_req() and released in stats_cleanup().
+typedef struct intercept_state_t
+{
+ TSVConn net_vc; // connection received with TS_EVENT_NET_ACCEPT
+ TSVIO read_vio; // returned by TSVConnRead() in stats_process_accept()
+ TSVIO write_vio; // returned by TSVConnWrite() in stats_process_read()
+
+ TSIOBuffer req_buffer; // receives the request bytes
+ TSIOBuffer resp_buffer; // response header and body are appended here
+ TSIOBufferReader resp_reader; // reader on resp_buffer handed to TSVConnWrite()
+
+ int output_bytes; // bytes appended to resp_buffer so far
+ int body_written; // set to 1 once the body has been generated
+
+ int show_global; // default 0; 1 when the 'global' query parameter is present
+ char * channel; // NULL without a query string, else TSmalloc'ed filter (may be "")
+ int topn; // default -1
+ int deny; // default 0; 1 when the IPv4 client is not in a private network
+} intercept_state;
+
+// One IPv4 network: address and netmask, both as produced by inet_addr().
+struct private_seg_t {
+ struct in_addr net;
+ struct in_addr mask;
+};
+
+// Networks whose clients may query the stats API (see is_private_ip()).
+// don't put inet_addr("255.255.255.255"), see BUGS in 'man 3 inet_addr'
+static struct private_seg_t private_segs[] = {
+ {{inet_addr("10.0.0.0")}, {inet_addr("255.0.0.0")}},
+ {{inet_addr("127.0.0.0")}, {inet_addr("255.0.0.0")}},
+ {{inet_addr("172.16.0.0")}, {inet_addr("255.240.0.0")}},
+ {{inet_addr("192.168.0.0")}, {inet_addr("255.255.0.0")}}
+};
+// number of entries in private_segs
+static int num_private_segs = sizeof(private_segs) / sizeof(private_seg_t);
+
+// Tell whether addr belongs to the network netaddr/netmask.
+// All parameters are in network byte order. Returns 1 on match, else 0.
+static int
+is_in_net (const struct in_addr * addr,
+           const struct in_addr * netaddr,
+           const struct in_addr * netmask)
+{
+  // the address matches when it differs from the network in no masked bit
+  const in_addr_t differing = addr->s_addr ^ netaddr->s_addr;
+
+  return (differing & netmask->s_addr) == 0 ? 1 : 0;
+}
+
+// Return 1 when addr lies in one of the networks listed in private_segs,
+// 0 otherwise.
+static int
+is_private_ip(const struct in_addr * addr)
+{
+  const struct private_seg_t *seg = private_segs;
+  const struct private_seg_t *const end = private_segs + num_private_segs;
+
+  for (; seg != end; ++seg) {
+    if (is_in_net(addr, &seg->net, &seg->mask)) {
+      return 1;
+    }
+  }
+
+  return 0;
+}
+
+static int handle_event(TSCont contp, TSEvent event, void *edata);
+static int api_handle_event(TSCont contp, TSEvent event, void *edata);
+
+/*
+  Get the value of a parameter from a URL query string.
+
+  query:      null-terminated query string, as it appears after the '?'
+  param:      parameter name including the trailing '=' (e.g. "topn=")
+  result:     output buffer; needs room for max_length + 1 bytes, because
+              the terminating '\0' is written after the copied characters
+  max_length: maximum number of value characters to copy
+
+  Return 0 and an empty string if the parameter is not found.
+  Return 1 and the value string, normally.
+  Return 2 and the first max_length characters if the value is longer.
+
+  A match only counts at the very start of the query or right after '&':
+  ?param=value&fake_param=value&param=value
+*/
+static int
+get_query_param(const char *query, const char *param,
+                char *result, int max_length)
+{
+  const size_t param_len = strlen(param);
+  // const-qualified: strstr() on a const char * yields a const char * in C++
+  const char *pos = strstr(query, param);
+
+  // skip occurrences that are only the tail of another name ("fake_param=")
+  while (pos && pos != query && *(pos - 1) != '&') {
+    pos = strstr(pos + param_len, param); // try next
+  }
+
+  if (!pos) {
+    // set it to the empty string if not found
+    result[0] = '\0';
+    return 0;
+  }
+
+  pos += param_len; // skip 'param='
+
+  // copy the value up to the next '&', the end of the query or max_length
+  int now = 0;
+  while (*pos != '\0' && *pos != '&' && now < max_length) {
+    result[now++] = *pos;
+    pos++;
+  }
+  result[now] = '\0'; // make sure null-terminated
+
+  // stopped by the length limit while value characters were still left
+  if (*pos != '\0' && *pos != '&' && now == max_length)
+    return 2;
+  else
+    return 1;
+}
+
+/*
+  Check whether a parameter exists in a query string.
+
+  query:        null-terminated query string, as it appears after the '?'
+  param:        parameter name without '=' (an empty name never matches)
+  has_no_value: non-zero to look for a bare flag (name followed by '&' or the
+                end of the query), zero to look for "name="
+
+  Return 1 if found, 0 otherwise.
+
+  Possible querystring: ?param1=value1&param2
+  (param2 is a param which "has_no_value")
+
+  Every occurrence is examined, so "?globalx&global" still finds 'global'.
+*/
+static int
+has_query_param(const char *query, const char *param, int has_no_value)
+{
+  const size_t param_len = strlen(param);
+
+  if (param_len == 0)
+    return 0;
+
+  // const-qualified: strstr() on a const char * yields a const char * in C++
+  for (const char *pos = strstr(query, param); pos != NULL;
+       pos = strstr(pos + param_len, param)) {
+    // the name must start the query or directly follow '&'
+    if (pos != query && *(pos - 1) != '&')
+      continue;
+
+    const char next = pos[param_len]; // first character after the name
+    if (has_no_value) {
+      if (next == '\0' || next == '&') return 1;
+    } else {
+      if (next == '=') return 1;
+    }
+  }
+
+  return 0;
+}
+
+/*
+  Parse the query string of the stats API request.
+
+  show_global: set to 1 when the bare 'global' parameter is present, else 0
+  channel:     set to NULL when there is no query string; otherwise set to a
+               TSmalloc'ed string holding the 'channel=' value ("" when the
+               parameter is absent). The caller owns it and must TSfree() it.
+  topn:        set to the integer value of 'topn=', or -1 when the parameter
+               is absent or is not a number
+*/
+static void
+get_api_params(TSMBuffer bufp,
+               TSMLoc url_loc,
+               int * show_global,
+               char ** channel,
+               int * topn)
+{
+  const char * query; // not null-terminated, get from TS api
+  char * tmp_query = NULL; // null-terminated copy
+  int query_len = 0;
+  char tmp_topn[11]; // at most 10 characters plus the terminator
+
+  // give every output a defined value, also on the early return below
+  *show_global = 0;
+  *channel = NULL;
+  *topn = -1;
+
+  query = TSUrlHttpQueryGet(bufp, url_loc, &query_len);
+  if (query == NULL || query_len <= 0)
+    return;
+  tmp_query = TSstrndup(query, query_len);
+  debug_api("querystring: %s", tmp_query);
+
+  if (has_query_param(tmp_query, "global", 1)) {
+    debug_api("found 'global' param");
+    *show_global = 1;
+  }
+
+  // get_query_param() writes up to max_length characters plus a terminator
+  *channel = (char *) TSmalloc(query_len + 1);
+  if (get_query_param(tmp_query, "channel=", *channel, query_len)) {
+    debug_api("found 'channel' param: %s", *channel);
+  }
+
+  if (get_query_param(tmp_query, "topn=", tmp_topn, 10)) {
+    // parse into a temporary so that a failed extraction keeps the default
+    std::stringstream ss(tmp_topn);
+    int parsed = -1;
+    if (ss >> parsed)
+      *topn = parsed;
+    debug_api("found 'topn' param: %d", *topn);
+  }
+
+  TSfree(tmp_query);
+}
+
+/*
+  Handle TS_EVENT_HTTP_READ_REQUEST_HDR for one transaction.
+
+  Only GET requests are considered. A request whose path equals api_path is
+  served by the stats API intercept; IPv4 clients outside the private networks
+  (and clients whose address is unavailable) get the 'deny' flag. Every other
+  GET request gets a TXN_CLOSE hook so it is counted when it finishes.
+*/
+static void
+handle_read_req(TSCont contp, TSHttpTxn txnp)
+{
+  // all locals are declared up front because of the gotos below
+  TSMBuffer bufp;
+  TSMLoc hdr_loc = NULL;
+  TSMLoc url_loc = NULL;
+  const char *method;
+  int method_length = 0;
+  TSCont txn_contp;
+
+  const char * path;
+  int path_len = 0;
+  struct sockaddr * client_addr;
+  struct sockaddr_in * client_addr4;
+  TSCont api_contp;
+  char client_ip[INET_ADDRSTRLEN]; // only used for the debug message
+  intercept_state *api_state;
+
+  if (TSHttpTxnClientReqGet(txnp, &bufp, &hdr_loc) != TS_SUCCESS) {
+    error("couldn't retrieve client's request");
+    goto cleanup;
+  }
+
+  method = TSHttpHdrMethodGet(bufp, hdr_loc, &method_length);
+  if (method == NULL) {
+    debug("couldn't retrieve request method");
+    goto cleanup;
+  }
+  // compare the length too: strncmp() alone would accept any prefix of
+  // "GET", including an empty method
+  if (method_length != TS_HTTP_LEN_GET ||
+      0 != strncmp(method, TS_HTTP_METHOD_GET, method_length)) {
+    debug("do not count %.*s method", method_length, method);
+    goto cleanup;
+  }
+
+  if (TSHttpHdrUrlGet(bufp, hdr_loc, &url_loc) != TS_SUCCESS)
+    goto cleanup;
+
+  path = TSUrlPathGet(bufp, url_loc, &path_len);
+  if (path_len == 0 || (unsigned)path_len != api_path.length() ||
+      strncmp(api_path.c_str(), path, path_len) != 0) {
+    goto not_api;
+  }
+
+  // register our intercept
+  debug_api("Intercepting request");
+  api_state = (intercept_state *) TSmalloc(sizeof(*api_state));
+  memset(api_state, 0, sizeof(*api_state));
+  get_api_params(bufp, url_loc,
+                 &api_state->show_global, &api_state->channel,
+                 &api_state->topn);
+
+  // check private ip
+  client_addr = (struct sockaddr *) TSHttpTxnClientAddrGet(txnp);
+  if (client_addr == NULL) {
+    // cannot tell where the request comes from: refuse rather than crash
+    debug_api("client address unavailable, request denied");
+    api_state->deny = 1;
+  } else if (client_addr->sa_family == AF_INET) {
+    client_addr4 = (struct sockaddr_in *) client_addr;
+    if (!is_private_ip(&client_addr4->sin_addr)) {
+      inet_ntop(AF_INET, &client_addr4->sin_addr, client_ip, sizeof(client_ip));
+      debug_api("%s is not a private IP, request denied", client_ip);
+      api_state->deny = 1;
+    }
+  } else {
+    debug_api("not IPv4, ignore IP auth"); // TODO check AF_INET6 private IP?
+  }
+
+  TSSkipRemappingSet(txnp, 1); //not strictly necessary
+
+  // the intercept continuation takes ownership of api_state
+  api_contp = TSContCreate(api_handle_event, TSMutexCreate());
+  TSContDataSet(api_contp, api_state);
+  TSHttpTxnIntercept(api_contp, txnp);
+
+  goto cleanup;
+
+not_api:
+
+  txn_contp = TSContCreate(handle_event, NULL); // reuse global handler
+  TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, txn_contp);
+
+cleanup:
+  if (url_loc) TSHandleMLocRelease(bufp, hdr_loc, url_loc);
+  if (hdr_loc) TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc);
+}
+
+/*
+  Handle TS_EVENT_HTTP_TXN_CLOSE: account a finished 200/206 response.
+
+  The channel key is the host of the pristine URL, with ":port" appended when
+  the port is not 80. Global counters are updated first; the per-channel
+  counters are updated only when the transaction times are valid and the
+  channel exists or can still be created (the map holds at most MAX_MAP_SIZE
+  entries).
+*/
+static void
+handle_txn_close(TSCont contp, TSHttpTxn txnp)
+{
+  // all locals are declared up front because of the gotos below
+  TSMBuffer bufp;
+  TSMLoc hdr_loc;
+  TSMBuffer purl_bufp; // separate buffer so bufp stays paired with hdr_loc
+  TSMLoc purl_loc = NULL;
+  TSHttpStatus status;
+  const char * pristine_host;
+  int pristine_host_len = 0;
+  int pristine_port;
+  uint64_t user_speed;
+  uint64_t body_bytes;
+  double bytes_per_sec;
+  TSHRTime start_time = 0;
+  TSHRTime end_time = 0;
+  TSHRTime interval_time = 0;
+  smap_iterator stat_it;
+  channel_stat *stat = NULL;
+  size_t channel_count = 0;
+  std::string host;
+  std::stringstream ss;
+
+  if (TSHttpTxnClientRespGet(txnp, &bufp, &hdr_loc) != TS_SUCCESS) {
+    debug("couldn't retrieve final response");
+    return;
+  }
+
+  status = TSHttpHdrStatusGet(bufp, hdr_loc);
+  if (status != TS_HTTP_STATUS_OK && status != TS_HTTP_STATUS_PARTIAL_CONTENT) {
+    debug("only count 200/206 response");
+    goto cleanup;
+  }
+
+  if (TSHttpTxnPristineUrlGet(txnp, &purl_bufp, &purl_loc) != TS_SUCCESS) {
+    debug("couldn't retrieve pristine url");
+    purl_loc = NULL; // nothing to release
+    goto cleanup;
+  }
+
+  pristine_host = TSUrlHostGet(purl_bufp, purl_loc, &pristine_host_len);
+  if (pristine_host_len == 0) {
+    debug("couldn't retrieve pristine host");
+    goto cleanup;
+  }
+  pristine_port = TSUrlPortGet(purl_bufp, purl_loc);
+  host = std::string(pristine_host, pristine_host_len);
+  if (pristine_port != 80) {
+    ss << pristine_port;
+    host += ":" + ss.str();
+  }
+
+  body_bytes = TSHttpTxnClientRespBodyBytesGet(txnp);
+  __sync_fetch_and_add(&global_response_count_2xx_get, 1);
+  __sync_fetch_and_add(&global_response_bytes_content, body_bytes);
+
+  debug("pristine host: %.*s", pristine_host_len, pristine_host);
+  debug("pristine port: %d", pristine_port);
+  debug("host to lookup: %s", host.c_str());
+  debug("body bytes: %" PRIu64 "", body_bytes);
+  debug("2xx req count: %" PRIu64 "", global_response_count_2xx_get);
+
+#if (TS_VERSION_NUMBER < 3003001)
+  TSHttpTxnStartTimeGet(txnp, &start_time);
+  TSHttpTxnEndTimeGet(txnp, &end_time);
+#else
+  TSHttpTxnMilestoneGet(txnp, TS_MILESTONE_UA_BEGIN, &start_time);
+  TSHttpTxnMilestoneGet(txnp, TS_MILESTONE_UA_CLOSE, &end_time);
+#endif
+
+  if (start_time != 0 && end_time != 0 && end_time >= start_time) {
+    interval_time = end_time - start_time;
+  } else {
+    warning("not valid time, start: %" PRId64", end: %" PRId64"", start_time, end_time);
+    goto cleanup;
+  }
+
+  if (interval_time == 0 || body_bytes == 0) {
+    user_speed = MAX_SPEED;
+  } else {
+    // double keeps the precision of large byte counts; clamping avoids an
+    // out-of-range float-to-integer conversion for very short transfers
+    bytes_per_sec = (double)body_bytes / interval_time * HRTIME_SECOND;
+    user_speed = bytes_per_sec >= MAX_SPEED ? MAX_SPEED : (uint64_t)bytes_per_sec;
+  }
+
+  debug("start time: %" PRId64 "", start_time);
+  debug("end time: %" PRId64 "", end_time);
+  debug("interval time: %" PRId64 "", interval_time);
+  debug("interval seconds: %.5f", interval_time / (float)HRTIME_SECOND);
+  debug("speed bytes per second: %" PRIu64 "", user_speed);
+
+  // Lookup and insertion happen under the same lock: std::map must not be
+  // searched while another thread inserts into it.
+  TSMutexLock(stats_map_mutex);
+  stat_it = channel_stats.find(host);
+  if (stat_it != channel_stats.end()) {
+    stat = stat_it->second;
+  } else if (channel_stats.size() < MAX_MAP_SIZE) {
+    stat = new channel_stat();
+    channel_stats.insert(std::make_pair(host, stat));
+    channel_count = channel_stats.size();
+  }
+  TSMutexUnlock(stats_map_mutex);
+
+  if (stat == NULL) {
+    warning("channel_stats map exceeds max size");
+    goto cleanup;
+  }
+  if (channel_count != 0) {
+    debug("********** new channel(%zu) **********", channel_count);
+  }
+
+  // the counters themselves are updated atomically, no lock needed
+  stat->increment(body_bytes, 1, user_speed < 64000 ? 1 : 0);
+  stat->debug_channel();
+
+cleanup:
+  if (purl_loc) TSHandleMLocRelease(purl_bufp, TS_NULL_MLOC, purl_loc);
+  TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc);
+}
+
+// Event handler shared by the global continuation (READ_REQUEST_HDR) and the
+// per-transaction continuations (TXN_CLOSE). The transaction is always
+// re-enabled; the return value is always 0.
+static int
+handle_event(TSCont contp, TSEvent event, void *edata) {
+  TSHttpTxn txnp = (TSHttpTxn) edata;
+
+  if (event == TS_EVENT_HTTP_READ_REQUEST_HDR) {
+    // delivered to the global continuation
+    debug("---------- new request ----------");
+    handle_read_req(contp, txnp);
+  } else if (event == TS_EVENT_HTTP_TXN_CLOSE) {
+    // delivered to a per-transaction continuation, which ends here
+    handle_txn_close(contp, txnp);
+    TSContDestroy(contp);
+  } else {
+    error("unknown event for this plugin");
+  }
+
+  TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
+
+  return 0;
+}
+
+// below is api part
+
+// Release everything owned by an API request: both IO buffers, the channel
+// filter string, the connection, the state itself and its continuation.
+static void
+stats_cleanup(TSCont contp, intercept_state * api_state)
+{
+  TSIOBuffer *owned[] = { &api_state->req_buffer, &api_state->resp_buffer };
+
+  for (unsigned i = 0; i < sizeof(owned) / sizeof(owned[0]); ++i) {
+    if (*owned[i]) {
+      TSIOBufferDestroy(*owned[i]);
+      *owned[i] = NULL;
+    }
+  }
+
+  TSfree(api_state->channel);
+  TSVConnClose(api_state->net_vc);
+  TSfree(api_state);
+  TSContDestroy(contp);
+}
+
+// Called once the intercepted connection is available: create the request
+// and response buffers and start reading the request from net_vc.
+static void
+stats_process_accept(TSCont contp, intercept_state * api_state)
+{
+  TSIOBuffer request = TSIOBufferCreate();
+  TSIOBuffer response = TSIOBufferCreate();
+
+  api_state->req_buffer = request;
+  api_state->resp_buffer = response;
+  api_state->resp_reader = TSIOBufferReaderAlloc(response);
+  api_state->read_vio = TSVConnRead(api_state->net_vc, contp, request, INT64_MAX);
+}
+
+// Append the null-terminated string s to the response buffer.
+// Returns the length of s.
+static int
+stats_add_data_to_resp_buffer(const char *s, intercept_state * api_state)
+{
+  const int length = strlen(s);
+
+  TSIOBufferWrite(api_state->resp_buffer, s, length);
+  return length;
+}
+
+// Status line and headers sent in front of every API response.
+static const char RESP_HEADER[] =
+  "HTTP/1.0 200 Ok\r\n"
+  "Content-Type: application/json\r\n"
+  "Cache-Control: no-cache\r\n"
+  "\r\n";
+
+// Append RESP_HEADER to the response buffer; returns the bytes added.
+static int
+stats_add_resp_header(intercept_state * api_state)
+{
+  const int header_bytes = stats_add_data_to_resp_buffer(RESP_HEADER, api_state);
+  return header_bytes;
+}
+
+// Handle events on the read VIO. On READ_READY the response header is queued,
+// TSVConnShutdown(net_vc, 1, 0) is called and the write is started. EOS is
+// ignored; error events are only logged.
+static void
+stats_process_read(TSCont contp, TSEvent event, intercept_state * api_state)
+{
+  debug_api("stats_process_read(%d)", event);
+
+  switch (event) {
+  case TS_EVENT_VCONN_READ_READY:
+    api_state->output_bytes = stats_add_resp_header(api_state);
+    TSVConnShutdown(api_state->net_vc, 1, 0);
+    api_state->write_vio = TSVConnWrite(api_state->net_vc, contp, api_state->resp_reader, INT64_MAX);
+    break;
+  case TS_EVENT_ERROR:
+    error_api("stats_process_read: Received TS_EVENT_ERROR\n");
+    break;
+  case TS_EVENT_VCONN_EOS:
+    // client may end the connection, nothing to do
+    break;
+  case TS_EVENT_NET_ACCEPT_FAILED:
+    error_api("stats_process_read: Received TS_EVENT_NET_ACCEPT_FAILED\n");
+    break;
+  default:
+    error_api("Unexpected Event %d\n", event);
+    // TSReleaseAssert(!"Unexpected Event");
+    break;
+  }
+}
+
+#define APPEND(a) api_state->output_bytes += stats_add_data_to_resp_buffer(a, api_state)
+#define APPEND_STAT(a, fmt, v) do { \
+ char b[256]; \
+ if(snprintf(b, sizeof(b), "\"%s\": \"" fmt "\",\n", a, v) < (signed)sizeof(b)) \
+ APPEND(b); \
+} while(0)
+#define APPEND_END_STAT(a, fmt, v) do { \
+ char b[256]; \
+ if(snprintf(b, sizeof(b), "\"%s\": \"" fmt "\"\n", a, v) < (signed)sizeof(b)) \
+ APPEND(b); \
+} while(0)
+#define APPEND_DICT_NAME(a) do { \
+ char b[256]; \
+ if(snprintf(b, sizeof(b), "\"%s\": {\n", a) < (signed)sizeof(b)) \
+ APPEND(b); \
+} while(0)
+
+// TSRecordDump() callback: append one Traffic Server record as a JSON
+// "name": "value" pair. edata is the intercept_state of the request.
+// Records of other data types are skipped with a debug message.
+static void
+json_out_stat(TSRecordType rec_type, void *edata, int registered,
+              const char *name, TSRecordDataType data_type,
+              TSRecordData *datum) {
+  intercept_state *api_state = (intercept_state *) edata;
+
+  switch(data_type) {
+  case TS_RECORDDATATYPE_COUNTER:
+    APPEND_STAT(name, "%" PRId64, datum->rec_counter); break;
+  case TS_RECORDDATATYPE_INT:
+    // rec_int is signed: print it as such so negative values stay readable
+    APPEND_STAT(name, "%" PRId64, datum->rec_int); break;
+  case TS_RECORDDATATYPE_FLOAT:
+    APPEND_STAT(name, "%f", datum->rec_float); break;
+  case TS_RECORDDATATYPE_STRING:
+    // an unset string record is emitted as "" instead of passing NULL to %s
+    APPEND_STAT(name, "%s", datum->rec_string ? datum->rec_string : ""); break;
+  default:
+    debug_api("unkown type for %s: %d", name, data_type);
+    break;
+  }
+}
+
+// Orders (name, channel_stat *) pairs by descending response count, so the
+// busiest channels sort first. The call operator is const so the functor can
+// be used wherever a const comparator is required; the deprecated
+// std::binary_function base (whose typedefs nothing used) is gone.
+template<class T>
+struct compare
+{
+  inline bool operator()(const T& lhs, const T& rhs) const {
+    return lhs.second->response_count_2xx > rhs.second->response_count_2xx;
+  }
+};
+
+// Append the JSON dictionary of one channel to the response.
+//   channel: channel name, used as the dictionary key (taken by reference to
+//            avoid copying the string for every channel)
+//   cs:      counters of that channel
+//   is_last: non-zero for the last channel, which gets no trailing comma
+static void
+append_channel_stat(intercept_state * api_state,
+                    const std::string & channel, channel_stat * cs,
+                    int is_last)
+{
+  APPEND_DICT_NAME(channel.c_str());
+  APPEND_STAT("response.bytes.content", "%" PRIu64, cs->response_bytes_content);
+  APPEND_STAT("response.count.2xx.get", "%" PRIu64, cs->response_count_2xx);
+  APPEND_END_STAT("speed.ua.bytes_per_sec_64k", "%" PRIu64, cs->speed_ua_bytes_per_sec_64k);
+  APPEND(is_last ? "}\n" : "},\n");
+}
+
+/*
+  Append the per-channel dictionaries to the response.
+
+  - channel filter set: only channels whose name contains that string
+  - topn > 0: only the topn channels with the highest response count,
+    in descending order
+  - topn == 0: nothing is output
+  - otherwise: all selected channels in map (name) order
+
+  The map is read under stats_map_mutex because transaction threads insert
+  into it concurrently. Only pointers are collected while the lock is held;
+  they are used after unlocking, which relies on nothing in this file ever
+  erasing an entry from channel_stats.
+*/
+static void
+json_out_channel_stats(intercept_state * api_state) {
+  typedef std::pair<const std::string *, channel_stat *> data_pair;
+  typedef std::vector<data_pair> stats_vec_t;
+
+  if (api_state->topn == 0)
+    return;
+
+  // NULL when no (or an empty) channel filter was given
+  const char *filter = NULL;
+  if (api_state->channel && api_state->channel[0] != '\0')
+    filter = api_state->channel;
+
+  stats_vec_t stats_vec; // snapshot of the selected channels
+
+  TSMutexLock(stats_map_mutex);
+  stats_vec.reserve(channel_stats.size());
+  for (smap_iterator it = channel_stats.begin(); it != channel_stats.end(); ++it) {
+    if (filter == NULL || it->first.find(filter) != std::string::npos)
+      stats_vec.push_back(data_pair(&it->first, it->second));
+  }
+  TSMutexUnlock(stats_map_mutex);
+
+  if (stats_vec.empty())
+    return;
+
+  debug("appending channel stats");
+
+  stats_vec_t::size_type out_st = stats_vec.size();
+  if (api_state->topn > 0) { // need sort and limit output size
+    if ((stats_vec_t::size_type)api_state->topn < out_st)
+      out_st = (stats_vec_t::size_type)api_state->topn;
+    std::partial_sort(stats_vec.begin(), stats_vec.begin() + out_st,
+                      stats_vec.end(), compare<data_pair>());
+  } // else will output whole vector without sort
+
+  for (stats_vec_t::size_type i = 0; i < out_st; i++) {
+    append_channel_stat(api_state, *stats_vec[i].first, stats_vec[i].second,
+                        i + 1 == out_st ? 1 : 0);
+  }
+}
+
+// Build the complete JSON body: the "channel" dictionary followed by the
+// "global" dictionary (plugin totals, optionally the Traffic Server process
+// records, and the server version).
+static void
+json_out_stats(intercept_state * api_state)
+{
+  // per-channel part
+  APPEND("{ \"channel\": {\n");
+  json_out_channel_stats(api_state);
+  APPEND(" },\n");
+
+  // global part
+  APPEND(" \"global\": {\n");
+  APPEND_STAT("response.count.2xx.get", "%" PRIu64, global_response_count_2xx_get);
+  APPEND_STAT("response.bytes.content", "%" PRIu64, global_response_bytes_content);
+  APPEND_STAT("channel.count", "%zu", channel_stats.size());
+
+  if (api_state->show_global) {
+    // internal stats
+    TSRecordDump(TS_RECORDTYPE_PROCESS, json_out_stat, api_state);
+  }
+
+  const char *version = TSTrafficServerVersionGet();
+  APPEND("\"server\": \"");
+  APPEND(version);
+  APPEND("\"\n");
+
+  APPEND(" }\n}\n");
+}
+
+/*
+  Handle events on the write VIO.
+
+  WRITE_READY: generate the body once ("forbidden" for denied clients), tell
+  the VIO how many bytes will be sent and re-enable it.
+  WRITE_COMPLETE: release the request state.
+  Anything else is logged and then cleaned up as well. The original condition
+  lacked 'event ==', so it was always true: every such event already ended in
+  stats_cleanup(), only without being logged.
+*/
+static void
+stats_process_write(TSCont contp, TSEvent event, intercept_state * api_state)
+{
+  if (event == TS_EVENT_VCONN_WRITE_READY) {
+    if (api_state->body_written == 0) {
+      debug_api("plugin adding response body");
+      api_state->body_written = 1;
+      if (!api_state->deny) {
+        json_out_stats(api_state);
+      } else {
+        APPEND("forbidden");
+      }
+      TSVIONBytesSet(api_state->write_vio, api_state->output_bytes);
+    }
+    TSVIOReenable(api_state->write_vio);
+  } else if (event == TS_EVENT_VCONN_WRITE_COMPLETE) {
+    stats_cleanup(contp, api_state);
+  } else {
+    if (event == TS_EVENT_ERROR) {
+      error_api("stats_process_write: Received TS_EVENT_ERROR\n");
+    } else {
+      error_api("Unexpected Event %d\n", event);
+      // TSReleaseAssert(!"Unexpected Event");
+    }
+    // keep releasing the state on failure, as before
+    stats_cleanup(contp, api_state);
+  }
+}
+
+// Event handler of the API intercept continuation. NET_ACCEPT delivers the
+// connection; later events are dispatched by the VIO passed in edata.
+// Always returns 0.
+static int
+api_handle_event(TSCont contp, TSEvent event, void *edata)
+{
+  intercept_state *api_state = (intercept_state *) TSContDataGet(contp);
+
+  if (event == TS_EVENT_NET_ACCEPT) {
+    api_state->net_vc = (TSVConn) edata;
+    stats_process_accept(contp, api_state);
+    return 0;
+  }
+
+  if (edata == api_state->read_vio) {
+    stats_process_read(contp, event, api_state);
+    return 0;
+  }
+
+  if (edata == api_state->write_vio) {
+    stats_process_write(contp, event, api_state);
+    return 0;
+  }
+
+  error_api("Unexpected Event %d\n", event);
+  // TSReleaseAssert(!"Unexpected Event");
+  return 0;
+}
+
+// initial part
+
+// Return 1 when the running Traffic Server reports a "major.minor.patch"
+// version whose major number is at least 3, and 0 otherwise (including when
+// the version is unavailable or cannot be parsed).
+static int
+check_ts_version()
+{
+  const char *ts_version = TSTrafficServerVersionGet();
+  if (!ts_version) {
+    return 0;
+  }
+
+  int major_ts_version = 0;
+  int minor_ts_version = 0;
+  int patch_ts_version = 0;
+  const int fields = sscanf(ts_version, "%d.%d.%d", &major_ts_version,
+                            &minor_ts_version, &patch_ts_version);
+  if (fields != 3) {
+    return 0;
+  }
+
+  // Need at least TS 3.0.0
+  return major_ts_version >= 3 ? 1 : 0;
+}
+
+// Plugin entry point. An optional single argument replaces the default API
+// path. Registers the plugin, checks the server version, creates the mutex
+// of the stats map and hooks READ_REQUEST_HDR globally.
+void
+TSPluginInit(int argc, const char *argv[])
+{
+  if (argc > 2) {
+    fatal("plugin does not accept more than 1 argument");
+  }
+  if (argc == 2) {
+    api_path = std::string(argv[1]);
+    debug_api("stats api path: %s", api_path.c_str());
+  }
+
+  TSPluginRegistrationInfo info;
+  info.plugin_name = (char *)PLUGIN_NAME;
+  info.vendor_name = (char *)"wkl";
+  info.support_email = (char *)"conanmind@gmail.com";
+
+  const bool registered = TSPluginRegister(TS_SDK_VERSION_3_0, &info) == TS_SUCCESS;
+  if (!registered) {
+    fatal("plugin registration failed.");
+  }
+
+  const int version_ok = check_ts_version();
+  if (!version_ok) {
+    fatal("plugin requires Traffic Server 3.0.0 or later");
+  }
+
+  info("%s(%s) plugin starting...", PLUGIN_NAME, PLUGIN_VERSION);
+
+  stats_map_mutex = TSMutexCreate();
+
+  // global continuation: sees the request header of every transaction
+  TSCont cont = TSContCreate(handle_event, NULL);
+  TSHttpHookAdd(TS_HTTP_READ_REQUEST_HDR_HOOK, cont);
+}
+
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/debug_macros.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/debug_macros.h b/plugins/experimental/channel_stats/debug_macros.h
new file mode 100644
index 0000000..519c608
--- /dev/null
+++ b/plugins/experimental/channel_stats/debug_macros.h
@@ -0,0 +1,77 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+#ifndef _DBG_MACROS_H
+#define _DBG_MACROS_H
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#define TAG PLUGIN_NAME
+#define API_TAG PLUGIN_NAME ".api"
+
+#define unlikely(x) __builtin_expect(!!(x), 0)
+#define likely(x) __builtin_expect(!!(x), 1)
+
+// Logging helpers. None of the macros ends in a semicolon: each one expands
+// to a single statement that the caller terminates, so they are safe inside
+// an unbraced if/else.
+
+// Emit a TSDebug() message only when the tag is enabled.
+#define debug_tag(tag, fmt, ...) do { \
+  if (unlikely(TSIsDebugTagSet(tag))) { \
+    TSDebug(tag, fmt, ##__VA_ARGS__); \
+  } \
+} while(0)
+
+// Debug message under the plugin tag, prefixed with file, line and function.
+#define debug(fmt, ...) \
+  debug_tag(TAG, "DEBUG: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__)
+
+#define info(fmt, ...) \
+  debug_tag(TAG, "INFO: " fmt, ##__VA_ARGS__)
+
+#define warning(fmt, ...) \
+  debug_tag(TAG, "WARNING: " fmt, ##__VA_ARGS__)
+
+// Report through TSError() and, when enabled, the plugin debug tag.
+#define error(fmt, ...) do { \
+  TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+  debug_tag(TAG, "[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+} while (0)
+
+// Like error(), then terminate the process with exit(-1).
+#define fatal(fmt, ...) do { \
+  TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+  debug_tag(TAG, "[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+  exit(-1); \
+} while (0)
+
+// Same as debug() and error(), but under the API debug tag.
+#define debug_api(fmt, ...) \
+  debug_tag(API_TAG, "DEBUG: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__)
+
+#define error_api(fmt, ...) do { \
+  TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+  debug_tag(API_TAG, "ERROR: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+} while (0)
+
+// Time unit constants, each built from the next smaller one, listed from the
+// base unit (HRTIME_NSECOND, 1) upwards.
+#define HRTIME_NSECOND (1LL)
+#define HRTIME_USECOND (1000*HRTIME_NSECOND)
+#define HRTIME_MSECOND (1000*HRTIME_USECOND)
+#define HRTIME_SECOND (1000*HRTIME_MSECOND)
+#define HRTIME_MINUTE (60*HRTIME_SECOND)
+#define HRTIME_HOUR (60*HRTIME_MINUTE)
+#define HRTIME_DAY (24*HRTIME_HOUR)
+#define HRTIME_WEEK (7*HRTIME_DAY)
+#define HRTIME_YEAR (365*HRTIME_DAY+HRTIME_DAY/4)
+#define HRTIME_DECADE (10*HRTIME_YEAR)
+#define HRTIME_FOREVER (10*HRTIME_DECADE)
+
+#endif //_DBG_MACROS_H