Posted to commits@trafficcontrol.apache.org by el...@apache.org on 2017/01/11 22:03:12 UTC

[2/3] incubator-trafficcontrol git commit: Add TM2 stat spans, to reduce memory

Add TM2 stat spans, to reduce memory

This restructures the stat structs so that when Astats returns a
result in which a stat's value hasn't changed, the last received
stat's 'span' counter is incremented instead of the name and value
being stored again. This drastically reduces memory usage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/01a3f8df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/01a3f8df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/01a3f8df

Branch: refs/heads/master
Commit: 01a3f8df56500dacd9e31d5bdb723fb885707a8e
Parents: 554db0f
Author: Robert Butts <ro...@gmail.com>
Authored: Wed Jan 11 14:03:58 2017 -0700
Committer: Jeff Elsloo <je...@cable.comcast.com>
Committed: Wed Jan 11 15:02:05 2017 -0700

----------------------------------------------------------------------
 .../experimental/traffic_monitor/cache/cache.go |  49 ++---
 .../experimental/traffic_monitor/cache/data.go  | 150 +++++++++++++
 .../traffic_monitor/deliveryservice/stat.go     |  57 ++---
 .../traffic_monitor/manager/datarequest.go      | 211 +++++++++++--------
 .../traffic_monitor/manager/healthresult.go     |   6 -
 .../traffic_monitor/manager/manager.go          |   7 +-
 .../traffic_monitor/manager/opsconfig.go        |   8 +-
 .../traffic_monitor/manager/stathistory.go      |  52 ++++-
 .../threadsafe/cachemaxkbpses.go                |  52 +++++
 .../threadsafe/resultstathistory.go             |  83 ++++++++
 10 files changed, 497 insertions(+), 178 deletions(-)
----------------------------------------------------------------------

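For readers skimming the diff below, here is a minimal, standalone sketch of
the span idea (the type and function names are illustrative, not the TM code):
when a poll returns the same value as the previous poll, the newest history
entry's Span is bumped instead of a new entry being stored.

package main

import (
	"fmt"
	"time"
)

type statVal struct {
	Val  interface{}
	Time time.Time
	Span uint64
}

// add prepends a new entry, or increments the newest entry's Span when the
// value is unchanged since the last poll.
func add(history []statVal, val interface{}, now time.Time) []statVal {
	if len(history) > 0 && history[0].Val == val {
		history[0].Time = now
		history[0].Span++
		return history
	}
	return append([]statVal{{Val: val, Time: now, Span: 1}}, history...)
}

func main() {
	h := []statVal{}
	for _, v := range []int{10, 10, 10, 42} {
		h = add(h, v, time.Now())
	}
	// Four polls collapse to two stored entries: 42 (Span 1) and 10 (Span 3).
	fmt.Printf("%d entries, spans %d and %d\n", len(h), h[0].Span, h[1].Span)
}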

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/cache/cache.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/cache.go b/traffic_monitor/experimental/traffic_monitor/cache/cache.go
index 8d973d5..539a2e1 100644
--- a/traffic_monitor/experimental/traffic_monitor/cache/cache.go
+++ b/traffic_monitor/experimental/traffic_monitor/cache/cache.go
@@ -63,11 +63,12 @@ func (handler Handler) Precompute() bool {
 
 // PrecomputedData represents data parsed and pre-computed from the Result.
 type PrecomputedData struct {
-	DeliveryServiceStats map[enum.DeliveryServiceName]dsdata.Stat
-	OutBytes             int64
+	DeliveryServiceStats map[enum.DeliveryServiceName]dsdata.Stat // x
+	OutBytes             int64                                    // x
 	MaxKbps              int64
 	Errors               []error
-	Reporting            bool
+	Reporting            bool // x
+	Time                 time.Time
 }
 
 // Result is the data result returned by a cache.
@@ -102,7 +103,7 @@ type Stat struct {
 // Stats is designed for returning via the API. It contains result history for each cache, as well as common API data.
 type Stats struct {
 	srvhttp.CommonAPIData
-	Caches map[enum.CacheName]map[string][]Stat `json:"caches"`
+	Caches map[enum.CacheName]map[string][]ResultStatVal `json:"caches"`
 }
 
 // Filter filters whether stats and caches should be returned from a data set.
@@ -113,41 +114,33 @@ type Filter interface {
 }
 
 // StatsMarshall encodes the stats in JSON, encoding up to historyCount of each stat. If statsToUse is empty, all stats are encoded; otherwise, only the given stats are encoded. If wildcard is true, stats which contain the text in each statsToUse are returned, instead of exact stat names. If cacheType is not CacheTypeInvalid, only stats for the given type are returned. If hosts is not empty, only the given hosts are returned.
-func StatsMarshall(statHistory map[enum.CacheName][]Result, filter Filter, params url.Values) ([]byte, error) {
+func StatsMarshall(statResultHistory ResultStatHistory, filter Filter, params url.Values) ([]byte, error) {
 	stats := Stats{
 		CommonAPIData: srvhttp.GetCommonAPIData(params, time.Now()),
-		Caches:        map[enum.CacheName]map[string][]Stat{},
+		Caches:        map[enum.CacheName]map[string][]ResultStatVal{},
 	}
 
 	// TODO in 1.0, stats are divided into 'location', 'cache', and 'type'. 'cache' are hidden by default.
 
-	for id, history := range statHistory {
+	for id, history := range statResultHistory {
 		if !filter.UseCache(id) {
 			continue
 		}
-		historyCount := 1
-		for _, result := range history {
-			if !filter.WithinStatHistoryMax(historyCount) {
-				break
+		for stat, vals := range history {
+			stat = "ats." + stat // TM1 prefixes ATS stats with 'ats.'
+			if !filter.UseStat(stat) {
+				continue
 			}
-			historyCount++
-			for stat, value := range result.Astats.Ats {
-				stat = "ats." + stat // TM 1.0 prefixes ATS stats with 'ats.'
-				if !filter.UseStat(stat) {
-					continue
+			historyCount := 1
+			for _, val := range vals {
+				if !filter.WithinStatHistoryMax(historyCount) {
+					break
 				}
-				s := Stat{
-					Time:  result.Time.UnixNano() / 1000000,
-					Value: value,
+				if _, ok := stats.Caches[id]; !ok {
+					stats.Caches[id] = map[string][]ResultStatVal{}
 				}
-
-				_, exists := stats.Caches[id]
-
-				if !exists {
-					stats.Caches[id] = map[string][]Stat{}
-				}
-
-				stats.Caches[id][stat] = append(stats.Caches[id][stat], Stat{Time: s.Time, Value: fmt.Sprintf("%v", s.Value)}) // convert stats to strings, for the TM1.0 /publish/CacheStats API
+				stats.Caches[id][stat] = append(stats.Caches[id][stat], val)
+				historyCount += int(val.Span)
 			}
 		}
 	}
@@ -181,6 +174,7 @@ func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, req
 	}
 
 	result.PrecomputedData.Reporting = true
+	result.PrecomputedData.Time = result.Time
 
 	if decodeErr := json.NewDecoder(r).Decode(&result.Astats); decodeErr != nil {
 		log.Errorf("%s procnetdev decode error '%v'\n", id, decodeErr)
@@ -242,6 +236,7 @@ func outBytes(procNetDev, iface string, multipleSpaceRegex *regexp.Regexp) (int6
 }
 
 // precompute does the calculations which are possible with only this one cache result.
+// TODO precompute ResultStatVal
 func (handler Handler) precompute(result Result) Result {
 	todata := handler.ToData.Get()
 	stats := map[enum.DeliveryServiceName]dsdata.Stat{}

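The new StatsMarshall above counts history in polls rather than entries: each
stored entry stands in for Span polls (historyCount += int(val.Span)). A
standalone sketch of that accounting, with illustrative names and a plain
integer limit in place of the real Filter:

package main

import "fmt"

type statVal struct {
	Val  interface{}
	Span uint64
}

// takeWithinMax returns the newest entries covering at most maxPolls polls.
func takeWithinMax(vals []statVal, maxPolls int) []statVal {
	taken := []statVal{}
	polls := 1
	for _, v := range vals {
		if polls > maxPolls {
			break
		}
		taken = append(taken, v)
		polls += int(v.Span)
	}
	return taken
}

func main() {
	history := []statVal{{Val: 7, Span: 50}, {Val: 3, Span: 1}, {Val: 9, Span: 1}}
	// A limit of 10 polls is satisfied by the single newest entry, because it spans 50 polls.
	fmt.Println(len(takeWithinMax(history, 10))) // 1
}
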
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/cache/data.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/data.go b/traffic_monitor/experimental/traffic_monitor/cache/data.go
index 16abbf6..087995a 100644
--- a/traffic_monitor/experimental/traffic_monitor/cache/data.go
+++ b/traffic_monitor/experimental/traffic_monitor/cache/data.go
@@ -20,6 +20,9 @@ package cache
  */
 
 import (
+	"fmt"
+	"time"
+
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
 )
 
@@ -74,3 +77,150 @@ func (a ResultHistory) Copy() ResultHistory {
 	}
 	return b
 }
+
+// ResultStatHistory is a map[cacheName][statName][]ResultStatVal
+type ResultStatHistory map[enum.CacheName]ResultStatValHistory
+
+type ResultStatValHistory map[string][]ResultStatVal
+
+// ResultStatVal is the value of an individual stat returned from a poll. Time is the time this stat was returned.
+// Span is the number of polls this stat has been the same. For example, if History is set to 100, and the last 50 polls had the same value for this stat (but none of the previous 50 were the same), this stat's map value slice will actually contain 51 entries, and the first entry will have the value, the time of the last poll, and a Span of 50. Assuming the poll time is every 8 seconds, users will then know, looking at the Span, that the value was unchanged for the last 50*8=400 seconds.
+// JSON values are all strings, for the TM1.0 /publish/CacheStats API.
+type ResultStatVal struct {
+	Val  interface{} `json:"value,string"`
+	Time TM1Time     `json:"time,string"`
+	Span uint64      `json:"span,string"`
+}
+
+// TM1Time provides a custom MarshalJSON func to serialise a time.Time into milliseconds since the epoch, as served in Traffic Monitor 1.x APIs
+type TM1Time time.Time
+
+func (t *TM1Time) MarshalJSON() ([]byte, error) {
+	NanosecondsPerMillisecond := int64(1000000)
+	it := (*time.Time)(t).UnixNano() / NanosecondsPerMillisecond
+	return []byte(fmt.Sprintf("%d", it)), nil
+}
+
+func copyResultStatVals(a []ResultStatVal) []ResultStatVal {
+	b := make([]ResultStatVal, len(a), len(a))
+	copy(b, a)
+	return b
+}
+
+func copyResultStatValHistory(a ResultStatValHistory) ResultStatValHistory {
+	b := ResultStatValHistory{}
+	for k, v := range a {
+		b[k] = copyResultStatVals(v) // TODO determine if necessary
+	}
+	return b
+}
+
+func (a ResultStatHistory) Copy() ResultStatHistory {
+	b := ResultStatHistory{}
+	for k, v := range a {
+		b[k] = copyResultStatValHistory(v)
+	}
+	return b
+}
+
+func pruneStats(history []ResultStatVal, limit uint64) []ResultStatVal {
+	if uint64(len(history)) > limit {
+		history = history[:limit-1]
+	}
+	return history
+}
+
+func (a ResultStatHistory) Add(r Result, limit uint64) {
+	for statName, statVal := range r.Astats.Ats {
+		statHistory := a[r.ID][statName]
+		// If the new stat value is the same as the last, update the time and increment the span. Span is the number of polls the latest value has been the same, and hence the length of time it's been the same is span*pollInterval.
+		if len(statHistory) > 0 && statHistory[0].Val == statVal {
+			statHistory[0].Time = TM1Time(r.Time)
+			statHistory[0].Span++
+		} else {
+			resultVal := ResultStatVal{
+				Val:  statVal,
+				Time: TM1Time(r.Time),
+				Span: 1,
+			}
+			statHistory = pruneStats(append([]ResultStatVal{resultVal}, statHistory...), limit)
+		}
+		if _, ok := a[r.ID]; !ok {
+			a[r.ID] = ResultStatValHistory{}
+		}
+		a[r.ID][statName] = statHistory // TODO determine if necessary for the first conditional
+	}
+}
+
+type ResultInfoHistory map[enum.CacheName][]ResultInfo
+
+// ResultInfo contains all the non-stat result info. This includes the cache ID, any errors, the time of the poll, the request time duration, Astats System (Vitals), Poll ID, and Availability.
+type ResultInfo struct {
+	ID          enum.CacheName
+	Error       error
+	Time        time.Time
+	RequestTime time.Duration
+	Vitals      Vitals
+	PollID      uint64
+	Available   bool
+}
+
+func toInfo(r Result) ResultInfo {
+	return ResultInfo{
+		ID:          r.ID,
+		Error:       r.Error,
+		Time:        r.Time,
+		RequestTime: r.RequestTime,
+		Vitals:      r.Vitals,
+		PollID:      r.PollID,
+		Available:   r.Available,
+	}
+}
+
+func toInfos(rs []Result) []ResultInfo {
+	infos := make([]ResultInfo, len(rs), len(rs))
+	for i, r := range rs {
+		infos[i] = toInfo(r)
+	}
+	return infos
+}
+
+func copyResultInfos(a []ResultInfo) []ResultInfo {
+	b := make([]ResultInfo, len(a), len(a))
+	copy(b, a)
+	return b
+}
+
+func (a ResultInfoHistory) Copy() ResultInfoHistory {
+	b := ResultInfoHistory{}
+	for k, v := range a {
+		b[k] = copyResultInfos(v) // TODO determine if copy is necessary
+	}
+	return b
+}
+
+func pruneInfos(history []ResultInfo, limit uint64) []ResultInfo {
+	if uint64(len(history)) > limit {
+		history = history[:limit-1]
+	}
+	return history
+}
+
+func (a ResultInfoHistory) Add(r Result, limit uint64) {
+	a[r.ID] = pruneInfos(append([]ResultInfo{toInfo(r)}, a[r.ID]...), limit)
+}
+
+// Kbpses is the kbps values of each cache.
+type Kbpses map[enum.CacheName]int64
+
+func (a Kbpses) Copy() Kbpses {
+	b := Kbpses{}
+	for k, v := range a {
+		b[k] = v
+	}
+	return b
+}
+
+func (a Kbpses) AddMax(r Result) {
+	a[r.ID] = r.PrecomputedData.MaxKbps
+}

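The Span/poll-interval arithmetic described in the ResultStatVal comment above,
as a tiny standalone sketch (the 8-second interval is just the example value
from that comment):

package main

import (
	"fmt"
	"time"
)

func main() {
	pollInterval := 8 * time.Second
	span := uint64(50) // the newest entry's Span: 50 consecutive polls returned the same value
	unchangedFor := time.Duration(span) * pollInterval
	fmt.Println(unchangedFor) // 6m40s, i.e. the 50*8=400 seconds from the comment
}
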
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go b/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go
index d47350e..538eb02 100644
--- a/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go
+++ b/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go
@@ -72,7 +72,7 @@ func setStaticData(dsStats Stats, dsServers map[enum.DeliveryServiceName][]enum.
 	return dsStats
 }
 
-func addAvailableData(dsStats Stats, crStates peer.Crstates, serverCachegroups map[enum.CacheName]enum.CacheGroupName, serverDs map[enum.CacheName][]enum.DeliveryServiceName, serverTypes map[enum.CacheName]enum.CacheType, statHistory map[enum.CacheName][]cache.Result) (Stats, error) {
+func addAvailableData(dsStats Stats, crStates peer.Crstates, serverCachegroups map[enum.CacheName]enum.CacheGroupName, serverDs map[enum.CacheName][]enum.DeliveryServiceName, serverTypes map[enum.CacheName]enum.CacheType, precomputed map[enum.CacheName]cache.PrecomputedData) (Stats, error) {
 	for cache, available := range crStates.Caches {
 		cacheGroup, ok := serverCachegroups[cache]
 		if !ok {
@@ -117,16 +117,11 @@ func addAvailableData(dsStats Stats, crStates peer.Crstates, serverCachegroups m
 			}
 
 			// TODO fix nested ifs
-			if results, ok := statHistory[cache]; ok {
-				if len(results) < 1 {
-					log.Warnf("no results %v %v\n", cache, deliveryService)
+			if pc, ok := precomputed[cache]; ok {
+				if pc.Reporting {
+					stat.CommonStats.CachesReporting[cache] = true
 				} else {
-					result := results[0]
-					if result.PrecomputedData.Reporting {
-						stat.CommonStats.CachesReporting[cache] = true
-					} else {
-						log.Debugf("no reporting %v %v\n", cache, deliveryService)
-					}
+					log.Debugf("no reporting %v %v\n", cache, deliveryService)
 				}
 			} else {
 				log.Debugf("no result for %v %v\n", cache, deliveryService)
@@ -353,26 +348,16 @@ func addDSPerSecStats(dsName enum.DeliveryServiceName, stat dsdata.Stat, lastSta
 }
 
 // latestBytes returns the most recent OutBytes from the given cache results, and the time of that result. It assumes zero results are not valid, but nonzero results with errors are valid.
-func latestBytes(results []cache.Result) (int64, time.Time, error) {
-	var result *cache.Result
-	for _, r := range results {
-		// result.Errors can include stat errors where OutBytes was set correctly, so we look for the first non-zero OutBytes rather than the first errorless result
-		// TODO add error classes to PrecomputedData, to distinguish stat errors from HTTP errors?
-		if r.PrecomputedData.OutBytes == 0 {
-			continue
-		}
-		result = &r
-		break
-	}
-	if result == nil {
+func latestBytes(p cache.PrecomputedData) (int64, time.Time, error) {
+	if p.OutBytes == 0 {
 		return 0, time.Time{}, fmt.Errorf("no valid results")
 	}
-	return result.PrecomputedData.OutBytes, result.Time, nil
+	return p.OutBytes, p.Time, nil
 }
 
 // addCachePerSecStats calculates the cache per-second stats, adds them to LastStats, and returns the augmented object.
-func addCachePerSecStats(cacheName enum.CacheName, results []cache.Result, lastStats LastStats) LastStats {
-	outBytes, outBytesTime, err := latestBytes(results) // it's ok if `latestBytes` returns 0s with an error, `addLastStat` will refrain from setting it (unless the previous calculation was nonzero, in which case it will error appropriately).
+func addCachePerSecStats(cacheName enum.CacheName, precomputed cache.PrecomputedData, lastStats LastStats) LastStats {
+	outBytes, outBytesTime, err := latestBytes(precomputed) // it's ok if `latestBytes` returns 0s with an error, `addLastStat` will refrain from setting it (unless the previous calculation was nonzero, in which case it will error appropriately).
 	if err != nil {
 		log.Warnf("while computing delivery service data for cache %v: %v\n", cacheName, err)
 	}
@@ -394,18 +379,18 @@ func addCachePerSecStats(cacheName enum.CacheName, results []cache.Result, lastS
 // we set the (new - old) / lastChangedTime as the KBPS, and update the recorded LastChangedTime and LastChangedValue
 //
 // TODO handle ATS byte rolling (when the `out_bytes` overflows back to 0)
-func addPerSecStats(statHistory map[enum.CacheName][]cache.Result, dsStats Stats, lastStats LastStats, dsStatsTime time.Time, serverCachegroups map[enum.CacheName]enum.CacheGroupName, serverTypes map[enum.CacheName]enum.CacheType) (Stats, LastStats) {
+func addPerSecStats(precomputed map[enum.CacheName]cache.PrecomputedData, dsStats Stats, lastStats LastStats, dsStatsTime time.Time, serverCachegroups map[enum.CacheName]enum.CacheGroupName, serverTypes map[enum.CacheName]enum.CacheType) (Stats, LastStats) {
 	for dsName, stat := range dsStats.DeliveryService {
 		dsStats, lastStats = addDSPerSecStats(dsName, stat, lastStats, dsStats, dsStatsTime, serverCachegroups, serverTypes)
 	}
-	for cacheName, results := range statHistory {
-		lastStats = addCachePerSecStats(cacheName, results, lastStats)
+	for cacheName, precomputedData := range precomputed {
+		lastStats = addCachePerSecStats(cacheName, precomputedData, lastStats)
 	}
 	return dsStats, lastStats
 }
 
-// CreateStats aggregates and creates statistics from given stat history. It returns the created stats, information about these stats necessary for the next calculation, and any error.
-func CreateStats(statHistory map[enum.CacheName][]cache.Result, toData todata.TOData, crStates peer.Crstates, lastStats LastStats, now time.Time) (Stats, LastStats, error) {
+// CreateStats aggregates and creates statistics from given precomputed stat history. It returns the created stats, information about these stats necessary for the next calculation, and any error.
+func CreateStats(precomputed map[enum.CacheName]cache.PrecomputedData, toData todata.TOData, crStates peer.Crstates, lastStats LastStats, now time.Time) (Stats, LastStats, error) {
 	start := time.Now()
 	dsStats := NewStats()
 	for deliveryService := range toData.DeliveryServiceServers {
@@ -417,15 +402,12 @@ func CreateStats(statHistory map[enum.CacheName][]cache.Result, toData todata.TO
 	}
 	dsStats = setStaticData(dsStats, toData.DeliveryServiceServers)
 	var err error
-	dsStats, err = addAvailableData(dsStats, crStates, toData.ServerCachegroups, toData.ServerDeliveryServices, toData.ServerTypes, statHistory) // TODO move after stat summarisation
+	dsStats, err = addAvailableData(dsStats, crStates, toData.ServerCachegroups, toData.ServerDeliveryServices, toData.ServerTypes, precomputed) // TODO move after stat summarisation
 	if err != nil {
 		return dsStats, lastStats, fmt.Errorf("Error getting Cache availability data: %v", err)
 	}
 
-	for server, history := range statHistory {
-		if len(history) < 1 {
-			continue // TODO warn?
-		}
+	for server, precomputedData := range precomputed {
 		cachegroup, ok := toData.ServerCachegroups[server]
 		if !ok {
 			log.Warnf("server %s has no cachegroup, skipping\n", server)
@@ -436,10 +418,9 @@ func CreateStats(statHistory map[enum.CacheName][]cache.Result, toData todata.TO
 			log.Warnf("server %s not in CRConfig, skipping\n", server)
 			continue
 		}
-		result := history[0]
 
 		// TODO check result.PrecomputedData.Errors
-		for ds, resultStat := range result.PrecomputedData.DeliveryServiceStats {
+		for ds, resultStat := range precomputedData.DeliveryServiceStats {
 			if ds == "" {
 				log.Errorf("EMPTY precomputed delivery service")
 				continue
@@ -460,7 +441,7 @@ func CreateStats(statHistory map[enum.CacheName][]cache.Result, toData todata.TO
 		}
 	}
 
-	perSecStats, lastStats := addPerSecStats(statHistory, dsStats, lastStats, now, toData.ServerCachegroups, toData.ServerTypes)
+	perSecStats, lastStats := addPerSecStats(precomputed, dsStats, lastStats, now, toData.ServerCachegroups, toData.ServerTypes)
 	log.Infof("CreateStats took %v\n", time.Since(start))
 	perSecStats.Time = time.Now()
 	return perSecStats, lastStats, nil

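The precomputed OutBytes feeds the per-second calculations (see addLastStat,
referenced in the comment above but not shown in this diff). A standalone
sketch of that rate arithmetic, with illustrative names and values:

package main

import (
	"fmt"
	"time"
)

// kbps derives kilobits per second from two byte-counter samples.
func kbps(oldBytes, newBytes int64, oldTime, newTime time.Time) float64 {
	elapsed := newTime.Sub(oldTime).Seconds()
	if elapsed <= 0 || newBytes < oldBytes {
		return 0 // no elapsed time, or the counter rolled over (see the byte-rolling TODO above)
	}
	return float64(newBytes-oldBytes) * 8 / 1000 / elapsed // bytes -> kilobits, per second
}

func main() {
	t0 := time.Now()
	t1 := t0.Add(8 * time.Second)
	fmt.Printf("%.0f kbps\n", kbps(1000000, 2000000, t0, t1)) // 1000 kbps
}
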
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
index dd0f02b..90dffb6 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
@@ -488,17 +488,19 @@ func WrapErrCode(errorCount threadsafe.Uint, reqPath string, body []byte, err er
 }
 
 // WrapBytes takes a function which cannot error and returns only bytes, and wraps it as a http.HandlerFunc. The errContext is logged if the write fails, and should be enough information to trace the problem (function name, endpoint, request parameters, etc).
-func WrapBytes(f func() []byte) http.HandlerFunc {
+func WrapBytes(f func() []byte, contentType string) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", contentType)
 		log.Write(w, f(), r.URL.EscapedPath())
 	}
 }
 
 // WrapErr takes a function which returns bytes and an error, and wraps it as a http.HandlerFunc. If the error is nil, the bytes are written with Status OK. Else, the error is logged, and InternalServerError is returned as the response code. If you need to return a different response code (for example, StatusBadRequest), call wrapRespCode.
-func WrapErr(errorCount threadsafe.Uint, f func() ([]byte, error)) http.HandlerFunc {
+func WrapErr(errorCount threadsafe.Uint, f func() ([]byte, error), contentType string) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		bytes, err := f()
 		_, code := WrapErrCode(errorCount, r.URL.EscapedPath(), bytes, err)
+		w.Header().Set("Content-Type", contentType)
 		w.WriteHeader(code)
 		log.Write(w, bytes, r.URL.EscapedPath())
 	}
@@ -510,10 +512,11 @@ func WrapErr(errorCount threadsafe.Uint, f func() ([]byte, error)) http.HandlerF
 type SrvFunc func(params url.Values, path string) ([]byte, int)
 
 // WrapParams takes a SrvFunc and wraps it as an http.HandlerFunc. Note if the SrvFunc returns 0 bytes, an InternalServerError is returned, and the response code is ignored, for security reasons. If an error response code is necessary, return bytes to that effect, for example, "Bad Request". DO NOT return informational messages regarding internal server errors; these should be logged, and only a 500 code returned to the client, for security reasons.
-func WrapParams(f SrvFunc) http.HandlerFunc {
+func WrapParams(f SrvFunc, contentType string) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		bytes, code := f(r.URL.Query(), r.URL.EscapedPath())
 		if len(bytes) > 0 {
+			w.Header().Set("Content-Type", contentType)
 			w.WriteHeader(code)
 			if _, err := w.Write(bytes); err != nil {
 				log.Warnf("received error writing data request %v: %v\n", r.URL.EscapedPath(), err)
@@ -547,7 +550,7 @@ func makeWrapAll(errorCount threadsafe.Uint, unpolledCaches threadsafe.UnpolledC
 func makeCrConfigHandler(wrapper func(http.HandlerFunc) http.HandlerFunc, errorCount threadsafe.Uint, opsConfig OpsConfigThreadsafe, toSession towrap.ITrafficOpsSession) http.HandlerFunc {
 	return wrapper(WrapErr(errorCount, func() ([]byte, error) {
 		return srvTRConfig(opsConfig, toSession)
-	}))
+	}, ContentTypeJSON))
 }
 
 func srvTRState(params url.Values, localStates peer.CRStatesThreadsafe, combinedStates peer.CRStatesThreadsafe) ([]byte, error) {
@@ -566,13 +569,13 @@ func srvTRStateSelf(localStates peer.CRStatesThreadsafe) ([]byte, error) {
 }
 
 // TODO remove error params, handle by returning an error? How, since we need to return a non-standard code?
-func srvCacheStats(params url.Values, errorCount threadsafe.Uint, path string, toData todata.TODataThreadsafe, statHistory threadsafe.ResultHistory) ([]byte, int) {
+func srvCacheStats(params url.Values, errorCount threadsafe.Uint, path string, toData todata.TODataThreadsafe, statResultHistory threadsafe.ResultStatHistory) ([]byte, int) {
 	filter, err := NewCacheStatFilter(path, params, toData.Get().ServerTypes)
 	if err != nil {
 		HandleErr(errorCount, path, err)
 		return []byte(err.Error()), http.StatusBadRequest
 	}
-	bytes, err := cache.StatsMarshall(statHistory.Get(), filter, params)
+	bytes, err := cache.StatsMarshall(statResultHistory.Get(), filter, params)
 	return WrapErrCode(errorCount, path, bytes, err)
 }
 
@@ -643,8 +646,8 @@ func srvAPIVersion(staticAppData StaticAppData) []byte {
 func srvAPITrafficOpsURI(opsConfig OpsConfigThreadsafe) []byte {
 	return []byte(opsConfig.Get().Url)
 }
-func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory threadsafe.ResultHistory, healthHistory threadsafe.ResultHistory, lastHealthDurations DurationMapThreadsafe, localStates peer.CRStatesThreadsafe, lastStats threadsafe.LastStats, localCacheStatus threadsafe.CacheAvailableStatus) ([]byte, error) {
-	return json.Marshal(createCacheStatuses(toData.Get().ServerTypes, statHistory.Get(), healthHistory.Get(), lastHealthDurations.Get(), localStates.Get().Caches, lastStats.Get(), localCacheStatus))
+func srvAPICacheStates(toData todata.TODataThreadsafe, statInfoHistory threadsafe.ResultInfoHistory, statResultHistory threadsafe.ResultStatHistory, healthHistory threadsafe.ResultHistory, lastHealthDurations DurationMapThreadsafe, localStates peer.CRStatesThreadsafe, lastStats threadsafe.LastStats, localCacheStatus threadsafe.CacheAvailableStatus) ([]byte, error) {
+	return json.Marshal(createCacheStatuses(toData.Get().ServerTypes, statInfoHistory.Get(), statResultHistory.Get(), healthHistory.Get(), lastHealthDurations.Get(), localStates.Get().Caches, lastStats.Get(), localCacheStatus))
 }
 
 func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats threadsafe.LastStats) []byte {
@@ -655,14 +658,11 @@ func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats threadsafe.La
 	}
 	return []byte(fmt.Sprintf("%f", sum))
 }
-func srvAPIBandwidthCapacityKbps(statHistoryThs threadsafe.ResultHistory) []byte {
-	statHistory := statHistoryThs.Get()
+func srvAPIBandwidthCapacityKbps(statMaxKbpses threadsafe.CacheKbpses) []byte {
+	maxKbpses := statMaxKbpses.Get()
 	cap := int64(0)
-	for _, results := range statHistory {
-		if len(results) == 0 {
-			continue
-		}
-		cap += results[0].PrecomputedData.MaxKbps
+	for _, kbps := range maxKbpses {
+		cap += kbps
 	}
 	return []byte(fmt.Sprintf("%d", cap))
 }
@@ -671,7 +671,7 @@ func srvAPIBandwidthCapacityKbps(statHistoryThs threadsafe.ResultHistory) []byte
 func wrapUnpolledCheck(unpolledCaches threadsafe.UnpolledCaches, errorCount threadsafe.Uint, f http.HandlerFunc) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		if unpolledCaches.Any() {
-			HandleErr(errorCount, r.URL.EscapedPath(), fmt.Errorf("service still starting, some caches unpolled"))
+			HandleErr(errorCount, r.URL.EscapedPath(), fmt.Errorf("service still starting, some caches unpolled."))
 			w.WriteHeader(http.StatusServiceUnavailable)
 			log.Write(w, []byte("Service Unavailable"), r.URL.EscapedPath())
 			return
@@ -680,6 +680,19 @@ func wrapUnpolledCheck(unpolledCaches threadsafe.UnpolledCaches, errorCount thre
 	}
 }
 
+const ContentTypeJSON = "application/json"
+
+// addTrailingSlashEndpoints adds endpoints with trailing slashes to the given dispatch map. Without this, Go will match `route` and `route/` differently.
+func addTrailingSlashEndpoints(dispatchMap map[string]http.HandlerFunc) map[string]http.HandlerFunc {
+	for route, handler := range dispatchMap {
+		if strings.HasSuffix(route, "/") {
+			continue
+		}
+		dispatchMap[route+"/"] = handler
+	}
+	return dispatchMap
+}
+
 // MakeDispatchMap returns the map of paths to http.HandlerFuncs for dispatching.
 func MakeDispatchMap(
 	opsConfig OpsConfigThreadsafe,
@@ -687,7 +700,9 @@ func MakeDispatchMap(
 	localStates peer.CRStatesThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	statHistory threadsafe.ResultHistory,
+	statInfoHistory threadsafe.ResultInfoHistory,
+	statResultHistory threadsafe.ResultStatHistory,
+	statMaxKbpses threadsafe.CacheKbpses,
 	healthHistory threadsafe.ResultHistory,
 	dsStats threadsafe.DSStatsReader,
 	events threadsafe.Events,
@@ -709,80 +724,90 @@ func MakeDispatchMap(
 		return wrapUnpolledCheck(unpolledCaches, errorCount, f)
 	}
 
-	return map[string]http.HandlerFunc{
+	dispatchMap := map[string]http.HandlerFunc{
 		"/publish/CrConfig": wrap(WrapErr(errorCount, func() ([]byte, error) {
 			return srvTRConfig(opsConfig, toSession)
-		})),
+		}, ContentTypeJSON)),
 		"/publish/CrStates": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
 			bytes, err := srvTRState(params, localStates, combinedStates)
 			return WrapErrCode(errorCount, path, bytes, err)
-		})),
-		"/publish/CrStates/": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
-			bytes, err := srvTRState(params, localStates, combinedStates)
-			return WrapErrCode(errorCount, path, bytes, err)
-		})),
+		}, ContentTypeJSON)),
 		"/publish/CacheStats": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
-			return srvCacheStats(params, errorCount, path, toData, statHistory)
-		})),
-		"/publish/CacheStats/": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
-			return srvCacheStats(params, errorCount, path, toData, statHistory)
-		})),
+			return srvCacheStats(params, errorCount, path, toData, statResultHistory)
+		}, ContentTypeJSON)),
 		"/publish/DsStats": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
 			return srvDSStats(params, errorCount, path, toData, dsStats)
-		})),
-		"/publish/DsStats/": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
-			return srvDSStats(params, errorCount, path, toData, dsStats)
-		})),
+		}, ContentTypeJSON)),
 		"/publish/EventLog": wrap(WrapErr(errorCount, func() ([]byte, error) {
 			return srvEventLog(events)
-		})),
+		}, ContentTypeJSON)),
 		"/publish/PeerStates": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
 			return srvPeerStates(params, errorCount, path, toData, peerStates)
-		})),
-		"/publish/PeerStates/": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
-			return srvPeerStates(params, errorCount, path, toData, peerStates)
-		})),
+		}, ContentTypeJSON)),
 		"/publish/StatSummary": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
 			return srvStatSummary()
-		})),
-		"/publish/StatSummary/": wrap(WrapParams(func(params url.Values, path string) ([]byte, int) {
-			return srvStatSummary()
-		})),
+		}, ContentTypeJSON)),
 		"/publish/Stats": wrap(WrapErr(errorCount, func() ([]byte, error) {
 			return srvStats(staticAppData, healthPollInterval, lastHealthDurations, fetchCount, healthIteration, errorCount)
-		})),
+		}, ContentTypeJSON)),
 		"/publish/ConfigDoc": wrap(WrapErr(errorCount, func() ([]byte, error) {
 			return srvConfigDoc(opsConfig)
-		})),
+		}, ContentTypeJSON)),
 		"/api/cache-count": wrap(WrapBytes(func() []byte {
 			return srvAPICacheCount(localStates)
-		})),
+		}, ContentTypeJSON)),
 		"/api/cache-available-count": wrap(WrapBytes(func() []byte {
 			return srvAPICacheAvailableCount(localStates)
-		})),
+		}, ContentTypeJSON)),
 		"/api/cache-down-count": wrap(WrapBytes(func() []byte {
 			return srvAPICacheDownCount(localStates, monitorConfig)
-		})),
+		}, ContentTypeJSON)),
 		"/api/version": wrap(WrapBytes(func() []byte {
 			return srvAPIVersion(staticAppData)
-		})),
+		}, ContentTypeJSON)),
 		"/api/traffic-ops-uri": wrap(WrapBytes(func() []byte {
 			return srvAPITrafficOpsURI(opsConfig)
-		})),
+		}, ContentTypeJSON)),
 		"/api/cache-statuses": wrap(WrapErr(errorCount, func() ([]byte, error) {
-			return srvAPICacheStates(toData, statHistory, healthHistory, lastHealthDurations, localStates, lastStats, localCacheStatus)
-		})),
+			return srvAPICacheStates(toData, statInfoHistory, statResultHistory, healthHistory, lastHealthDurations, localStates, lastStats, localCacheStatus)
+		}, ContentTypeJSON)),
 		"/api/bandwidth-kbps": wrap(WrapBytes(func() []byte {
 			return srvAPIBandwidthKbps(toData, lastStats)
-		})),
+		}, ContentTypeJSON)),
 		"/api/bandwidth-capacity-kbps": wrap(WrapBytes(func() []byte {
-			return srvAPIBandwidthCapacityKbps(statHistory)
-		})),
+			return srvAPIBandwidthCapacityKbps(statMaxKbpses)
+		}, ContentTypeJSON)),
 	}
+	return addTrailingSlashEndpoints(dispatchMap)
+}
+
+// latestResultInfoTimeMS returns the length of time in milliseconds that it took to request the most recent non-errored result info.
+func latestResultInfoTimeMS(cacheName enum.CacheName, history cache.ResultInfoHistory) (int64, error) {
+	results, ok := history[cacheName]
+	if !ok {
+		return 0, fmt.Errorf("cache %v has no history", cacheName)
+	}
+	if len(results) == 0 {
+		return 0, fmt.Errorf("cache %v history empty", cacheName)
+	}
+	result := cache.ResultInfo{}
+	foundResult := false
+	for _, r := range results {
+		if r.Error == nil {
+			result = r
+			foundResult = true
+			break
+		}
+	}
+	if !foundResult {
+		return 0, fmt.Errorf("cache %v No unerrored result", cacheName)
+	}
+	return int64(result.RequestTime / time.Millisecond), nil
 }
 
 // latestResultTimeMS returns the length of time in milliseconds that it took to request the most recent non-errored result.
 func latestResultTimeMS(cacheName enum.CacheName, history map[enum.CacheName][]cache.Result) (int64, error) {
+
 	results, ok := history[cacheName]
 	if !ok {
 		return 0, fmt.Errorf("cache %v has no history", cacheName)
@@ -814,6 +839,7 @@ func latestQueryTimeMS(cacheName enum.CacheName, lastDurations map[enum.CacheNam
 }
 
 // resultSpanMS returns the length of time between the most recent two results. That is, how long could the cache have been down before we would have noticed it? Note this returns the time between the most recent two results, irrespective if they errored.
+// Note this is unrelated to the Stat Span field.
 func resultSpanMS(cacheName enum.CacheName, history map[enum.CacheName][]cache.Result) (int64, error) {
 	results, ok := history[cacheName]
 	if !ok {
@@ -832,55 +858,62 @@ func resultSpanMS(cacheName enum.CacheName, history map[enum.CacheName][]cache.R
 	return int64(span / time.Millisecond), nil
 }
 
+// infoResultSpanMS returns the length of time between the most recent two results. That is, how long could the cache have been down before we would have noticed it? Note this returns the time between the most recent two results, irrespective if they errored.
+// Note this is unrelated to the Stat Span field.
+func infoResultSpanMS(cacheName enum.CacheName, history cache.ResultInfoHistory) (int64, error) {
+	results, ok := history[cacheName]
+	if !ok {
+		return 0, fmt.Errorf("cache %v has no history", cacheName)
+	}
+	if len(results) == 0 {
+		return 0, fmt.Errorf("cache %v history empty", cacheName)
+	}
+	if len(results) < 2 {
+		return 0, fmt.Errorf("cache %v history only has one result, can't compute span between results", cacheName)
+	}
+
+	latestResult := results[0]
+	penultimateResult := results[1]
+	span := latestResult.Time.Sub(penultimateResult.Time)
+	return int64(span / time.Millisecond), nil
+}
+
 func createCacheStatuses(
 	cacheTypes map[enum.CacheName]enum.CacheType,
-	statHistory map[enum.CacheName][]cache.Result,
+	statInfoHistory cache.ResultInfoHistory,
+	statResultHistory cache.ResultStatHistory,
 	healthHistory map[enum.CacheName][]cache.Result,
 	lastHealthDurations map[enum.CacheName]time.Duration,
 	cacheStates map[enum.CacheName]peer.IsAvailable,
 	lastStats ds.LastStats,
 	localCacheStatusThreadsafe threadsafe.CacheAvailableStatus,
 ) map[enum.CacheName]CacheStatus {
-	conns := createCacheConnections(statHistory)
+	conns := createCacheConnections(statResultHistory)
 	statii := map[enum.CacheName]CacheStatus{}
 	localCacheStatus := localCacheStatusThreadsafe.Get()
 
 	for cacheName, cacheType := range cacheTypes {
-		cacheStatHistory, ok := statHistory[cacheName]
+		infoHistory, ok := statInfoHistory[cacheName]
 		if !ok {
-			log.Warnf("createCacheStatuses stat history missing cache %s\n", cacheName)
+			log.Warnf("createCacheStatuses stat info history missing cache %s\n", cacheName)
 			continue
 		}
 
-		if len(cacheStatHistory) < 1 {
-			log.Warnf("createCacheStatuses stat history empty for cache %s\n", cacheName)
+		if len(infoHistory) < 1 {
+			log.Warnf("createCacheStatuses stat info history empty for cache %s\n", cacheName)
 			continue
 		}
 
 		log.Debugf("createCacheStatuses NOT empty for cache %s\n", cacheName)
 
-		var loadAverage *float64
-		procLoadAvg := cacheStatHistory[0].Astats.System.ProcLoadavg
-		if procLoadAvg != "" {
-			firstSpace := strings.IndexRune(procLoadAvg, ' ')
-			if firstSpace == -1 {
-				log.Warnf("WARNING unexpected proc.loadavg '%s' for cache %s\n", procLoadAvg, cacheName)
-			} else {
-				loadAverageVal, err := strconv.ParseFloat(procLoadAvg[:firstSpace], 64)
-				if err != nil {
-					log.Warnf("proc.loadavg doesn't contain a float prefix '%s' for cache %s\n", procLoadAvg, cacheName)
-				} else {
-					loadAverage = &loadAverageVal
-				}
-			}
-		}
+		loadAverage := &infoHistory[0].Vitals.LoadAvg
 
 		healthQueryTime, err := latestQueryTimeMS(cacheName, lastHealthDurations)
 		if err != nil {
 			log.Warnf("Error getting cache %v health query time: %v\n", cacheName, err)
 		}
 
-		statTime, err := latestResultTimeMS(cacheName, statHistory)
+		statTime, err := latestResultInfoTimeMS(cacheName, statInfoHistory)
 		if err != nil {
 			log.Warnf("Error getting cache %v stat result time: %v\n", cacheName, err)
 		}
@@ -890,7 +923,7 @@ func createCacheStatuses(
 			log.Warnf("Error getting cache %v health result time: %v\n", cacheName, err)
 		}
 
-		statSpan, err := resultSpanMS(cacheName, statHistory)
+		statSpan, err := infoResultSpanMS(cacheName, statInfoHistory)
 		if err != nil {
 			log.Warnf("Error getting cache %v stat span: %v\n", cacheName, err)
 		}
@@ -948,23 +981,19 @@ func createCacheStatuses(
 	return statii
 }
 
-func createCacheConnections(statHistory map[enum.CacheName][]cache.Result) map[enum.CacheName]int64 {
+func createCacheConnections(statResultHistory cache.ResultStatHistory) map[enum.CacheName]int64 {
 	conns := map[enum.CacheName]int64{}
-	for server, history := range statHistory {
-		for _, result := range history {
-			val, ok := result.Astats.Ats["proxy.process.http.current_client_connections"]
-			if !ok {
-				continue
-			}
-
-			v, ok := val.(float64)
-			if !ok {
-				continue
-			}
+	for server, history := range statResultHistory {
+		vals, ok := history["proxy.process.http.current_client_connections"]
+		if !ok || len(vals) < 1 {
+			continue
+		}
 
-			conns[server] = int64(v)
-			break
+		v, ok := vals[0].Val.(float64)
+		if !ok {
+			continue // TODO log warning? error?
 		}
+		conns[server] = int64(v)
 	}
 	return conns
 }

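A standalone sketch of the two handler changes in this file: the Content-Type
header the wrappers now set, and the trailing-slash aliases registered by
addTrailingSlashEndpoints. The route and payload here are illustrative:

package main

import (
	"fmt"
	"net/http"
	"strings"
)

const contentTypeJSON = "application/json"

// wrapBytes wraps a no-error byte producer as a handler which sets the Content-Type.
func wrapBytes(f func() []byte, contentType string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", contentType)
		w.Write(f())
	}
}

// addTrailingSlashEndpoints registers "route/" aliases, since Go matches
// "route" and "route/" as different paths. New keys end in "/" so they are
// skipped if the range happens to visit them.
func addTrailingSlashEndpoints(m map[string]http.HandlerFunc) map[string]http.HandlerFunc {
	for route, handler := range m {
		if strings.HasSuffix(route, "/") {
			continue
		}
		m[route+"/"] = handler
	}
	return m
}

func main() {
	dispatch := map[string]http.HandlerFunc{
		"/api/version": wrapBytes(func() []byte { return []byte(`"2.0"`) }, contentTypeJSON),
	}
	dispatch = addTrailingSlashEndpoints(dispatch)
	for route, handler := range dispatch {
		http.HandleFunc(route, handler)
	}
	fmt.Println(len(dispatch), "routes registered") // 2: "/api/version" and "/api/version/"
	// http.ListenAndServe(":8080", nil) // serving omitted in this sketch
}
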
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index f6513c0..9c150e4 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -80,7 +80,6 @@ func StartHealthResultManager(
 	cacheHealthChan <-chan cache.Result,
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
-	statHistory threadsafe.ResultHistory,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
@@ -97,7 +96,6 @@ func StartHealthResultManager(
 		toData,
 		localStates,
 		lastHealthDurations,
-		statHistory,
 		healthHistory,
 		monitorConfig,
 		peerStates,
@@ -116,7 +114,6 @@ func healthResultManagerListen(
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
 	lastHealthDurations DurationMapThreadsafe,
-	statHistory threadsafe.ResultHistory,
 	healthHistory threadsafe.ResultHistory,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
@@ -147,7 +144,6 @@ func healthResultManagerListen(
 					toData,
 					localStates,
 					lastHealthDurations,
-					statHistory,
 					monitorConfig,
 					peerStates,
 					combinedStates,
@@ -171,7 +167,6 @@ func healthResultManagerListen(
 						toData,
 						localStates,
 						lastHealthDurations,
-						statHistory,
 						monitorConfig,
 						peerStates,
 						combinedStates,
@@ -197,7 +192,6 @@ func processHealthResult(
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
 	lastHealthDurationsThreadsafe DurationMapThreadsafe,
-	statHistory threadsafe.ResultHistory,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/manager/manager.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
index ae22ad2..e794b7c 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
@@ -105,7 +105,7 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
 		peerStates,
 	)
 
-	statHistory, _, lastKbpsStats, dsStats, unpolledCaches := StartStatHistoryManager(
+	statInfoHistory, statResultHistory, statMaxKbpses, _, lastKbpsStats, dsStats, unpolledCaches := StartStatHistoryManager(
 		cacheStatHandler.ResultChannel,
 		localStates,
 		combinedStates,
@@ -120,7 +120,6 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
 		cacheHealthHandler.ResultChannel,
 		toData,
 		localStates,
-		statHistory,
 		monitorConfig,
 		peerStates,
 		combinedStates,
@@ -138,7 +137,9 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
 		localStates,
 		peerStates,
 		combinedStates,
-		statHistory,
+		statInfoHistory,
+		statResultHistory,
+		statMaxKbpses,
 		healthHistory,
 		lastKbpsStats,
 		dsStats,

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
index b57a613..29ffd01 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
@@ -73,7 +73,9 @@ func StartOpsConfigManager(
 	localStates peer.CRStatesThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	statHistory threadsafe.ResultHistory,
+	statInfoHistory threadsafe.ResultInfoHistory,
+	statResultHistory threadsafe.ResultStatHistory,
+	statMaxKbpses threadsafe.CacheKbpses,
 	healthHistory threadsafe.ResultHistory,
 	lastStats threadsafe.LastStats,
 	dsStats threadsafe.DSStatsReader,
@@ -132,7 +134,9 @@ func StartOpsConfigManager(
 				localStates,
 				peerStates,
 				combinedStates,
-				statHistory,
+				statInfoHistory,
+				statResultHistory,
+				statMaxKbpses,
 				healthHistory,
 				dsStats,
 				events,

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
index 94d5f9c..4782fc3 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
@@ -66,14 +66,23 @@ func StartStatHistoryManager(
 	errorCount threadsafe.Uint,
 	cfg config.Config,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
-) (threadsafe.ResultHistory, DurationMapThreadsafe, threadsafe.LastStats, threadsafe.DSStatsReader, threadsafe.UnpolledCaches) {
-	statHistory := threadsafe.NewResultHistory()
+) (threadsafe.ResultInfoHistory, threadsafe.ResultStatHistory, threadsafe.CacheKbpses, DurationMapThreadsafe, threadsafe.LastStats, threadsafe.DSStatsReader, threadsafe.UnpolledCaches) {
+	statInfoHistory := threadsafe.NewResultInfoHistory()
+	statResultHistory := threadsafe.NewResultStatHistory()
+	statMaxKbpses := threadsafe.NewCacheKbpses()
 	lastStatDurations := NewDurationMapThreadsafe()
 	lastStatEndTimes := map[enum.CacheName]time.Time{}
 	lastStats := threadsafe.NewLastStats()
 	dsStats := threadsafe.NewDSStats()
 	unpolledCaches := threadsafe.NewUnpolledCaches()
 	tickInterval := cfg.StatFlushInterval
+
+	precomputedData := map[enum.CacheName]cache.PrecomputedData{}
+
+	process := func(results []cache.Result) {
+		processStatResults(results, statInfoHistory, statResultHistory, statMaxKbpses, combinedStates.Get(), lastStats, toData.Get(), errorCount, dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches, monitorConfig.Get(), precomputedData)
+	}
+
 	go func() {
 		var ticker *time.Ticker
 		<-cachesChanged // wait for the signal that localStates have been set
@@ -93,27 +102,29 @@ func StartStatHistoryManager(
 					unpolledCaches.SetNewCaches(getNewCaches(localStates, monitorConfig))
 				case <-ticker.C:
 					log.Warnf("StatHistoryManager flushing queued results\n")
-					processStatResults(results, statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches, monitorConfig.Get())
+					process(results)
 					break innerLoop
 				default:
 					select {
 					case r := <-cacheStatChan:
 						results = append(results, r)
 					default:
-						processStatResults(results, statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches, monitorConfig.Get())
+						process(results)
 						break innerLoop
 					}
 				}
 			}
 		}
 	}()
-	return statHistory, lastStatDurations, lastStats, &dsStats, unpolledCaches
+	return statInfoHistory, statResultHistory, statMaxKbpses, lastStatDurations, lastStats, &dsStats, unpolledCaches
 }
 
 // processStatResults processes the given results, creating and setting DSStats, LastStats, and other stats. Note this is NOT threadsafe, and MUST NOT be called from multiple threads.
 func processStatResults(
 	results []cache.Result,
-	statHistoryThreadsafe threadsafe.ResultHistory,
+	statInfoHistoryThreadsafe threadsafe.ResultInfoHistory,
+	statResultHistoryThreadsafe threadsafe.ResultStatHistory,
+	statMaxKbpsesThreadsafe threadsafe.CacheKbpses,
 	combinedStates peer.Crstates,
 	lastStats threadsafe.LastStats,
 	toData todata.TOData,
@@ -123,25 +134,44 @@ func processStatResults(
 	lastStatDurationsThreadsafe DurationMapThreadsafe,
 	unpolledCaches threadsafe.UnpolledCaches,
 	mc to.TrafficMonitorConfigMap,
+	precomputedData map[enum.CacheName]cache.PrecomputedData,
 ) {
-	statHistory := statHistoryThreadsafe.Get().Copy()
+
+	// setting the statHistory could be put in a goroutine concurrent with `ds.CreateStats`, if it were slow
+	statInfoHistory := statInfoHistoryThreadsafe.Get().Copy()
+	statResultHistory := statResultHistoryThreadsafe.Get().Copy()
+	statMaxKbpses := statMaxKbpsesThreadsafe.Get().Copy()
+
 	for _, result := range results {
 		maxStats := uint64(mc.Profile[mc.TrafficServer[string(result.ID)].Profile].Parameters.HistoryCount)
 		if maxStats < 1 {
 			log.Warnf("processStatResults got history count %v for %v, setting to 1\n", maxStats, result.ID)
 			maxStats = 1
 		}
-
 		// TODO determine if we want to add results with errors, or just print the errors now and don't add them.
-		statHistory[result.ID] = pruneHistory(append([]cache.Result{result}, statHistory[result.ID]...), maxStats)
+		statInfoHistory.Add(result, maxStats)
+		statResultHistory.Add(result, maxStats)
+		// Don't add errored maxes or precomputed DSStats
+		if result.Error == nil {
+			// max and precomputed always contain the latest result from each cache
+			statMaxKbpses.AddMax(result)
+			// if we failed to compute the OutBytes, keep the outbytes of the last result.
+			if result.PrecomputedData.OutBytes == 0 {
+				result.PrecomputedData.OutBytes = precomputedData[result.ID].OutBytes
+			}
+			precomputedData[result.ID] = result.PrecomputedData
+
+		}
 	}
-	statHistoryThreadsafe.Set(statHistory)
+	statInfoHistoryThreadsafe.Set(statInfoHistory)
+	statResultHistoryThreadsafe.Set(statResultHistory)
+	statMaxKbpsesThreadsafe.Set(statMaxKbpses)
 
 	for _, result := range results {
 		log.Debugf("poll %v %v CreateStats start\n", result.PollID, time.Now())
 	}
 
-	newDsStats, newLastStats, err := ds.CreateStats(statHistory, toData, combinedStates, lastStats.Get().Copy(), time.Now())
+	newDsStats, newLastStats, err := ds.CreateStats(precomputedData, toData, combinedStates, lastStats.Get().Copy(), time.Now())
 
 	for _, result := range results {
 		log.Debugf("poll %v %v CreateStats end\n", result.PollID, time.Now())

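The stat history updates above follow a copy-then-publish pattern with the
threadsafe wrappers: the single writer Get()s the current value, Copy()s it,
mutates the copy, then Set()s it back, so concurrent readers never observe a
half-built map. A standalone sketch of that pattern with illustrative types:

package main

import (
	"fmt"
	"sync"
)

type history map[string][]int64

// copyHistory deep-copies the map so the writer can mutate it privately.
func (h history) copyHistory() history {
	b := history{}
	for k, v := range h {
		vals := make([]int64, len(v))
		copy(vals, v)
		b[k] = vals
	}
	return b
}

type threadsafeHistory struct {
	h *history
	m *sync.RWMutex
}

func newThreadsafeHistory() threadsafeHistory {
	h := history{}
	return threadsafeHistory{h: &h, m: &sync.RWMutex{}}
}

// Get returns the stored history; callers MUST NOT modify it.
func (t *threadsafeHistory) Get() history {
	t.m.RLock()
	defer t.m.RUnlock()
	return *t.h
}

// Set publishes a new history; only a single writer goroutine may call it.
func (t *threadsafeHistory) Set(h history) {
	t.m.Lock()
	*t.h = h
	t.m.Unlock()
}

func main() {
	ts := newThreadsafeHistory()
	h := ts.Get().copyHistory()                         // copy before mutating
	h["cache-0"] = append([]int64{42}, h["cache-0"]...) // mutate the private copy
	ts.Set(h)                                           // publish; readers see old or new, never partial
	fmt.Println(ts.Get()["cache-0"])                    // [42]
}
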
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/threadsafe/cachemaxkbpses.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/cachemaxkbpses.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/cachemaxkbpses.go
new file mode 100644
index 0000000..5bb584c
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/cachemaxkbpses.go
@@ -0,0 +1,52 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
+	"sync"
+)
+
+// CacheKbpses wraps a map of cache max kbps values, to be safe for multiple reader goroutines and one writer.
+type CacheKbpses struct {
+	v *cache.Kbpses
+	m *sync.RWMutex
+}
+
+// NewCacheKbpses creates and returns a new CacheKbpses, initializing internal pointer values.
+func NewCacheKbpses() CacheKbpses {
+	v := cache.Kbpses(map[enum.CacheName]int64{})
+	return CacheKbpses{m: &sync.RWMutex{}, v: &v}
+}
+
+// Get returns the internal map of cache max kbps values. The returned map MUST NOT be modified. If modification is necessary, copy.
+func (o *CacheKbpses) Get() cache.Kbpses {
+	o.m.RLock()
+	defer o.m.RUnlock()
+	return *o.v
+}
+
+// Set sets the internal map of cache max kbps values. This MUST NOT be called by multiple goroutines.
+func (o *CacheKbpses) Set(v cache.Kbpses) {
+	o.m.Lock()
+	*o.v = v
+	o.m.Unlock()
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/01a3f8df/traffic_monitor/experimental/traffic_monitor/threadsafe/resultstathistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/resultstathistory.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/resultstathistory.go
new file mode 100644
index 0000000..966b3ac
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/resultstathistory.go
@@ -0,0 +1,83 @@
+// TODO rename
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	"sync"
+
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+)
+
+// ResultStatHistory provides safe access for multiple reader goroutines and a single writer to a stored cache.ResultStatHistory object.
+// This could be made lock-free, if the performance was necessary
+// TODO add separate locks for Caches and Deliveryservice maps?
+type ResultStatHistory struct {
+	history *cache.ResultStatHistory
+	m       *sync.RWMutex
+}
+
+// NewResultStatHistory returns a new ResultStatHistory safe for multiple readers and a single writer.
+func NewResultStatHistory() ResultStatHistory {
+	h := cache.ResultStatHistory{}
+	return ResultStatHistory{m: &sync.RWMutex{}, history: &h}
+}
+
+// Get returns the ResultStatHistory. Callers MUST NOT modify. If mutation is necessary, call ResultStatHistory.Copy()
+func (h *ResultStatHistory) Get() cache.ResultStatHistory {
+	h.m.RLock()
+	defer h.m.RUnlock()
+	return *h.history
+}
+
+// Set sets the internal ResultStatHistory. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
+func (h *ResultStatHistory) Set(v cache.ResultStatHistory) {
+	h.m.Lock()
+	*h.history = v
+	h.m.Unlock()
+}
+
+// ResultInfoHistory provides safe access for multiple reader goroutines and a single writer to a stored cache.ResultInfoHistory object.
+// This could be made lock-free, if the performance was necessary
+// TODO add separate locks for Caches and Deliveryservice maps?
+type ResultInfoHistory struct {
+	history *cache.ResultInfoHistory
+	m       *sync.RWMutex
+}
+
+// NewResultInfoHistory returns a new ResultInfoHistory safe for multiple readers and a single writer.
+func NewResultInfoHistory() ResultInfoHistory {
+	h := cache.ResultInfoHistory{}
+	return ResultInfoHistory{m: &sync.RWMutex{}, history: &h}
+}
+
+// Get returns the ResultInfoHistory. Callers MUST NOT modify. If mutation is necessary, call ResultInfoHistory.Copy()
+func (h *ResultInfoHistory) Get() cache.ResultInfoHistory {
+	h.m.RLock()
+	defer h.m.RUnlock()
+	return *h.history
+}
+
+// Set sets the internal ResultInfoHistory. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
+func (h *ResultInfoHistory) Set(v cache.ResultInfoHistory) {
+	h.m.Lock()
+	*h.history = v
+	h.m.Unlock()
+}