Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2018/02/12 19:49:06 UTC

[GitHub] mtorluemke closed pull request #1768: Add Traffic Monitor Plugin System for Cache Stats Formats

URL: https://github.com/apache/incubator-trafficcontrol/pull/1768

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/lib/go-tc/traffic_monitor.go b/lib/go-tc/traffic_monitor.go
index d8dbba8201..2597ed5c3c 100644
--- a/lib/go-tc/traffic_monitor.go
+++ b/lib/go-tc/traffic_monitor.go
@@ -96,6 +96,7 @@ type TMProfile struct {
 type TMParameters struct {
 	HealthConnectionTimeout int    `json:"health.connection.timeout"`
 	HealthPollingURL        string `json:"health.polling.url"`
+	HealthPollingFormat     string `json:"health.polling.format"`
 	HistoryCount            int    `json:"history.count"`
 	MinFreeKbps             int64
 	Thresholds              map[string]HealthThreshold `json:"health_threshold"`
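For context, here is a minimal, self-contained sketch (not part of the PR) of how the new health.polling.format Parameter decodes through the TMParameters JSON tags added above. The struct below is a trimmed stand-in for illustration, and the parameter values are invented:

package main

import (
	"encoding/json"
	"fmt"
)

// tmParameters is a trimmed stand-in for tc.TMParameters, carrying only the
// fields relevant to this sketch.
type tmParameters struct {
	HealthConnectionTimeout int    `json:"health.connection.timeout"`
	HealthPollingURL        string `json:"health.polling.url"`
	HealthPollingFormat     string `json:"health.polling.format"`
}

func main() {
	raw := `{"health.connection.timeout": 2000,
	         "health.polling.url": "http://${hostname}/_astats?application=system&inf.name=${interface_name}",
	         "health.polling.format": "astats"}`
	p := tmParameters{}
	if err := json.Unmarshal([]byte(raw), &p); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	// An empty HealthPollingFormat falls back to the "astats" default elsewhere in this PR.
	fmt.Println(p.HealthPollingFormat) // astats
}
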
diff --git a/traffic_monitor/cache/cache.go b/traffic_monitor/cache/cache.go
index 6c04e97fdc..0c9f5c212d 100644
--- a/traffic_monitor/cache/cache.go
+++ b/traffic_monitor/cache/cache.go
@@ -24,9 +24,6 @@ import (
 	"fmt"
 	"io"
 	"net/url"
-	"regexp"
-	"strconv"
-	"strings"
 	"time"
 
 	"github.com/apache/incubator-trafficcontrol/lib/go-log"
@@ -38,10 +35,8 @@ import (
 
 // Handler is a cache handler, which fulfills the common/handler `Handler` interface.
 type Handler struct {
-	resultChan         chan Result
-	Notify             int
-	ToData             *todata.TODataThreadsafe
-	MultipleSpaceRegex *regexp.Regexp
+	resultChan chan Result
+	ToData     *todata.TODataThreadsafe
 }
 
 func (h Handler) ResultChan() <-chan Result {
@@ -50,12 +45,12 @@ func (h Handler) ResultChan() <-chan Result {
 
 // NewHandler returns a new cache handler. Note this handler does NOT precompute stat data before calling ResultChan, and Result.Precomputed will be nil
 func NewHandler() Handler {
-	return Handler{resultChan: make(chan Result), MultipleSpaceRegex: regexp.MustCompile(" +")}
+	return Handler{resultChan: make(chan Result)}
 }
 
 // NewPrecomputeHandler constructs a new cache Handler, which precomputes stat data and populates result.Precomputed before passing to ResultChan.
 func NewPrecomputeHandler(toData todata.TODataThreadsafe) Handler {
-	return Handler{resultChan: make(chan Result), MultipleSpaceRegex: regexp.MustCompile(" +"), ToData: &toData}
+	return Handler{resultChan: make(chan Result), ToData: &toData}
 }
 
 // Precompute returns whether this handler precomputes data before passing the result to the ResultChan
@@ -277,8 +272,8 @@ func StatsMarshall(statResultHistory ResultStatHistory, statInfo ResultInfoHisto
 }
 
 // Handle handles results fetched from a cache, parsing the raw Reader data and passing it along to a chan for further processing.
-func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, reqEnd time.Time, reqErr error, pollID uint64, pollFinished chan<- uint64) {
-	log.Debugf("poll %v %v handle start\n", pollID, time.Now())
+func (handler Handler) Handle(id string, r io.Reader, format string, reqTime time.Duration, reqEnd time.Time, reqErr error, pollID uint64, pollFinished chan<- uint64) {
+	log.Debugf("poll %v %v (format '%v') handle start\n", pollID, time.Now(), format)
 	result := Result{
 		ID:           tc.CacheName(id),
 		Time:         reqEnd,
@@ -301,260 +296,36 @@ func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, req
 		return
 	}
 
-	result.PrecomputedData.Reporting = true
-	result.PrecomputedData.Time = result.Time
+	statDecoder, ok := StatsTypeDecoders[format]
+	if !ok {
+		log.Errorf("Handler cache '%s' stat type '%s' not found! Returning handle error for this cache poll.\n", id, format)
+		result.Error = fmt.Errorf("handler stat type %s missing", format)
+		handler.resultChan <- result
+		return
+	}
 
-	if decodeErr := json.NewDecoder(r).Decode(&result.Astats); decodeErr != nil {
-		log.Warnf("%s procnetdev decode error '%v'\n", id, decodeErr)
+	decodeErr := error(nil)
+	if decodeErr, result.Astats.Ats, result.Astats.System = statDecoder.Parse(result.ID, r); decodeErr != nil {
+		log.Warnf("%s decode error '%v'\n", id, decodeErr)
 		result.Error = decodeErr
 		handler.resultChan <- result
 		return
 	}
 
 	if result.Astats.System.ProcNetDev == "" {
-		log.Warnf("addkbps %s procnetdev empty\n", id)
+		log.Warnf("Handler cache %s procnetdev empty\n", id)
 	}
-
 	if result.Astats.System.InfSpeed == 0 {
-		log.Warnf("addkbps %s inf.speed empty\n", id)
+		log.Warnf("Handler cache %s inf.speed empty\n", id)
 	}
 
-	if reqErr != nil {
-		result.Error = reqErr
-		log.Errorf("addkbps handle %s error '%v'\n", id, reqErr)
-	} else {
-		result.Available = true
-	}
+	result.Available = true
 
 	if handler.Precompute() {
-		result = handler.precompute(result)
+		result.PrecomputedData = statDecoder.Precompute(result.ID, handler.ToData.Get(), result.Astats.Ats, result.Astats.System)
 	}
+	result.PrecomputedData.Reporting = true
+	result.PrecomputedData.Time = result.Time
 
 	handler.resultChan <- result
 }
-
-// outBytes takes the proc.net.dev string, and the interface name, and returns the bytes field
-func outBytes(procNetDev, iface string, multipleSpaceRegex *regexp.Regexp) (int64, error) {
-	if procNetDev == "" {
-		return 0, fmt.Errorf("procNetDev empty")
-	}
-	if iface == "" {
-		return 0, fmt.Errorf("iface empty")
-	}
-	ifacePos := strings.Index(procNetDev, iface)
-	if ifacePos == -1 {
-		return 0, fmt.Errorf("interface '%s' not found in proc.net.dev '%s'", iface, procNetDev)
-	}
-
-	procNetDevIfaceBytes := procNetDev[ifacePos+len(iface)+1:]
-	procNetDevIfaceBytes = strings.TrimLeft(procNetDevIfaceBytes, " ")
-	procNetDevIfaceBytes = multipleSpaceRegex.ReplaceAllLiteralString(procNetDevIfaceBytes, " ")
-	procNetDevIfaceBytesArr := strings.Split(procNetDevIfaceBytes, " ") // this could be made faster with a custom function (DFA?) that splits and ignores duplicate spaces at the same time
-	if len(procNetDevIfaceBytesArr) < 10 {
-		return 0, fmt.Errorf("proc.net.dev iface '%v' unknown format '%s'", iface, procNetDev)
-	}
-	procNetDevIfaceBytes = procNetDevIfaceBytesArr[8]
-
-	return strconv.ParseInt(procNetDevIfaceBytes, 10, 64)
-}
-
-// precompute does the calculations which are possible with only this one cache result.
-// TODO precompute ResultStatVal
-func (handler Handler) precompute(result Result) Result {
-	todata := handler.ToData.Get()
-	stats := map[tc.DeliveryServiceName]dsdata.Stat{}
-
-	var err error
-	if result.PrecomputedData.OutBytes, err = outBytes(result.Astats.System.ProcNetDev, result.Astats.System.InfName, handler.MultipleSpaceRegex); err != nil {
-		result.PrecomputedData.OutBytes = 0
-		log.Errorf("addkbps %s handle precomputing outbytes '%v'\n", result.ID, err)
-	}
-
-	kbpsInMbps := int64(1000)
-	result.PrecomputedData.MaxKbps = int64(result.Astats.System.InfSpeed) * kbpsInMbps
-
-	for stat, value := range result.Astats.Ats {
-		var err error
-		stats, err = processStat(result.ID, stats, todata, stat, value, result.Time)
-		if err != nil && err != dsdata.ErrNotProcessedStat {
-			log.Infof("precomputing cache %v stat %v value %v error %v", result.ID, stat, value, err)
-			result.PrecomputedData.Errors = append(result.PrecomputedData.Errors, err)
-		}
-	}
-	result.PrecomputedData.DeliveryServiceStats = stats
-	return result
-}
-
-// processStat and its subsidiary functions act as a State Machine, flowing the stat thru states for each "." component of the stat name
-func processStat(server tc.CacheName, stats map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, value interface{}, timeReceived time.Time) (map[tc.DeliveryServiceName]dsdata.Stat, error) {
-	parts := strings.Split(stat, ".")
-	if len(parts) < 1 {
-		return stats, fmt.Errorf("stat has no initial part")
-	}
-
-	switch parts[0] {
-	case "plugin":
-		return processStatPlugin(server, stats, toData, stat, parts[1:], value, timeReceived)
-	case "proxy":
-		return stats, dsdata.ErrNotProcessedStat
-	case "server":
-		return stats, dsdata.ErrNotProcessedStat
-	default:
-		return stats, fmt.Errorf("stat '%s' has unknown initial part '%s'", stat, parts[0])
-	}
-}
-
-func processStatPlugin(server tc.CacheName, stats map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, statParts []string, value interface{}, timeReceived time.Time) (map[tc.DeliveryServiceName]dsdata.Stat, error) {
-	if len(statParts) < 1 {
-		return stats, fmt.Errorf("stat has no plugin part")
-	}
-	switch statParts[0] {
-	case "remap_stats":
-		return processStatPluginRemapStats(server, stats, toData, stat, statParts[1:], value, timeReceived)
-	default:
-		return stats, fmt.Errorf("stat has unknown plugin part '%s'", statParts[0])
-	}
-}
-
-func processStatPluginRemapStats(server tc.CacheName, stats map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, statParts []string, value interface{}, timeReceived time.Time) (map[tc.DeliveryServiceName]dsdata.Stat, error) {
-	if len(statParts) < 3 {
-		return stats, fmt.Errorf("stat has no remap_stats deliveryservice and name parts")
-	}
-
-	// the FQDN is `subsubdomain`.`subdomain`.`domain`. For a HTTP delivery service, `subsubdomain` will be the cache hostname; for a DNS delivery service, it will be `edge`. Then, `subdomain` is the delivery service regex.
-	subsubdomain := statParts[0]
-	subdomain := statParts[1]
-	domain := strings.Join(statParts[2:len(statParts)-1], ".")
-
-	ds, ok := toData.DeliveryServiceRegexes.DeliveryService(domain, subdomain, subsubdomain)
-	if !ok {
-		fqdn := fmt.Sprintf("%s.%s.%s", subsubdomain, subdomain, domain)
-		return stats, fmt.Errorf("ERROR no delivery service match for fqdn '%v' stat '%v'\n", fqdn, strings.Join(statParts, "."))
-	}
-	if ds == "" {
-		fqdn := fmt.Sprintf("%s.%s.%s", subsubdomain, subdomain, domain)
-		return stats, fmt.Errorf("ERROR EMPTY delivery service fqdn %v stat %v\n", fqdn, strings.Join(statParts, "."))
-	}
-
-	statName := statParts[len(statParts)-1]
-
-	dsStat, ok := stats[ds]
-	if !ok {
-		newStat := dsdata.NewStat()
-		dsStat = *newStat
-	}
-
-	if err := addCacheStat(&dsStat.TotalStats, statName, value); err != nil {
-		return stats, err
-	}
-
-	cachegroup, ok := toData.ServerCachegroups[server]
-	if !ok {
-		return stats, fmt.Errorf("server missing from TOData.ServerCachegroups")
-	}
-	dsStat.CacheGroups[cachegroup] = dsStat.TotalStats
-
-	cacheType, ok := toData.ServerTypes[server]
-	if !ok {
-		return stats, fmt.Errorf("server missing from TOData.ServerTypes")
-	}
-	dsStat.Types[cacheType] = dsStat.TotalStats
-
-	dsStat.Caches[server] = dsStat.TotalStats
-
-	dsStat.CachesTimeReceived[server] = timeReceived
-	stats[ds] = dsStat
-	return stats, nil
-}
-
-// addCacheStat adds the given stat to the existing stat. Note this adds, it doesn't overwrite. Numbers are summed, strings are concatenated.
-// TODO make this less duplicate code somehow.
-func addCacheStat(stat *dsdata.StatCacheStats, name string, val interface{}) error {
-	switch name {
-	case "status_2xx":
-		v, ok := val.(float64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.Status2xx.Value += int64(v)
-	case "status_3xx":
-		v, ok := val.(float64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.Status3xx.Value += int64(v)
-	case "status_4xx":
-		v, ok := val.(float64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.Status4xx.Value += int64(v)
-	case "status_5xx":
-		v, ok := val.(float64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.Status5xx.Value += int64(v)
-	case "out_bytes":
-		v, ok := val.(float64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.OutBytes.Value += int64(v)
-	case "is_available":
-		v, ok := val.(bool)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected bool actual '%v' type %T", name, val, val)
-		}
-		if v {
-			stat.IsAvailable.Value = true
-		}
-	case "in_bytes":
-		v, ok := val.(float64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.InBytes.Value += v
-	case "tps_2xx":
-		v, ok := val.(int64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.Tps2xx.Value += float64(v)
-	case "tps_3xx":
-		v, ok := val.(int64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.Tps3xx.Value += float64(v)
-	case "tps_4xx":
-		v, ok := val.(int64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.Tps4xx.Value += float64(v)
-	case "tps_5xx":
-		v, ok := val.(int64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.Tps5xx.Value += float64(v)
-	case "error_string":
-		v, ok := val.(string)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected string actual '%v' type %T", name, val, val)
-		}
-		stat.ErrorString.Value += v + ", "
-	case "tps_total":
-		v, ok := val.(float64)
-		if !ok {
-			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
-		}
-		stat.TpsTotal.Value += v
-	case "status_unknown":
-		return dsdata.ErrNotProcessedStat
-	default:
-		return fmt.Errorf("unknown stat '%s'", name)
-	}
-	return nil
-}
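The Handle change above replaces the hard-coded astats JSON decode with a lookup in the StatsTypeDecoders registry keyed by the poll's format. A minimal standalone sketch of that dispatch pattern, with simplified types and invented names (not the PR's code):

package main

import (
	"fmt"
	"io"
	"strings"
)

// decoder pairs a parse function with a format name, mirroring the registry
// idea in stats_types.go (simplified here).
type decoder struct {
	parse func(r io.Reader) (map[string]interface{}, error)
}

var decoders = map[string]decoder{
	"astats": {
		parse: func(r io.Reader) (map[string]interface{}, error) {
			// A real parser would JSON-decode the body here.
			return map[string]interface{}{"proxy.process.http.completed_requests": 42.0}, nil
		},
	},
}

func handle(id, format string, body io.Reader) error {
	d, ok := decoders[format]
	if !ok {
		// Unknown format: fail the poll for this cache, as Handle does above.
		return fmt.Errorf("handler stat type %s missing", format)
	}
	stats, err := d.parse(body)
	if err != nil {
		return err
	}
	fmt.Printf("cache %s: parsed %d stats\n", id, len(stats))
	return nil
}

func main() {
	_ = handle("edge-cache-0", "astats", strings.NewReader(`{}`))
	fmt.Println(handle("edge-cache-0", "unknown-format", strings.NewReader(`{}`)))
}
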
diff --git a/traffic_monitor/cache/data_test.go b/traffic_monitor/cache/data_test.go
index 6d90774a6d..0ac20b500a 100644
--- a/traffic_monitor/cache/data_test.go
+++ b/traffic_monitor/cache/data_test.go
@@ -164,12 +164,11 @@ func randDsStat() dsdata.Stat {
 	}
 
 	return dsdata.Stat{
-		CommonStats:        randStatCommon(),
-		CacheGroups:        cacheGroups,
-		Types:              types,
-		Caches:             caches,
-		CachesTimeReceived: cachesTime,
-		TotalStats:         randStatCacheStats(),
+		CommonStats: randStatCommon(),
+		CacheGroups: cacheGroups,
+		Types:       types,
+		Caches:      caches,
+		TotalStats:  randStatCacheStats(),
 	}
 }
 
diff --git a/traffic_monitor/cache/stats_type_astats.go b/traffic_monitor/cache/stats_type_astats.go
new file mode 100644
index 0000000000..98b7919d49
--- /dev/null
+++ b/traffic_monitor/cache/stats_type_astats.go
@@ -0,0 +1,253 @@
+package cache
+
+// stats_type_astats is the default Stats format for Traffic Control.
+// It is the Stats format produced by the `astats` plugin to Apache Traffic Server, included with Traffic Control.
+//
+// Stats are of the form `{"ats": {"name": number}}`,
+// where `name` is of the form:
+//   `"plugin.remap_stats.fully-qualified-domain-name.example.net.stat-name"`
+// and `stat-name` is one of:
+//   `in_bytes`, `out_bytes`, `status_2xx`, `status_3xx`, `status_4xx`, `status_5xx`
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+
+	"github.com/apache/incubator-trafficcontrol/lib/go-log"
+	"github.com/apache/incubator-trafficcontrol/lib/go-tc"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/dsdata"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/todata"
+)
+
+func init() {
+	AddStatsType("astats", astatsParse, astatsPrecompute)
+}
+
+func astatsParse(cache tc.CacheName, r io.Reader) (error, map[string]interface{}, AstatsSystem) {
+	astats := Astats{}
+	err := json.NewDecoder(r).Decode(&astats)
+	return err, astats.Ats, astats.System
+}
+
+func astatsPrecompute(cache tc.CacheName, toData todata.TOData, rawStats map[string]interface{}, system AstatsSystem) PrecomputedData {
+	stats := map[tc.DeliveryServiceName]dsdata.Stat{}
+	precomputed := PrecomputedData{}
+	var err error
+	if precomputed.OutBytes, err = astatsOutBytes(system.ProcNetDev, system.InfName); err != nil {
+		precomputed.OutBytes = 0
+		log.Errorf("precomputeAstats %s handle precomputing outbytes '%v'\n", cache, err)
+	}
+
+	kbpsInMbps := int64(1000)
+	precomputed.MaxKbps = int64(system.InfSpeed) * kbpsInMbps
+
+	for stat, value := range rawStats {
+		var err error
+		stats, err = astatsProcessStat(cache, stats, toData, stat, value)
+		if err != nil && err != dsdata.ErrNotProcessedStat {
+			log.Infof("precomputing cache %v stat %v value %v error %v", cache, stat, value, err)
+			precomputed.Errors = append(precomputed.Errors, err)
+		}
+	}
+	precomputed.DeliveryServiceStats = stats
+	return precomputed
+}
+
+// astatsOutBytes takes the proc.net.dev string and the interface name, and returns the bytes field
+func astatsOutBytes(procNetDev, iface string) (int64, error) {
+	if procNetDev == "" {
+		return 0, fmt.Errorf("procNetDev empty")
+	}
+	if iface == "" {
+		return 0, fmt.Errorf("iface empty")
+	}
+	ifacePos := strings.Index(procNetDev, iface)
+	if ifacePos == -1 {
+		return 0, fmt.Errorf("interface '%s' not found in proc.net.dev '%s'", iface, procNetDev)
+	}
+
+	procNetDevIfaceBytes := procNetDev[ifacePos+len(iface)+1:]
+	procNetDevIfaceBytesArr := strings.Fields(procNetDevIfaceBytes) // TODO test
+	if len(procNetDevIfaceBytesArr) < 10 {
+		return 0, fmt.Errorf("proc.net.dev iface '%v' unknown format '%s'", iface, procNetDev)
+	}
+	procNetDevIfaceBytes = procNetDevIfaceBytesArr[8]
+
+	return strconv.ParseInt(procNetDevIfaceBytes, 10, 64)
+}
+
+// astatsProcessStat and its subsidiary functions act as a State Machine, flowing the stat thru states for each "." component of the stat name
+func astatsProcessStat(server tc.CacheName, stats map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, value interface{}) (map[tc.DeliveryServiceName]dsdata.Stat, error) {
+	parts := strings.Split(stat, ".")
+	if len(parts) < 1 {
+		return stats, fmt.Errorf("stat has no initial part")
+	}
+
+	switch parts[0] {
+	case "plugin":
+		return astatsProcessStatPlugin(server, stats, toData, stat, parts[1:], value)
+	case "proxy":
+		return stats, dsdata.ErrNotProcessedStat
+	case "server":
+		return stats, dsdata.ErrNotProcessedStat
+	default:
+		return stats, fmt.Errorf("stat '%s' has unknown initial part '%s'", stat, parts[0])
+	}
+}
+
+func astatsProcessStatPlugin(server tc.CacheName, stats map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, statParts []string, value interface{}) (map[tc.DeliveryServiceName]dsdata.Stat, error) {
+	if len(statParts) < 1 {
+		return stats, fmt.Errorf("stat has no plugin part")
+	}
+	switch statParts[0] {
+	case "remap_stats":
+		return astatsProcessStatPluginRemapStats(server, stats, toData, stat, statParts[1:], value)
+	default:
+		return stats, fmt.Errorf("stat has unknown plugin part '%s'", statParts[0])
+	}
+}
+
+func astatsProcessStatPluginRemapStats(server tc.CacheName, stats map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, statParts []string, value interface{}) (map[tc.DeliveryServiceName]dsdata.Stat, error) {
+	if len(statParts) < 3 {
+		return stats, fmt.Errorf("stat has no remap_stats deliveryservice and name parts")
+	}
+
+	// the FQDN is `subsubdomain`.`subdomain`.`domain`. For a HTTP delivery service, `subsubdomain` will be the cache hostname; for a DNS delivery service, it will be `edge`. Then, `subdomain` is the delivery service regex.
+	subsubdomain := statParts[0]
+	subdomain := statParts[1]
+	domain := strings.Join(statParts[2:len(statParts)-1], ".")
+
+	ds, ok := toData.DeliveryServiceRegexes.DeliveryService(domain, subdomain, subsubdomain)
+	if !ok {
+		fqdn := fmt.Sprintf("%s.%s.%s", subsubdomain, subdomain, domain)
+		return stats, fmt.Errorf("ERROR no delivery service match for fqdn '%v' stat '%v'\n", fqdn, strings.Join(statParts, "."))
+	}
+	if ds == "" {
+		fqdn := fmt.Sprintf("%s.%s.%s", subsubdomain, subdomain, domain)
+		return stats, fmt.Errorf("ERROR EMPTY delivery service fqdn %v stat %v\n", fqdn, strings.Join(statParts, "."))
+	}
+
+	statName := statParts[len(statParts)-1]
+
+	dsStat, ok := stats[ds]
+	if !ok {
+		newStat := dsdata.NewStat()
+		dsStat = *newStat
+	}
+
+	if err := astatsAddCacheStat(&dsStat.TotalStats, statName, value); err != nil {
+		return stats, err
+	}
+
+	cachegroup, ok := toData.ServerCachegroups[server]
+	if !ok {
+		return stats, fmt.Errorf("server missing from TOData.ServerCachegroups")
+	}
+	dsStat.CacheGroups[cachegroup] = dsStat.TotalStats
+
+	cacheType, ok := toData.ServerTypes[server]
+	if !ok {
+		return stats, fmt.Errorf("server missing from TOData.ServerTypes")
+	}
+	dsStat.Types[cacheType] = dsStat.TotalStats
+
+	dsStat.Caches[server] = dsStat.TotalStats
+
+	stats[ds] = dsStat
+	return stats, nil
+}
+
+// astatsAddCacheStat adds the given stat to the existing stat. Note this adds, it doesn't overwrite. Numbers are summed, strings are concatenated.
+// TODO make this less duplicate code somehow.
+func astatsAddCacheStat(stat *dsdata.StatCacheStats, name string, val interface{}) error {
+	switch name {
+	case "status_2xx":
+		v, ok := val.(float64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.Status2xx.Value += int64(v)
+	case "status_3xx":
+		v, ok := val.(float64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.Status3xx.Value += int64(v)
+	case "status_4xx":
+		v, ok := val.(float64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.Status4xx.Value += int64(v)
+	case "status_5xx":
+		v, ok := val.(float64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.Status5xx.Value += int64(v)
+	case "out_bytes":
+		v, ok := val.(float64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.OutBytes.Value += int64(v)
+	case "is_available":
+		v, ok := val.(bool)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected bool actual '%v' type %T", name, val, val)
+		}
+		if v {
+			stat.IsAvailable.Value = true
+		}
+	case "in_bytes":
+		v, ok := val.(float64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.InBytes.Value += v
+	case "tps_2xx":
+		v, ok := val.(int64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.Tps2xx.Value += float64(v)
+	case "tps_3xx":
+		v, ok := val.(int64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.Tps3xx.Value += float64(v)
+	case "tps_4xx":
+		v, ok := val.(int64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.Tps4xx.Value += float64(v)
+	case "tps_5xx":
+		v, ok := val.(int64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.Tps5xx.Value += float64(v)
+	case "error_string":
+		v, ok := val.(string)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected string actual '%v' type %T", name, val, val)
+		}
+		stat.ErrorString.Value += v + ", "
+	case "tps_total":
+		v, ok := val.(float64)
+		if !ok {
+			return fmt.Errorf("stat '%s' value expected int actual '%v' type %T", name, val, val)
+		}
+		stat.TpsTotal.Value += v
+	case "status_unknown":
+		return dsdata.ErrNotProcessedStat
+	default:
+		return fmt.Errorf("unknown stat '%s'", name)
+	}
+	return nil
+}
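A usage sketch for astatsOutBytes above, written as if it lived in a _test.go file in the same traffic_monitor/cache package; the /proc/net/dev line is synthetic example data:

package cache

import "testing"

// TestAstatsOutBytesSketch exercises astatsOutBytes with a synthetic
// /proc/net/dev entry. The 9th whitespace-separated field after the
// interface name is the transmitted-bytes counter the monitor wants.
func TestAstatsOutBytesSketch(t *testing.T) {
	procNetDev := "eth0: 1500 10 0 0 0 0 0 0 987654321 20 0 0 0 0 0 0"
	outBytes, err := astatsOutBytes(procNetDev, "eth0")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if outBytes != 987654321 {
		t.Fatalf("expected 987654321, got %d", outBytes)
	}
}
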
diff --git a/traffic_monitor/cache/stats_types.go b/traffic_monitor/cache/stats_types.go
new file mode 100644
index 0000000000..ee18623cbd
--- /dev/null
+++ b/traffic_monitor/cache/stats_types.go
@@ -0,0 +1,48 @@
+package cache
+
+import (
+	"io"
+
+	"github.com/apache/incubator-trafficcontrol/lib/go-tc"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/todata"
+)
+
+//
+// To create a new Stats Type, for a custom caching proxy with its own stats format:
+//
+// 1. Create a file for your type in this directory and package, `traffic_monitor/cache/`
+// 2. Create Parse and Precompute functions in your file, with the signature of `StatsTypeParser` and `StatsTypePrecomputer`
+// 3. In your file, add `func init() { AddStatsType("mytype", myTypeParser, myTypePrecomputer) }`, where "mytype" is the name caches select via the `health.polling.format` Parameter
+//
+// Your Parser should take the raw bytes from the `io.Reader` and populate the raw stats from them. For maximum compatibility, the names of these raw stats should be of the same form as Apache Traffic Server's `stats_over_http`, e.g. "plugin.remap_stats.delivery-service-fqdn.com.in_bytes". Traffic Control _may_ work with custom stat names, but we don't currently guarantee it.
+//
+// Your Precomputer should take the Stats and System information your Parser created, and populate the PrecomputedData. It is essential that all PrecomputedData fields are populated, especially `DeliveryServiceStats`, as they are used for cache and delivery service availability and threshold computation. If PrecomputedData is not properly and fully populated, the cache's availability will not be properly computed.
+//
+// Note this function is not called for Health polls, only Stat polls. Your Cache should have two separate stats endpoints: a small light endpoint returning only system stats and used to quickly verify reachability, and a large endpoint with all stats. If your cache does not have two stat endpoints, you may use your large stat endpoint for the Health poll, and configure the Health poll interval to be arbitrarily slow.
+//
+// Note the PrecomputedData `Reporting` and `Time` fields are the exception: they do not need to be set, and will be forcibly overridden by the Handler after your Precomputer function returns.
+//
+// Note your stats functions SHOULD NOT reuse functions from other stats types, even if they are similar, or have identical helper functions. This is a case where "duplicate" code is acceptable, because it's not conceptually duplicate. You don't want your stat parsers to break if the similar stats format you reuse code from changes.
+//
+
+const DefaultStatsType = "astats"
+
+// StatsTypeDecoder is a pair of functions registered for decoding a particular Stats type: one to parse the stats, and one to create the precomputed data.
+type StatsTypeDecoder struct {
+	Parse      StatsTypeParser
+	Precompute StatsTypePrecomputer
+}
+
+// StatsTypeParser takes the bytes returned from the cache's stats endpoint, along with the cache name, and returns the map of raw stats (whose names must be strings, and values may be any primitive type but MUST be float64 if they are used by a Parameter Threshold) and System information.
+type StatsTypeParser func(cache tc.CacheName, r io.Reader) (error, map[string]interface{}, AstatsSystem)
+
+// StatsTypePrecomputer takes the cache name, the Traffic Ops data, and the raw stats and system information created by Parse, and returns the PrecomputedData. Note this will only be called for Stat polls, not Health polls. Note errors should be returned in PrecomputedData.Errors
+//
+type StatsTypePrecomputer func(cache tc.CacheName, toData todata.TOData, stats map[string]interface{}, system AstatsSystem) PrecomputedData
+
+// StatsTypeDecoders holds the functions for parsing cache stats. This is not const, because Go doesn't allow constant maps. This is populated on startup, and MUST NOT be modified after startup.
+var StatsTypeDecoders = map[string]StatsTypeDecoder{}
+
+func AddStatsType(typeName string, parser StatsTypeParser, precomputer StatsTypePrecomputer) {
+	StatsTypeDecoders[typeName] = StatsTypeDecoder{Parse: parser, Precompute: precomputer}
+}
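To make the comment block above concrete, here is a hedged sketch of registering a hypothetical "mycache" stats format. Only AddStatsType, the two function signatures, and the AstatsSystem/PrecomputedData/dsdata types come from this PR; every "mycache" name and the payload shape are invented for illustration:

package cache

import (
	"encoding/json"
	"io"

	"github.com/apache/incubator-trafficcontrol/lib/go-tc"
	"github.com/apache/incubator-trafficcontrol/traffic_monitor/dsdata"
	"github.com/apache/incubator-trafficcontrol/traffic_monitor/todata"
)

func init() {
	AddStatsType("mycache", mycacheParse, mycachePrecompute)
}

// mycacheParse decodes the hypothetical proxy's stats payload into the raw
// stat map and system info expected by the rest of Traffic Monitor.
func mycacheParse(cache tc.CacheName, r io.Reader) (error, map[string]interface{}, AstatsSystem) {
	payload := struct {
		Stats  map[string]interface{} `json:"stats"`
		System AstatsSystem           `json:"system"`
	}{}
	err := json.NewDecoder(r).Decode(&payload)
	return err, payload.Stats, payload.System
}

// mycachePrecompute fills PrecomputedData from the parsed stats. A real
// implementation must populate DeliveryServiceStats per delivery service,
// since availability and thresholds are computed from it.
func mycachePrecompute(cache tc.CacheName, toData todata.TOData, stats map[string]interface{}, system AstatsSystem) PrecomputedData {
	return PrecomputedData{
		MaxKbps:              int64(system.InfSpeed) * 1000,
		DeliveryServiceStats: map[tc.DeliveryServiceName]dsdata.Stat{},
	}
}
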
diff --git a/traffic_monitor/ds/stat.go b/traffic_monitor/ds/stat.go
index 8ef2244b0e..82008bdbf6 100644
--- a/traffic_monitor/ds/stat.go
+++ b/traffic_monitor/ds/stat.go
@@ -383,7 +383,6 @@ func CreateStats(precomputed map[tc.CacheName]cache.PrecomputedData, toData toda
 			httpDsStat.CacheGroups[cachegroup] = httpDsStat.CacheGroups[cachegroup].Sum(resultStat.CacheGroups[cachegroup])
 			httpDsStat.Types[serverType] = httpDsStat.Types[serverType].Sum(resultStat.Types[serverType])
 			httpDsStat.Caches[server] = httpDsStat.Caches[server].Sum(resultStat.Caches[server])
-			httpDsStat.CachesTimeReceived[server] = resultStat.CachesTimeReceived[server]
 			httpDsStat.CommonStats = dsStats.DeliveryService[ds].CommonStats
 			dsStats.DeliveryService[ds] = httpDsStat // TODO determine if necessary
 		}
diff --git a/traffic_monitor/dsdata/stat.go b/traffic_monitor/dsdata/stat.go
index 80e9010456..43d168757d 100644
--- a/traffic_monitor/dsdata/stat.go
+++ b/traffic_monitor/dsdata/stat.go
@@ -223,12 +223,11 @@ func (a StatCacheStats) Sum(b StatCacheStats) StatCacheStats {
 
 // Stat represents a complete delivery service stat, for a given poll, or at the time requested.
 type Stat struct {
-	CommonStats        StatCommon
-	CacheGroups        map[tc.CacheGroupName]StatCacheStats
-	Types              map[tc.CacheType]StatCacheStats
-	Caches             map[tc.CacheName]StatCacheStats
-	CachesTimeReceived map[tc.CacheName]time.Time
-	TotalStats         StatCacheStats
+	CommonStats StatCommon
+	CacheGroups map[tc.CacheGroupName]StatCacheStats
+	Types       map[tc.CacheType]StatCacheStats
+	Caches      map[tc.CacheName]StatCacheStats
+	TotalStats  StatCacheStats
 }
 
 // ErrNotProcessedStat indicates a stat received is not used by Traffic Monitor, nor returned by any API endpoint. Receiving this error indicates the stat has been discarded.
@@ -237,23 +236,21 @@ var ErrNotProcessedStat = errors.New("This stat is not used.")
 // NewStat returns a new delivery service Stat, initializing pointer members.
 func NewStat() *Stat {
 	return &Stat{
-		CacheGroups:        map[tc.CacheGroupName]StatCacheStats{},
-		Types:              map[tc.CacheType]StatCacheStats{},
-		CommonStats:        StatCommon{CachesReporting: map[tc.CacheName]bool{}},
-		Caches:             map[tc.CacheName]StatCacheStats{},
-		CachesTimeReceived: map[tc.CacheName]time.Time{},
+		CacheGroups: map[tc.CacheGroupName]StatCacheStats{},
+		Types:       map[tc.CacheType]StatCacheStats{},
+		CommonStats: StatCommon{CachesReporting: map[tc.CacheName]bool{}},
+		Caches:      map[tc.CacheName]StatCacheStats{},
 	}
 }
 
 // Copy performs a deep copy of this Stat. It does not modify, and is thus safe for multiple goroutines.
 func (a Stat) Copy() Stat {
 	b := Stat{
-		CommonStats:        a.CommonStats.Copy(),
-		TotalStats:         a.TotalStats,
-		CacheGroups:        map[tc.CacheGroupName]StatCacheStats{},
-		Types:              map[tc.CacheType]StatCacheStats{},
-		Caches:             map[tc.CacheName]StatCacheStats{},
-		CachesTimeReceived: map[tc.CacheName]time.Time{},
+		CommonStats: a.CommonStats.Copy(),
+		TotalStats:  a.TotalStats,
+		CacheGroups: map[tc.CacheGroupName]StatCacheStats{},
+		Types:       map[tc.CacheType]StatCacheStats{},
+		Caches:      map[tc.CacheName]StatCacheStats{},
 	}
 	for k, v := range a.CacheGroups {
 		b.CacheGroups[k] = v
@@ -264,9 +261,6 @@ func (a Stat) Copy() Stat {
 	for k, v := range a.Caches {
 		b.Caches[k] = v
 	}
-	for k, v := range a.CachesTimeReceived {
-		b.CachesTimeReceived[k] = v
-	}
 	return b
 }
 
diff --git a/traffic_monitor/fetcher/fetcher.go b/traffic_monitor/fetcher/fetcher.go
index 8f18eed5ba..0a9238555e 100644
--- a/traffic_monitor/fetcher/fetcher.go
+++ b/traffic_monitor/fetcher/fetcher.go
@@ -30,7 +30,7 @@ import (
 )
 
 type Fetcher interface {
-	Fetch(id string, url string, host string, pollId uint64, pollFinishedChan chan<- uint64)
+	Fetch(id string, url string, host string, format string, pollId uint64, pollFinishedChan chan<- uint64)
 }
 
 type HttpFetcher struct {
@@ -46,7 +46,7 @@ type Result struct {
 	Error  error
 }
 
-func (f HttpFetcher) Fetch(id string, url string, host string, pollId uint64, pollFinishedChan chan<- uint64) {
+func (f HttpFetcher) Fetch(id string, url string, host string, format string, pollId uint64, pollFinishedChan chan<- uint64) {
 	log.Debugf("poll %v %v fetch start\n", pollId, time.Now())
 	req, err := http.NewRequest("GET", url, nil)
 	// TODO: change this to use f.Headers. -jse
@@ -76,8 +76,8 @@ func (f HttpFetcher) Fetch(id string, url string, host string, pollId uint64, po
 
 	if err == nil && response != nil {
 		log.Debugf("poll %v %v fetch end\n", pollId, time.Now())
-		f.Handler.Handle(id, response.Body, reqTime, reqEnd, err, pollId, pollFinishedChan)
+		f.Handler.Handle(id, response.Body, format, reqTime, reqEnd, err, pollId, pollFinishedChan)
 	} else {
-		f.Handler.Handle(id, nil, reqTime, reqEnd, err, pollId, pollFinishedChan)
+		f.Handler.Handle(id, nil, format, reqTime, reqEnd, err, pollId, pollFinishedChan)
 	}
 }
diff --git a/traffic_monitor/handler/handler.go b/traffic_monitor/handler/handler.go
index 3b001c62ae..5777bb7d31 100644
--- a/traffic_monitor/handler/handler.go
+++ b/traffic_monitor/handler/handler.go
@@ -40,5 +40,5 @@ type OpsConfig struct {
 }
 
 type Handler interface {
-	Handle(string, io.Reader, time.Duration, time.Time, error, uint64, chan<- uint64)
+	Handle(string, io.Reader, string, time.Duration, time.Time, error, uint64, chan<- uint64)
 }
diff --git a/traffic_monitor/manager/monitorconfig.go b/traffic_monitor/manager/monitorconfig.go
index 1291fc0a15..c33ee451ef 100644
--- a/traffic_monitor/manager/monitorconfig.go
+++ b/traffic_monitor/manager/monitorconfig.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/incubator-trafficcontrol/lib/go-log"
 	"github.com/apache/incubator-trafficcontrol/lib/go-tc"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/config"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/cache"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/peer"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/poller"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/threadsafe"
@@ -253,6 +254,13 @@ func monitorConfigListen(
 				log.Errorf("monitor config server %v profile %v has no polling URL; can't poll", srv.HostName, srv.Profile)
 				continue
 			}
+
+			format := monitorConfig.Profile[srv.Profile].Parameters.HealthPollingFormat
+			if format == "" {
+				format = cache.DefaultStatsType
+				log.Infof("health.polling.format for '%v' is empty, using default '%v'", srv.HostName, format)
+			}
+
 			r := strings.NewReplacer(
 				"${hostname}", srv.IP,
 				"${interface_name}", srv.InterfaceName,
@@ -267,10 +275,10 @@ func monitorConfigListen(
 				log.Warnln("profile " + srv.Profile + " health.connection.timeout Parameter is missing or zero, using default " + DefaultHealthConnectionTimeout.String())
 			}
 
-			healthURLs[srv.HostName] = poller.PollConfig{URL: url, Host: srv.FQDN, Timeout: connTimeout}
+			healthURLs[srv.HostName] = poller.PollConfig{URL: url, Host: srv.FQDN, Timeout: connTimeout, Format: format}
 			r = strings.NewReplacer("application=system", "application=")
 			statURL := r.Replace(url)
-			statURLs[srv.HostName] = poller.PollConfig{URL: statURL, Host: srv.FQDN, Timeout: connTimeout}
+			statURLs[srv.HostName] = poller.PollConfig{URL: statURL, Host: srv.FQDN, Timeout: connTimeout, Format: format}
 		}
 
 		peerSet := map[tc.TrafficMonitorName]struct{}{}
diff --git a/traffic_monitor/peer/peer.go b/traffic_monitor/peer/peer.go
index 384d787595..21d879b114 100644
--- a/traffic_monitor/peer/peer.go
+++ b/traffic_monitor/peer/peer.go
@@ -30,7 +30,6 @@ import (
 // Handler handles peer Traffic Monitor data, taking a raw reader, parsing the data, and passing a result object to the ResultChannel. This fulfills the common `Handler` interface.
 type Handler struct {
 	ResultChannel chan Result
-	Notify        int
 }
 
 // NewHandler returns a new peer Handler.
@@ -50,7 +49,7 @@ type Result struct {
 }
 
 // Handle handles a response from a polled Traffic Monitor peer, parsing the data and forwarding it to the ResultChannel.
-func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, reqEnd time.Time, err error, pollID uint64, pollFinished chan<- uint64) {
+func (handler Handler) Handle(id string, r io.Reader, format string, reqTime time.Duration, reqEnd time.Time, err error, pollID uint64, pollFinished chan<- uint64) {
 	result := Result{
 		ID:           tc.TrafficMonitorName(id),
 		Available:    false,
diff --git a/traffic_monitor/poller/poller.go b/traffic_monitor/poller/poller.go
index 84f5f7b387..a8fff5e1f5 100644
--- a/traffic_monitor/poller/poller.go
+++ b/traffic_monitor/poller/poller.go
@@ -49,6 +49,7 @@ type PollConfig struct {
 	Host    string
 	Timeout time.Duration
 	Handler handler.Handler
+	Format  string
 }
 
 type HttpPollerConfig struct {
@@ -177,11 +178,8 @@ var debugPollNum uint64
 type HTTPPollInfo struct {
 	NoKeepAlive bool
 	Interval    time.Duration
-	Timeout     time.Duration
 	ID          string
-	URL         string
-	Host        string
-	Handler     handler.Handler
+	PollConfig
 }
 
 func (p HttpPoller) Poll() {
@@ -219,7 +217,7 @@ func (p HttpPoller) Poll() {
 					}
 				}
 			}
-			go poller(info.Interval, info.ID, info.URL, info.Host, fetcher, kill)
+			go poller(info.Interval, info.ID, info.URL, info.Host, info.Format, fetcher, kill)
 		}
 		p.Config = newConfig
 	}
@@ -235,7 +233,7 @@ func mustDie(die <-chan struct{}) bool {
 }
 
 // TODO iterationCount and/or p.TickChan?
-func poller(interval time.Duration, id string, url string, host string, fetcher fetcher.Fetcher, die <-chan struct{}) {
+func poller(interval time.Duration, id string, url string, host string, format string, fetcher fetcher.Fetcher, die <-chan struct{}) {
 	pollSpread := time.Duration(rand.Float64()*float64(interval/time.Nanosecond)) * time.Nanosecond
 	time.Sleep(pollSpread)
 	tick := time.NewTicker(interval)
@@ -252,7 +250,7 @@ func poller(interval time.Duration, id string, url string, host string, fetcher
 			pollId := atomic.AddUint64(&debugPollNum, 1)
 			pollFinishedChan := make(chan uint64)
 			log.Debugf("poll %v %v start\n", pollId, time.Now())
-			go fetcher.Fetch(id, url, host, pollId, pollFinishedChan) // TODO persist fetcher, with its own die chan?
+			go fetcher.Fetch(id, url, host, format, pollId, pollFinishedChan) // TODO persist fetcher, with its own die chan?
 			<-pollFinishedChan
 		case <-die:
 			tick.Stop()
@@ -275,9 +273,7 @@ func diffConfigs(old HttpPollerConfig, new HttpPollerConfig) ([]string, []HTTPPo
 				Interval:    new.Interval,
 				NoKeepAlive: new.NoKeepAlive,
 				ID:          id,
-				URL:         pollCfg.URL,
-				Host:        pollCfg.Host,
-				Timeout:     pollCfg.Timeout,
+				PollConfig: pollCfg,
 			})
 		}
 		return deletions, additions
@@ -293,9 +289,7 @@ func diffConfigs(old HttpPollerConfig, new HttpPollerConfig) ([]string, []HTTPPo
 				Interval:    new.Interval,
 				NoKeepAlive: new.NoKeepAlive,
 				ID:          id,
-				URL:         newPollCfg.URL,
-				Host:        newPollCfg.Host,
-				Timeout:     newPollCfg.Timeout,
+				PollConfig: newPollCfg,
 			})
 		}
 	}
@@ -307,9 +301,7 @@ func diffConfigs(old HttpPollerConfig, new HttpPollerConfig) ([]string, []HTTPPo
 				Interval:    new.Interval,
 				NoKeepAlive: new.NoKeepAlive,
 				ID:          id,
-				URL:         newPollCfg.URL,
-				Host:        newPollCfg.Host,
-				Timeout:     newPollCfg.Timeout,
+				PollConfig: newPollCfg,
 			})
 		}
 	}

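One Go detail worth noting in the poller change above: HTTPPollInfo now embeds PollConfig, so info.URL, info.Host, info.Timeout, and the new info.Format are promoted fields rather than duplicated ones. A tiny standalone sketch of that mechanism, with trimmed stand-in types and made-up values:

package main

import (
	"fmt"
	"time"
)

type pollConfig struct {
	URL     string
	Host    string
	Timeout time.Duration
	Format  string
}

type httpPollInfo struct {
	Interval time.Duration
	ID       string
	pollConfig // embedded: URL, Host, Timeout, Format are promoted
}

func main() {
	info := httpPollInfo{
		Interval: 6 * time.Second,
		ID:       "edge-cache-0",
		pollConfig: pollConfig{
			URL:     "http://edge-cache-0.example.net/_astats?application=",
			Host:    "edge-cache-0.example.net",
			Timeout: 2 * time.Second,
			Format:  "astats",
		},
	}
	// Promoted field access, as used by poller.Poll: info.URL, info.Format, etc.
	fmt.Println(info.URL, info.Format)
}
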

 
