You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by so...@apache.org on 2012/12/01 22:56:28 UTC

[1/2] git commit: TS-1596 - Added channel_stats plugin to experimental.

Updated Branches:
  refs/heads/master c55e396bb -> 0c7c43dca


TS-1596 - Added channel_stats plugin to experimental.


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/de35ad0a
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/de35ad0a
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/de35ad0a

Branch: refs/heads/master
Commit: de35ad0abdf579a52ea765eaceba898dcc1e0b73
Parents: c55e396
Author: Phil Sorber <so...@apache.org>
Authored: Sat Dec 1 16:13:40 2012 -0500
Committer: Phil Sorber <so...@apache.org>
Committed: Sat Dec 1 16:53:46 2012 -0500

----------------------------------------------------------------------
 CHANGES                                            |    2 +
 configure.ac                                       |    1 +
 plugins/experimental/Makefile.am                   |    3 +-
 plugins/experimental/channel_stats/Makefile.am     |   18 +
 plugins/experimental/channel_stats/Makefile.tsxs   |   24 +
 plugins/experimental/channel_stats/README          |   90 ++
 .../experimental/channel_stats/channel_stats.cc    |  806 +++++++++++++++
 plugins/experimental/channel_stats/debug_macros.h  |   77 ++
 8 files changed, 1020 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0a8a6ca..2af471c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,8 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache Traffic Server 3.3.1
 
+  *) [TS-1596] Added channel_stats plugin to experimental.
+
   *) [TS-1607] decouple SSL certificate lookup
 
   *) [TS-1506] %<cquuh> log symbol will crash TS when requesting a SSL url.

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 29dff32..a5455d5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1614,6 +1614,7 @@ AC_CONFIG_FILES([plugins/experimental/header_rewrite/Makefile])
 AC_CONFIG_FILES([plugins/experimental/metalink/Makefile])
 AC_CONFIG_FILES([plugins/experimental/gzip/Makefile])
 AC_CONFIG_FILES([plugins/experimental/spdy/Makefile])
+AC_CONFIG_FILES([plugins/experimental/channel_stats/Makefile])
 # various tools
 AC_CONFIG_FILES([tools/Makefile])
 # example plugins

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/Makefile.am b/plugins/experimental/Makefile.am
index 025ebce..fae28a7 100644
--- a/plugins/experimental/Makefile.am
+++ b/plugins/experimental/Makefile.am
@@ -24,5 +24,6 @@ SUBDIRS = \
  header_rewrite \
  metalink \
  gzip \
- spdy
+ spdy \
+ channel_stats
 endif

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/Makefile.am
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/Makefile.am b/plugins/experimental/channel_stats/Makefile.am
new file mode 100644
index 0000000..e2e6872
--- /dev/null
+++ b/plugins/experimental/channel_stats/Makefile.am
@@ -0,0 +1,18 @@
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+AM_CPPFLAGS = -I$(top_builddir)/proxy/api -I$(top_srcdir)/proxy/api
+
+pkglibdir = ${pkglibexecdir}
+pkglib_LTLIBRARIES = channel_stats.la
+channel_stats_la_SOURCES = channel_stats.cc
+channel_stats_la_LDFLAGS = -module -avoid-version -shared

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/Makefile.tsxs
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/Makefile.tsxs b/plugins/experimental/channel_stats/Makefile.tsxs
new file mode 100644
index 0000000..aa84569
--- /dev/null
+++ b/plugins/experimental/channel_stats/Makefile.tsxs
@@ -0,0 +1,24 @@
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+TSXS?=tsxs
+
+%.so: %.cc
+	$(TSXS) -C $< -o $@
+
+all: channel_stats.so
+
+install: all
+	$(TSXS) -i -o channel_stats.so
+
+clean:
+	rm -f *.lo *.so

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/README
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/README b/plugins/experimental/channel_stats/README
new file mode 100644
index 0000000..873e151
--- /dev/null
+++ b/plugins/experimental/channel_stats/README
@@ -0,0 +1,90 @@
+channel_stats plugin for Apache Traffic Server 3.0.0+
+
+About channel_stats plugin
+==========================
+channel_stats plugin collect the runtime statistics (speed, request count and
+more in the future) for each channel. The stats is exposed with http interface
+in json. (The code of interface is from 'stats_over_http' plugin.)
+
+In general, the plugin could be used on reverse proxy with a fixed number of
+remap rules. It's not design for the proxy which serves unlimited channels (
+open-relay forward proxy).
+
+
+Installation
+==========================
+  make -f Makefile.tsxs
+  sudo make install -f Makefile.tsxs
+(if 'tsxs' is not in your PATH, run make by appending TSXS=/path/to/ts/bin/tsxs)
+
+Add 'channel_stats.so' to plugins.config. By default, the path of http interface
+is '_cstats'. For safety, you should change it by adding a parameter after
+'channel_stats.so'. Example: 'channel_stats.so _my_cstats'.
+
+Restart Traffic Server: sudo traffic_line -L or sudo trafficserver restart
+
+
+Viewing Stats
+==========================
+Visit http://local_IP:port/_cstats (or as you configured in plugin.config)
+Output will be in json:
+
+{
+  "channel": {
+    "www.example.com": {
+      "response.bytes.content": "3486995502046",
+      "response.count.2xx.get": "64040675",
+      "speed.ua.bytes_per_sec_64k": "3972287"
+    },
+    "www.test.com": {
+      "response.bytes.content": "3349404916760",
+      "response.count.2xx.get": "64038172",
+      "speed.ua.bytes_per_sec_64k": "3989255"
+    }
+  },
+ "global": {
+    "response.count.2xx.get": "268516715",
+    "server": "3.0.4"
+  }
+}
+
+Available stats:
+ - response.bytes.content: transferred content length (not including header)
+ - response.count.2xx.get: transaction count
+ - speed.ua.bytes_per_sec_64k: count of transaction whose speed is < 64KBps
+
+Additional parameters:
+ - topn: only output top N channels order by response count
+ - channel: only output the channels which contain specific string
+ - global: also display TS internal stats as 'stats_over_http' plugin
+ Example:
+ - http://127.0.0.1/_cstats?global
+ - http://127.0.0.1/_cstats?topn=5
+ - http://127.0.0.1/_cstats?channel=test.com
+ - http://127.0.0.1/_cstats?channel=test.com&topn=5&global
+If you have a large number of channels (e.g. more than 10k), those parameters 
+may not be heavily used due to extra overhead.
+
+
+Warning
+==========================
+Security
+ - As mentioned above, you should change default path of http interface to make
+   other people harder to access your channel stats.
+ - For IPv4, plugin will make sure visitor is from private network.
+   (http://en.wikipedia.org/wiki/Private_network#Private_IPv4_address_spaces)
+   In addition, you should deny the request to http interface on your L7 front-
+   end because your L7 server and TS are probably in a same local network. And 
+   currently plugin doesn't check IPv6 address. (should disable IPv6 by default?)
+ - The number of channels is limited to 100000.
+
+
+DEV
+==========================
+
+
+ChangeLog
+==========================
+
+Version 0.1 (11/26/12)
+  - Initial release

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/channel_stats.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/channel_stats.cc b/plugins/experimental/channel_stats/channel_stats.cc
new file mode 100644
index 0000000..7cca4e7
--- /dev/null
+++ b/plugins/experimental/channel_stats/channel_stats.cc
@@ -0,0 +1,806 @@
+/*
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+// get INT64_MAX
+#define __STDC_LIMIT_MACROS
+
+#include <cstdio>
+#include <cstring>
+#include <cctype>
+#include <string>
+#include <map> // may optimize by using hash_map, but mind compiler portability
+#include <vector>
+#include <algorithm>
+#include <sstream>
+#include <arpa/inet.h>
+
+#include <ts/ts.h>
+#if (TS_VERSION_NUMBER < 3003001)
+// get TSHttpTxnStartTimeGet
+#include <ts/experimental.h>
+#endif
+
+#include "debug_macros.h"
+
+#define PLUGIN_NAME     "channel_stats"
+#define PLUGIN_VERSION  "0.1"
+
+#define MAX_SPEED 999999999
+
+/* limit the number of channels (items) to avoid potential attack,
+   regex_map rule can also generate infinite channels (hosts) */
+#define MAX_MAP_SIZE 100000
+
+static std::string api_path("_cstats");
+
+// global stats
+static uint64_t global_response_count_2xx_get = 0;  // 2XX GET response count
+static uint64_t global_response_bytes_content = 0;  // transferred bytes (2xx.get)
+
+// channel stats
+struct channel_stat {
+  channel_stat()
+      : response_bytes_content(0),
+        response_count_2xx(0),
+        speed_ua_bytes_per_sec_64k(0) {
+  }
+
+  inline void increment(uint64_t rbc, uint64_t rc2, uint64_t sbps6) {
+    __sync_fetch_and_add(&response_bytes_content, rbc);
+    if (rc2) __sync_fetch_and_add(&response_count_2xx, rc2);
+    if (sbps6) __sync_fetch_and_add(&speed_ua_bytes_per_sec_64k, sbps6);
+  }
+
+  inline void debug_channel() {
+    debug("response.bytes.content: %" PRIu64 "", response_bytes_content);
+    debug("response.count.2xx: %" PRIu64 "", response_count_2xx);
+    debug("speed.ua.bytes_per_sec_64k: %" PRIu64 "", speed_ua_bytes_per_sec_64k);
+  }
+
+  uint64_t response_bytes_content;
+  uint64_t response_count_2xx;
+  uint64_t speed_ua_bytes_per_sec_64k;
+};
+
+typedef std::map<std::string, channel_stat *> stats_map_type;
+typedef stats_map_type::iterator smap_iterator;
+
+static stats_map_type channel_stats;
+static TSMutex stats_map_mutex;
+
+// api Intercept Data
+typedef struct intercept_state_t
+{
+  TSVConn net_vc;
+  TSVIO read_vio;
+  TSVIO write_vio;
+
+  TSIOBuffer req_buffer;
+  TSIOBuffer resp_buffer;
+  TSIOBufferReader resp_reader;
+
+  int output_bytes;
+  int body_written;
+
+  int show_global; // default 0
+  char * channel; // default ""
+  int topn; // default -1
+  int deny; // default 0
+} intercept_state;
+
+struct private_seg_t {
+  struct in_addr net;
+  struct in_addr mask;
+};
+
+// don't put inet_addr("255.255.255.255"), see BUGS in 'man 3 inet_addr'
+static struct private_seg_t private_segs[] = {
+  {{inet_addr("10.0.0.0")}, {inet_addr("255.0.0.0")}},
+  {{inet_addr("127.0.0.0")}, {inet_addr("255.0.0.0")}},
+  {{inet_addr("172.16.0.0")}, {inet_addr("255.240.0.0")}},
+  {{inet_addr("192.168.0.0")}, {inet_addr("255.255.0.0")}}
+};
+static int num_private_segs = sizeof(private_segs) / sizeof(private_seg_t);
+
+// all parameters are in network byte order
+static int
+is_in_net (const struct in_addr *  addr,
+           const struct in_addr *  netaddr,
+           const struct in_addr *  netmask)
+{
+   if ((addr->s_addr & netmask->s_addr) == (netaddr->s_addr & netmask->s_addr))
+      return 1;
+   return 0;
+}
+
+static int
+is_private_ip(const struct in_addr * addr)
+{
+  int i;
+  for (i = 0; i < num_private_segs; i++) {
+    if (is_in_net(addr, &private_segs[i].net, &private_segs[i].mask))
+      return 1;
+  }
+  return 0;
+}
+
+static int handle_event(TSCont contp, TSEvent event, void *edata);
+static int api_handle_event(TSCont contp, TSEvent event, void *edata);
+
+/*
+  Get the value of parameter in url querystring
+  Return 0 and a null string if not find the parameter.
+  Return 1 and a value string, normally
+  Return 2 and a max_length value string if the length of the value exceeds.
+
+  Possible appearance: ?param=value&fake_param=value&param=value
+*/
+static int
+get_query_param(const char *query, const char *param,
+                char *result, int max_length)
+{
+  char *pos = 0;
+
+  pos = strstr(query, param); // try to find in querystring of url
+  if (pos != query) {
+    // if param is not prefix of querystring
+    while (pos && *(pos - 1) != '&') { // param must be after '&'
+        pos = strstr(pos + strlen(param), param); // try next
+    }
+  }
+
+  if (!pos) {
+    // set it null string if not found
+    result[0] = '\0';
+    return 0;
+  }
+
+  pos += strlen(param); // skip 'param='
+
+  // copy value of param
+  int now = 0;
+  while (*pos != '\0' && *pos != '&' && now < max_length) {
+    result[now++] = *pos;
+    pos++;
+  }
+  result[now] = '\0'; // make sure null-terminated
+
+  if (*pos != '\0' && *pos != '&' && now == max_length)
+    return 2;
+  else
+    return 1;
+}
+
+/*
+  check if exist param in query string
+
+  Possible querystring: ?param1=value1&param2
+  (param2 is a param which "has_no_value")
+*/
+static int
+has_query_param(const char *query, const char *param, int has_no_value)
+{
+  char *pos = 0;
+
+  pos = strstr(query, param); // try to find in querystring of url
+  if (pos != query) {
+    // if param is not prefix of querystring
+    while (pos && *(pos - 1) != '&') { // param must be after '&'
+        pos = strstr(pos + strlen(param), param); // try next
+    }
+  }
+
+  if (!pos)
+    return 0;
+
+  pos += strlen(param); // skip 'param='
+
+  if (has_no_value) {
+    if (*pos == '\0' || *pos == '&') return 1;
+  } else {
+    if (*pos == '=') return 1;
+  }
+
+  return 0;
+}
+
+static void
+get_api_params(TSMBuffer   bufp,
+               TSMLoc      url_loc,
+               int *       show_global,
+               char **     channel,
+               int *       topn)
+{
+  const char * query; // not null-terminated, get from TS api
+  char * tmp_query = NULL; // null-terminated
+  int query_len = 0;
+
+  *show_global = 0;
+  *topn = -1;
+
+  query = TSUrlHttpQueryGet(bufp, url_loc, &query_len);
+  if (query_len == 0)
+    return;
+  tmp_query = TSstrndup(query, query_len);
+  debug_api("querystring: %s", tmp_query);
+
+  if (has_query_param(tmp_query, "global", 1)) {
+    debug_api("found 'global' param");
+    *show_global = 1;
+  }
+
+  *channel = (char *) TSmalloc(query_len);
+  if (get_query_param(tmp_query, "channel=", *channel, query_len)) {
+    debug_api("found 'channel' param: %s", *channel);
+  }
+
+  std::stringstream ss;
+  char * tmp_topn = (char *) TSmalloc(query_len);
+  if (get_query_param(tmp_query, "topn=", tmp_topn, 10)) {
+    if (strlen(tmp_topn) > 0) {
+      ss.str(tmp_topn);
+      ss >> *topn;
+    }
+    debug_api("found 'topn' param: %d", *topn);
+  }
+
+  TSfree(tmp_query);
+  TSfree(tmp_topn);
+}
+
+static void
+handle_read_req(TSCont contp, TSHttpTxn txnp)
+{
+  TSMBuffer bufp;
+  TSMLoc hdr_loc = NULL;
+  TSMLoc url_loc = NULL;
+  const char *method;
+  int method_length = 0;
+  TSCont txn_contp;
+
+  const char * path;
+  int path_len;
+  struct sockaddr * client_addr;
+  struct sockaddr_in * client_addr4;
+  TSCont api_contp;
+  char * client_ip;
+  intercept_state *api_state;
+
+  if (TSHttpTxnClientReqGet(txnp, &bufp, &hdr_loc) != TS_SUCCESS) {
+    error("couldn't retrieve client's request");
+    goto cleanup;
+  }
+
+  method = TSHttpHdrMethodGet(bufp, hdr_loc, &method_length);
+  if (0 != strncmp(method, TS_HTTP_METHOD_GET, method_length)) {
+    debug("do not count %.*s method", method_length, method);
+    goto cleanup;
+  }
+
+  if (TSHttpHdrUrlGet(bufp, hdr_loc, &url_loc) != TS_SUCCESS)
+    goto cleanup;
+
+  path = TSUrlPathGet(bufp, url_loc, &path_len);
+  if (path_len == 0 || (unsigned)path_len != api_path.length() ||
+        strncmp(api_path.c_str(), path, path_len) != 0) {
+    goto not_api;
+  }
+
+  // register our intercept
+  debug_api("Intercepting request");
+  api_state = (intercept_state *) TSmalloc(sizeof(*api_state));
+  memset(api_state, 0, sizeof(*api_state));
+  get_api_params(bufp, url_loc,
+                 &api_state->show_global, &api_state->channel,
+                 &api_state->topn);
+
+  // check private ip
+  client_addr = (struct sockaddr *) TSHttpTxnClientAddrGet(txnp);
+  if (client_addr->sa_family == AF_INET) {
+    client_addr4 = (struct sockaddr_in *) client_addr;
+    if (!is_private_ip(&client_addr4->sin_addr)) {
+      client_ip = (char *) TSmalloc(INET_ADDRSTRLEN);
+      inet_ntop(AF_INET, &client_addr4->sin_addr, client_ip, INET_ADDRSTRLEN);
+      debug_api("%s is not a private IP, request denied", client_ip);
+      api_state->deny = 1;
+      TSfree(client_ip);
+    }
+  } else {
+    debug_api("not IPv4, ignore IP auth"); // TODO check AF_INET6 private IP?
+  }
+
+  TSSkipRemappingSet(txnp, 1); //not strictly necessary
+
+  api_contp = TSContCreate(api_handle_event, TSMutexCreate());
+  TSContDataSet(api_contp, api_state);
+  TSHttpTxnIntercept(api_contp, txnp);
+
+  goto cleanup;
+
+not_api:
+
+  txn_contp = TSContCreate(handle_event, NULL); // reuse global hander
+  TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, txn_contp);
+
+cleanup:
+  if (url_loc) TSHandleMLocRelease(bufp, hdr_loc, url_loc);
+  if (hdr_loc) TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc);
+}
+
+static void
+handle_txn_close(TSCont contp, TSHttpTxn txnp)
+{
+  TSMBuffer bufp;
+  TSMLoc hdr_loc;
+  TSHttpStatus status;
+  TSMLoc purl_loc;
+  const char * pristine_host;
+  int pristine_host_len = 0;
+  int pristine_port;
+  uint64_t user_speed;
+  uint64_t body_bytes;
+  TSHRTime start_time = 0;
+  TSHRTime end_time = 0;
+  TSHRTime interval_time = 0;
+  smap_iterator stat_it;
+  channel_stat *stat;
+  std::pair<smap_iterator, bool> insert_ret;
+  std::string host;
+  std::stringstream ss;
+
+  if (TSHttpTxnClientRespGet(txnp, &bufp, &hdr_loc) != TS_SUCCESS) {
+    debug("couldn't retrieve final response");
+    return;
+  }
+
+  status = TSHttpHdrStatusGet(bufp, hdr_loc);
+  if (status != TS_HTTP_STATUS_OK && status != TS_HTTP_STATUS_PARTIAL_CONTENT) {
+    debug("only count 200/206 response");
+    goto cleanup;
+  }
+
+  if (TSHttpTxnPristineUrlGet(txnp, &bufp, &purl_loc) != TS_SUCCESS) {
+    debug("couldn't retrieve pristine url");
+    goto cleanup;
+  }
+
+  pristine_host = TSUrlHostGet(bufp, purl_loc, &pristine_host_len);
+  if (pristine_host_len == 0) {
+    debug("couldn't retrieve pristine host");
+    goto cleanup;
+  }
+  pristine_port = TSUrlPortGet(bufp, purl_loc);
+  host = std::string(pristine_host, pristine_host_len);
+  if (pristine_port != 80) {
+    ss << pristine_port;
+    host += ":" + ss.str();
+  }
+
+  body_bytes = TSHttpTxnClientRespBodyBytesGet(txnp);
+  __sync_fetch_and_add(&global_response_count_2xx_get, 1);
+  __sync_fetch_and_add(&global_response_bytes_content, body_bytes);
+
+  debug("pristine host: %.*s", pristine_host_len, pristine_host);
+  debug("pristine port: %d", pristine_port);
+  debug("host to lookup: %s", host.c_str());
+  debug("body bytes: %" PRIu64 "", body_bytes);
+  debug("2xx req count: %" PRIu64 "", global_response_count_2xx_get);
+
+#if (TS_VERSION_NUMBER < 3003001)
+  TSHttpTxnStartTimeGet(txnp, &start_time);
+  TSHttpTxnEndTimeGet(txnp, &end_time);
+#else
+  TSHttpTxnMilestoneGet(txnp, TS_MILESTONE_UA_BEGIN, &start_time);
+  TSHttpTxnMilestoneGet(txnp, TS_MILESTONE_UA_CLOSE, &end_time);
+#endif
+
+  if (start_time != 0 && end_time != 0 && end_time >= start_time) {
+    interval_time = end_time - start_time;
+  } else {
+    warning("not valid time, start: %" PRId64", end: %" PRId64"", start_time, end_time);
+    goto cleanup;
+  }
+
+  if (interval_time == 0 || body_bytes == 0)
+    user_speed = MAX_SPEED;
+  else
+    user_speed = (int)((float)body_bytes / interval_time * HRTIME_SECOND);
+
+  debug("start time: %" PRId64 "", start_time);
+  debug("end time: %" PRId64 "", end_time);
+  debug("interval time: %" PRId64 "", interval_time);
+  debug("interval seconds: %.5f", interval_time / (float)HRTIME_SECOND);
+  debug("speed bytes per second: %" PRIu64 "", user_speed);
+
+  /*
+  // test large number of channels
+  if (channel_stats.size() < MAX_MAP_SIZE)
+    ss << channel_stats.size() + 1;
+  else
+    ss << (rand() % MAX_MAP_SIZE + 1);
+  host = host + "--" + ss.str();
+  debug("%s", host.c_str()); 
+  */
+
+  stat_it = channel_stats.find(host);
+  if (stat_it == channel_stats.end()) {
+    if (channel_stats.size() >= MAX_MAP_SIZE) {
+      warning("channel_stats map exceeds max size");
+      goto cleanup;
+    }
+    stat = new channel_stat();
+    TSMutexLock(stats_map_mutex);
+    insert_ret = channel_stats.insert(std::make_pair(host, stat));
+    TSMutexUnlock(stats_map_mutex);
+    if (insert_ret.second == false) {
+      warning("stat of this channel already existed");
+      delete stat;
+      stat = insert_ret.first->second;
+    } else {
+      debug("********** new channel(%zu) **********", channel_stats.size());
+    }
+  } else { // found
+    stat = stat_it->second;
+  }
+
+  stat->increment(body_bytes, 1, user_speed < 64000 ? 1 : 0);
+  stat->debug_channel();
+
+cleanup:
+  TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc);
+}
+
+static int
+handle_event(TSCont contp, TSEvent event, void *edata) {
+  TSHttpTxn txnp = (TSHttpTxn) edata;
+
+  switch (event) {
+    case TS_EVENT_HTTP_READ_REQUEST_HDR: // for global contp
+      debug("---------- new request ----------");
+      handle_read_req(contp, txnp);
+      break;
+    case TS_EVENT_HTTP_TXN_CLOSE: // for txn contp
+      handle_txn_close(contp, txnp);
+      TSContDestroy(contp);
+      break;
+    default:
+      error("unknown event for this plugin");
+  }
+
+  TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
+
+  return 0;
+}
+
+// below is api part
+
+static void
+stats_cleanup(TSCont contp, intercept_state * api_state)
+{
+  if (api_state->req_buffer) {
+    TSIOBufferDestroy(api_state->req_buffer);
+    api_state->req_buffer = NULL;
+  }
+
+  if (api_state->resp_buffer) {
+    TSIOBufferDestroy(api_state->resp_buffer);
+    api_state->resp_buffer = NULL;
+  }
+
+  TSfree(api_state->channel);
+  TSVConnClose(api_state->net_vc);
+  TSfree(api_state);
+  TSContDestroy(contp);
+}
+
+static void
+stats_process_accept(TSCont contp, intercept_state * api_state)
+{
+
+  api_state->req_buffer = TSIOBufferCreate();
+  api_state->resp_buffer = TSIOBufferCreate();
+  api_state->resp_reader = TSIOBufferReaderAlloc(api_state->resp_buffer);
+  api_state->read_vio = TSVConnRead(api_state->net_vc, contp, api_state->req_buffer, INT64_MAX);
+}
+
+static int
+stats_add_data_to_resp_buffer(const char *s, intercept_state * api_state)
+{
+  int s_len = strlen(s);
+
+  TSIOBufferWrite(api_state->resp_buffer, s, s_len);
+
+  return s_len;
+}
+
+static const char RESP_HEADER[] =
+  "HTTP/1.0 200 Ok\r\nContent-Type: application/json\r\nCache-Control: no-cache\r\n\r\n";
+
+static int
+stats_add_resp_header(intercept_state * api_state)
+{
+  return stats_add_data_to_resp_buffer(RESP_HEADER, api_state);
+}
+
+static void
+stats_process_read(TSCont contp, TSEvent event, intercept_state * api_state)
+{
+  debug_api("stats_process_read(%d)", event);
+  if (event == TS_EVENT_VCONN_READ_READY) {
+    api_state->output_bytes = stats_add_resp_header(api_state);
+    TSVConnShutdown(api_state->net_vc, 1, 0);
+    api_state->write_vio = TSVConnWrite(api_state->net_vc, contp, api_state->resp_reader, INT64_MAX);
+  } else if (event == TS_EVENT_ERROR) {
+    error_api("stats_process_read: Received TS_EVENT_ERROR\n");
+  } else if (event == TS_EVENT_VCONN_EOS) {
+    // client may end the connection, simply return
+    return;
+  } else if (event == TS_EVENT_NET_ACCEPT_FAILED) {
+    error_api("stats_process_read: Received TS_EVENT_NET_ACCEPT_FAILED\n");
+  } else {
+    error_api("Unexpected Event %d\n", event);
+    // TSReleaseAssert(!"Unexpected Event");
+  }
+}
+
+#define APPEND(a) api_state->output_bytes += stats_add_data_to_resp_buffer(a, api_state)
+#define APPEND_STAT(a, fmt, v) do { \
+  char b[256]; \
+  if(snprintf(b, sizeof(b), "\"%s\": \"" fmt "\",\n", a, v) < (signed)sizeof(b)) \
+    APPEND(b); \
+} while(0)
+#define APPEND_END_STAT(a, fmt, v) do { \
+  char b[256]; \
+  if(snprintf(b, sizeof(b), "\"%s\": \"" fmt "\"\n", a, v) < (signed)sizeof(b)) \
+    APPEND(b); \
+} while(0)
+#define APPEND_DICT_NAME(a) do { \
+  char b[256]; \
+  if(snprintf(b, sizeof(b), "\"%s\": {\n", a) < (signed)sizeof(b)) \
+    APPEND(b); \
+} while(0)
+
+static void
+json_out_stat(TSRecordType rec_type, void *edata, int registered,
+              const char *name, TSRecordDataType data_type,
+              TSRecordData *datum) {
+  intercept_state *api_state = (intercept_state *) edata;
+
+  switch(data_type) {
+  case TS_RECORDDATATYPE_COUNTER:
+    APPEND_STAT(name, "%" PRId64, datum->rec_counter); break;
+  case TS_RECORDDATATYPE_INT:
+    APPEND_STAT(name, "%" PRIu64, datum->rec_int); break;
+  case TS_RECORDDATATYPE_FLOAT:
+    APPEND_STAT(name, "%f", datum->rec_float); break;
+  case TS_RECORDDATATYPE_STRING:
+    APPEND_STAT(name, "%s", datum->rec_string); break;
+  default:
+    debug_api("unkown type for %s: %d", name, data_type);
+    break;
+  }
+}
+
+template<class T>
+struct compare
+: std::binary_function<T,T,bool>
+{
+   inline bool operator()(const T& lhs, const T& rhs) {
+      return lhs.second->response_count_2xx > rhs.second->response_count_2xx;
+   }
+};
+
+static void
+append_channel_stat(intercept_state * api_state,
+                    const std::string channel, channel_stat * cs,
+                    int is_last)
+{
+  APPEND_DICT_NAME(channel.c_str());
+  APPEND_STAT("response.bytes.content", "%" PRIu64, cs->response_bytes_content);
+  APPEND_STAT("response.count.2xx.get", "%" PRIu64, cs->response_count_2xx);
+  APPEND_END_STAT("speed.ua.bytes_per_sec_64k", "%" PRIu64, cs->speed_ua_bytes_per_sec_64k);
+  if (is_last)
+    APPEND("}\n");
+  else
+    APPEND("},\n");
+}
+
+static void
+json_out_channel_stats(intercept_state * api_state) {
+  if (channel_stats.empty())
+    return;
+
+  typedef std::pair<std::string, channel_stat *> data_pair;
+  typedef std::vector<data_pair> stats_vec_t;
+  smap_iterator it;
+
+  debug("appending channel stats");
+
+  if (api_state->topn > -1 ||
+      (api_state->channel && strlen(api_state->channel) > 0)) {
+    // will use vector to output
+
+    if (api_state->topn == 0)
+      return;
+
+    stats_vec_t stats_vec; // a tmp vector to sort or filter
+    if (strlen(api_state->channel) > 0) {
+      // filter by channel
+      size_t found;
+      for (it=channel_stats.begin(); it != channel_stats.end(); it++) {
+        found = it->first.find(api_state->channel);
+        if (found != std::string::npos)
+          stats_vec.push_back(*it);
+      }
+    } else {
+      for (it=channel_stats.begin(); it != channel_stats.end(); it++)
+        stats_vec.push_back(*it);
+      /* stats_vec.assign is not safe when map is being inserted concurrently */
+    }
+
+    if (stats_vec.empty())
+      return;
+
+    stats_vec_t::size_type out_st = stats_vec.size();
+    if (api_state->topn > 0) { // need sort and limit output size
+      if ((unsigned)api_state->topn < stats_vec.size())
+        out_st = (unsigned)api_state->topn;
+      else
+        api_state->topn = stats_vec.size();
+      std::partial_sort(stats_vec.begin(), stats_vec.begin() + api_state->topn,
+                        stats_vec.end(), compare<data_pair>());
+    } // else will output whole vector without sort
+
+    stats_vec_t::size_type i;
+    for (i = 0; i < out_st - 1; i++) {
+      append_channel_stat(api_state, stats_vec[i].first, stats_vec[i].second, 0);
+    }
+    append_channel_stat(api_state, stats_vec[i].first, stats_vec[i].second, 1);
+
+  } else {
+    smap_iterator last_it = channel_stats.end();
+    last_it--;
+    for (it = channel_stats.begin(); it != last_it; it++) {
+      append_channel_stat(api_state, it->first, it->second, 0);
+    }
+    append_channel_stat(api_state, it->first, it->second, 1);
+  }
+}
+
+static void
+json_out_stats(intercept_state * api_state)
+{
+  const char *version;
+
+  APPEND("{ \"channel\": {\n");
+  json_out_channel_stats(api_state);
+  APPEND("  },\n");
+
+  APPEND(" \"global\": {\n");
+  APPEND_STAT("response.count.2xx.get", "%" PRIu64, global_response_count_2xx_get);
+  APPEND_STAT("response.bytes.content", "%" PRIu64, global_response_bytes_content);
+  APPEND_STAT("channel.count", "%zu", channel_stats.size());
+
+  if (api_state->show_global)
+    TSRecordDump(TS_RECORDTYPE_PROCESS, json_out_stat, api_state); // internal stats
+
+  version = TSTrafficServerVersionGet();
+  APPEND("\"server\": \"");
+  APPEND(version);
+  APPEND("\"\n");
+
+  APPEND("  }\n}\n");
+}
+
+static void
+stats_process_write(TSCont contp, TSEvent event, intercept_state * api_state)
+{
+  if (event == TS_EVENT_VCONN_WRITE_READY) {
+    if (api_state->body_written == 0) {
+      debug_api("plugin adding response body");
+      api_state->body_written = 1;
+      if (!api_state->deny)
+        json_out_stats(api_state);
+      else
+        APPEND("forbidden");
+      TSVIONBytesSet(api_state->write_vio, api_state->output_bytes);
+    }
+    TSVIOReenable(api_state->write_vio);
+  } else if (TS_EVENT_VCONN_WRITE_COMPLETE) {
+    stats_cleanup(contp, api_state);
+  } else if (event == TS_EVENT_ERROR) {
+    error_api("stats_process_write: Received TS_EVENT_ERROR\n");
+  } else {
+    error_api("Unexpected Event %d\n", event);
+    // TSReleaseAssert(!"Unexpected Event");
+  }
+}
+
+static int
+api_handle_event(TSCont contp, TSEvent event, void *edata)
+{
+  intercept_state *api_state = (intercept_state *) TSContDataGet(contp);
+  if (event == TS_EVENT_NET_ACCEPT) {
+    api_state->net_vc = (TSVConn) edata;
+    stats_process_accept(contp, api_state);
+  } else if (edata == api_state->read_vio) {
+    stats_process_read(contp, event, api_state);
+  } else if (edata == api_state->write_vio) {
+    stats_process_write(contp, event, api_state);
+  } else {
+    error_api("Unexpected Event %d\n", event);
+    // TSReleaseAssert(!"Unexpected Event");
+  }
+  return 0;
+}
+
+// initial part
+
+static int
+check_ts_version()
+{
+  const char *ts_version = TSTrafficServerVersionGet();
+  int result = 0;
+
+  if (ts_version) {
+    int major_ts_version = 0;
+    int minor_ts_version = 0;
+    int patch_ts_version = 0;
+
+    if (sscanf(ts_version, "%d.%d.%d", &major_ts_version, &minor_ts_version,
+                &patch_ts_version) != 3) {
+      return 0;
+    }
+
+    // Need at least TS 3.0.0
+    if (major_ts_version >= 3) {
+      result = 1;
+    }
+  }
+
+  return result;
+}
+
+void
+TSPluginInit(int argc, const char *argv[])
+{
+  if (argc > 2) {
+    fatal("plugin does not accept more than 1 argument");
+  } else if (argc == 2) {
+    api_path = std::string(argv[1]);
+    debug_api("stats api path: %s", api_path.c_str());
+  }
+
+  TSPluginRegistrationInfo info;
+
+  info.plugin_name = (char *)PLUGIN_NAME;
+  info.vendor_name = (char *)"wkl";
+  info.support_email = (char *)"conanmind@gmail.com";
+
+  if (TSPluginRegister(TS_SDK_VERSION_3_0, &info) != TS_SUCCESS) {
+    fatal("plugin registration failed.");
+  }
+
+  if (!check_ts_version()) {
+    fatal("plugin requires Traffic Server 3.0.0 or later");
+  }
+
+  info("%s(%s) plugin starting...", PLUGIN_NAME, PLUGIN_VERSION);
+
+  stats_map_mutex = TSMutexCreate();
+
+  TSCont cont = TSContCreate(handle_event, NULL);
+  TSHttpHookAdd(TS_HTTP_READ_REQUEST_HDR_HOOK, cont);
+}
+

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/debug_macros.h
----------------------------------------------------------------------
diff --git a/plugins/experimental/channel_stats/debug_macros.h b/plugins/experimental/channel_stats/debug_macros.h
new file mode 100644
index 0000000..519c608
--- /dev/null
+++ b/plugins/experimental/channel_stats/debug_macros.h
@@ -0,0 +1,77 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#ifndef _DBG_MACROS_H
+#define _DBG_MACROS_H
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#define TAG PLUGIN_NAME
+#define API_TAG PLUGIN_NAME ".api"
+
+#define unlikely(x) __builtin_expect(!!(x), 0)
+#define likely(x)   __builtin_expect(!!(x), 1)
+
+#define debug_tag(tag, fmt, ...) do { \
+    if (unlikely(TSIsDebugTagSet(tag))) { \
+        TSDebug(tag, fmt, ##__VA_ARGS__); \
+    } \
+} while(0)
+
+#define debug(fmt, ...) \
+  debug_tag(TAG, "DEBUG: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__);
+
+#define info(fmt, ...) \
+  debug_tag(TAG, "INFO: " fmt, ##__VA_ARGS__);
+
+#define warning(fmt, ...) \
+  debug_tag(TAG, "WARNING: " fmt, ##__VA_ARGS__);
+
+#define error(fmt, ...) do { \
+  TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+  debug_tag(TAG, "[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+} while (0)
+
+#define fatal(fmt, ...) do { \
+  TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+  debug_tag(TAG, "[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+  exit(-1); \
+} while (0)
+
+#define debug_api(fmt, ...) \
+  debug_tag(API_TAG, "DEBUG: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__);
+
+#define error_api(fmt, ...) do { \
+  TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+  debug_tag(API_TAG, "ERROR: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \
+} while (0)
+
+#define HRTIME_FOREVER  (10*HRTIME_DECADE)
+#define HRTIME_DECADE   (10*HRTIME_YEAR)
+#define HRTIME_YEAR     (365*HRTIME_DAY+HRTIME_DAY/4)
+#define HRTIME_WEEK     (7*HRTIME_DAY)
+#define HRTIME_DAY      (24*HRTIME_HOUR)
+#define HRTIME_HOUR     (60*HRTIME_MINUTE)
+#define HRTIME_MINUTE   (60*HRTIME_SECOND)
+#define HRTIME_SECOND   (1000*HRTIME_MSECOND)
+#define HRTIME_MSECOND  (1000*HRTIME_USECOND)
+#define HRTIME_USECOND  (1000*HRTIME_NSECOND)
+#define HRTIME_NSECOND	(1LL)
+
+#endif //_DBG_MACROS_H