You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficcontrol.apache.org by ne...@apache.org on 2016/11/07 19:29:52 UTC
[06/21] incubator-trafficcontrol git commit: Fix TM2 thresholds,
max history from TO /health/
Fix TM2 thresholds, max history from TO /health/
Fixes Traffic Monitor 2.0 to get per-cache (profile) connection
timeout, max history; and threshold kbps, load average, and query
time from the Traffic Ops /health/{cdn-name} endpoint.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/57b17e08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/57b17e08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/57b17e08
Branch: refs/heads/master
Commit: 57b17e08a1128740bc79449319518013e95cc23b
Parents: 88566da
Author: Robert Butts <ro...@gmail.com>
Authored: Fri Oct 28 10:56:26 2016 -0600
Committer: Dave Neuman <ne...@apache.org>
Committed: Mon Nov 7 12:29:08 2016 -0700
----------------------------------------------------------------------
.../experimental/common/fetcher/fetcher.go | 6 +-
.../experimental/common/handler/handler.go | 3 +-
.../experimental/common/poller/poller.go | 60 +++++++++++++++-----
.../experimental/traffic_monitor/cache/cache.go | 17 +++---
.../traffic_monitor/health/cache_health.go | 47 ++++++++++++---
.../traffic_monitor/manager/healthresult.go | 3 +-
.../traffic_monitor/manager/manager.go | 1 -
.../traffic_monitor/manager/monitorconfig.go | 28 ++++++---
.../traffic_monitor/manager/stathistory.go | 20 +++----
.../experimental/traffic_monitor/peer/peer.go | 3 +-
10 files changed, 130 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/common/fetcher/fetcher.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/fetcher/fetcher.go b/traffic_monitor/experimental/common/fetcher/fetcher.go
index f1176ea..d7d7646 100644
--- a/traffic_monitor/experimental/common/fetcher/fetcher.go
+++ b/traffic_monitor/experimental/common/fetcher/fetcher.go
@@ -43,7 +43,9 @@ func (f HttpFetcher) Fetch(id string, url string, pollId uint64, pollFinishedCha
if f.Pending != nil {
f.Pending.Inc()
}
+ startReq := time.Now()
response, err := f.Client.Do(req)
+ reqTime := time.Now().Sub(startReq)
if f.Pending != nil {
f.Pending.Dec()
}
@@ -69,11 +71,11 @@ func (f HttpFetcher) Fetch(id string, url string, pollId uint64, pollFinishedCha
f.Success.Inc()
}
log.Debugf("poll %v %v fetch end\n", pollId, time.Now())
- f.Handler.Handle(id, response.Body, err, pollId, pollFinishedChan)
+ f.Handler.Handle(id, response.Body, reqTime, err, pollId, pollFinishedChan)
} else {
if f.Fail != nil {
f.Fail.Inc()
}
- f.Handler.Handle(id, nil, err, pollId, pollFinishedChan)
+ f.Handler.Handle(id, nil, reqTime, err, pollId, pollFinishedChan)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/common/handler/handler.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/handler/handler.go b/traffic_monitor/experimental/common/handler/handler.go
index 0b5f838..e1a558a 100644
--- a/traffic_monitor/experimental/common/handler/handler.go
+++ b/traffic_monitor/experimental/common/handler/handler.go
@@ -3,6 +3,7 @@ package handler
import (
"encoding/json"
"io"
+ "time"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
)
@@ -14,7 +15,7 @@ const (
)
type Handler interface {
- Handle(string, io.Reader, error, uint64, chan<- uint64)
+ Handle(string, io.Reader, time.Duration, error, uint64, chan<- uint64)
}
type OpsConfigFileHandler struct {
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/common/poller/poller.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/poller.go b/traffic_monitor/experimental/common/poller/poller.go
index 9a7402f..a78d767 100644
--- a/traffic_monitor/experimental/common/poller/poller.go
+++ b/traffic_monitor/experimental/common/poller/poller.go
@@ -22,14 +22,20 @@ type Poller interface {
}
type HttpPoller struct {
- Config HttpPollerConfig
- ConfigChannel chan HttpPollerConfig
- Fetcher fetcher.Fetcher
- TickChan chan uint64
+ Config HttpPollerConfig
+ ConfigChannel chan HttpPollerConfig
+ FetcherTemplate fetcher.HttpFetcher // FetcherTemplate has all the constant settings, and is copied to create fetchers with custom HTTP client timeouts.
+ TickChan chan uint64
+}
+
+type PollConfig struct {
+ URL string
+ Timeout time.Duration
+ Handler handler.Handler
}
type HttpPollerConfig struct {
- Urls map[string]string
+ Urls map[string]PollConfig
Interval time.Duration
}
@@ -46,7 +52,7 @@ func NewHTTP(interval time.Duration, tick bool, httpClient *http.Client, counter
Config: HttpPollerConfig{
Interval: interval,
},
- Fetcher: fetcher.HttpFetcher{
+ FetcherTemplate: fetcher.HttpFetcher{
Handler: fetchHandler,
Client: httpClient,
Counters: counters,
@@ -124,7 +130,14 @@ func (p HttpPoller) Poll() {
for _, info := range additions {
kill := make(chan struct{})
killChans[info.ID] = kill
- go pollHttp(info.Interval, info.ID, info.URL, p.Fetcher, kill)
+
+ fetcher := p.FetcherTemplate
+ if info.Timeout != 0 { // if the timeout isn't explicitly set, use the template value.
+ c := *fetcher.Client
+ fetcher.Client = &c // copy the client, so we don't change other fetchers.
+ fetcher.Client.Timeout = info.Timeout
+ }
+ go pollHttp(info.Interval, info.ID, info.URL, fetcher, kill)
}
p.Config = newConfig
}
@@ -132,8 +145,10 @@ func (p HttpPoller) Poll() {
type HTTPPollInfo struct {
Interval time.Duration
+ Timeout time.Duration
ID string
URL string
+ Handler handler.Handler
}
// diffConfigs takes the old and new configs, and returns a list of deleted IDs, and a list of new polls to do
@@ -145,26 +160,41 @@ func diffConfigs(old HttpPollerConfig, new HttpPollerConfig) ([]string, []HTTPPo
for id, _ := range old.Urls {
deletions = append(deletions, id)
}
- for id, url := range new.Urls {
- additions = append(additions, HTTPPollInfo{Interval: new.Interval, ID: id, URL: url})
+ for id, pollCfg := range new.Urls {
+ additions = append(additions, HTTPPollInfo{
+ Interval: new.Interval,
+ ID: id,
+ URL: pollCfg.URL,
+ Timeout: pollCfg.Timeout,
+ })
}
return deletions, additions
}
- for id, oldUrl := range old.Urls {
- newUrl, newIdExists := new.Urls[id]
+ for id, oldPollCfg := range old.Urls {
+ newPollCfg, newIdExists := new.Urls[id]
if !newIdExists {
deletions = append(deletions, id)
- } else if newUrl != oldUrl {
+ } else if newPollCfg != oldPollCfg {
deletions = append(deletions, id)
- additions = append(additions, HTTPPollInfo{Interval: new.Interval, ID: id, URL: newUrl})
+ additions = append(additions, HTTPPollInfo{
+ Interval: new.Interval,
+ ID: id,
+ URL: newPollCfg.URL,
+ Timeout: newPollCfg.Timeout,
+ })
}
}
- for id, newUrl := range new.Urls {
+ for id, newPollCfg := range new.Urls {
_, oldIdExists := old.Urls[id]
if !oldIdExists {
- additions = append(additions, HTTPPollInfo{Interval: new.Interval, ID: id, URL: newUrl})
+ additions = append(additions, HTTPPollInfo{
+ Interval: new.Interval,
+ ID: id,
+ URL: newPollCfg.URL,
+ Timeout: newPollCfg.Timeout,
+ })
}
}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/cache/cache.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/cache.go b/traffic_monitor/experimental/traffic_monitor/cache/cache.go
index 0d5a95c..52230d2 100644
--- a/traffic_monitor/experimental/traffic_monitor/cache/cache.go
+++ b/traffic_monitor/experimental/traffic_monitor/cache/cache.go
@@ -27,6 +27,7 @@ type Handler struct {
}
// NewHandler returns a new cache handler. Note this handler does NOT precompute stat data before calling ResultChannel, and Result.Precomputed will be nil.
+// TODO change this to take the ResultChan. It doesn't make sense for the Handler to 'own' the Result Chan.
func NewHandler() Handler {
return Handler{ResultChannel: make(chan Result), MultipleSpaceRegex: regexp.MustCompile(" +")}
}
@@ -52,12 +53,13 @@ type PrecomputedData struct {
// Result is the data result returned by a cache.
type Result struct {
- ID enum.CacheName
- Available bool
- Error error
- Astats Astats
- Time time.Time
- Vitals Vitals
+ ID enum.CacheName
+ Available bool
+ Error error
+ Astats Astats
+ Time time.Time
+ RequestTime time.Duration
+ Vitals Vitals
PrecomputedData
PollID uint64
PollFinished chan<- uint64
@@ -135,11 +137,12 @@ func StatsMarshall(statHistory map[enum.CacheName][]Result, filter Filter, param
}
// Handle handles results fetched from a cache, parsing the raw Reader data and passing it along to a chan for further processing.
-func (handler Handler) Handle(id string, r io.Reader, err error, pollID uint64, pollFinished chan<- uint64) {
+func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, err error, pollID uint64, pollFinished chan<- uint64) {
log.Debugf("poll %v %v handle start\n", pollID, time.Now())
result := Result{
ID: enum.CacheName(id),
Time: time.Now(), // TODO change this to be computed the instant we get the result back, to minimise inaccuracy
+ RequestTime: reqTime,
PollID: pollID,
PollFinished: pollFinished,
}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/health/cache_health.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/health/cache_health.go b/traffic_monitor/experimental/traffic_monitor/health/cache_health.go
index 4147418..1106f62 100644
--- a/traffic_monitor/experimental/traffic_monitor/health/cache_health.go
+++ b/traffic_monitor/experimental/traffic_monitor/health/cache_health.go
@@ -1,13 +1,15 @@
package health
import (
- "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
- "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
- traffic_ops "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
-
"fmt"
+ "math"
"strconv"
"strings"
+ "time"
+
+ "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
+ "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+ traffic_ops "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
)
func setError(newResult *cache.Result, err error) {
@@ -73,9 +75,34 @@ func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *traffic_op
// log.Infoln(newResult.Id, "BytesOut", newResult.Vitals.BytesOut, "BytesIn", newResult.Vitals.BytesIn, "Kbps", newResult.Vitals.KbpsOut, "max", newResult.Vitals.MaxKbpsOut)
}
+// getKbpsThreshold returns the numeric kbps threshold parsed from the Traffic Ops string value, ignoring a leading '>'. If the string is empty or fails to parse, it logs an error and returns math.MaxInt64, signifying no limit.
+// TODO add float64 to Traffic Ops Client interface
+func getKbpsThreshold(threshStr string) int64 {
+ if len(threshStr) == 0 {
+ log.Errorf("Empty Traffic Ops HealthThresholdAvailableBandwidthInKbps; setting no limit.\n")
+ return math.MaxInt64
+ }
+ if threshStr[0] == '>' {
+ threshStr = threshStr[1:]
+ }
+ thresh, err := strconv.ParseInt(threshStr, 10, 64)
+ if err != nil {
+ log.Errorf("Failed to parse Traffic Ops HealthThresholdAvailableBandwidthInKbps, setting no limit: '%v'\n", err)
+ return math.MaxInt64
+ }
+ return thresh
+}
+
+// TODO add time.Duration to Traffic Ops Client interface
+func getQueryThreshold(threshInt int64) time.Duration {
+ return time.Duration(threshInt) * time.Millisecond
+}
+
// EvalCache returns whether the given cache should be marked available, and a string describing why
func EvalCache(result cache.Result, mc *traffic_ops.TrafficMonitorConfigMap) (bool, string) {
- status := mc.TrafficServer[string(result.ID)].Status
+ toServer := mc.TrafficServer[string(result.ID)]
+ status := toServer.Status
+ params := mc.Profile[toServer.Profile].Parameters
switch {
case status == "ADMIN_DOWN":
return false, "set to ADMIN_DOWN"
@@ -85,10 +112,12 @@ func EvalCache(result cache.Result, mc *traffic_ops.TrafficMonitorConfigMap) (bo
return true, "set to ONLINE"
case result.Error != nil:
return false, fmt.Sprintf("error: %v", result.Error)
- case result.Vitals.LoadAvg > mc.Profile[mc.TrafficServer[string(result.ID)].Profile].Parameters.HealthThresholdLoadAvg:
- return false, fmt.Sprintf("load average %f exceeds threshold %f", result.Vitals.LoadAvg, mc.Profile[mc.TrafficServer[string(result.ID)].Profile].Parameters.HealthThresholdLoadAvg)
- case result.Vitals.MaxKbpsOut < result.Vitals.KbpsOut:
- return false, fmt.Sprintf("%dkbps exceeds max %dkbps", result.Vitals.KbpsOut, result.Vitals.MaxKbpsOut)
+ case result.Vitals.LoadAvg > params.HealthThresholdLoadAvg:
+ return false, fmt.Sprintf("load average %f exceeds threshold %f", result.Vitals.LoadAvg, params.HealthThresholdLoadAvg)
+ case result.Vitals.KbpsOut >= getKbpsThreshold(params.HealthThresholdAvailableBandwidthInKbps):
+ return false, fmt.Sprintf("%dkbps exceeds max %dkbps", result.Vitals.KbpsOut, getKbpsThreshold(params.HealthThresholdAvailableBandwidthInKbps))
+ case result.RequestTime > getQueryThreshold(int64(params.HealthThresholdQueryTime)):
+ return false, fmt.Sprintf("request time %v exceeds max %v", result.RequestTime, getQueryThreshold(int64(params.HealthThresholdQueryTime)))
default:
return result.Available, "reported"
}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index de069b3..42fe82c 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -203,7 +203,8 @@ func processHealthResult(
health.GetVitals(&healthResult, &prevResult, &monitorConfigCopy)
}
- healthHistory[healthResult.ID] = pruneHistory(append(healthHistory[healthResult.ID], healthResult), cfg.MaxHealthHistory)
+ maxHistory := uint64(monitorConfigCopy.Profile[monitorConfigCopy.TrafficServer[string(healthResult.ID)].Profile].Parameters.HistoryCount)
+ healthHistory[healthResult.ID] = pruneHistory(append(healthHistory[healthResult.ID], healthResult), maxHistory)
isAvailable, whyAvailable := health.EvalCache(healthResult, &monitorConfigCopy)
if localStates.Get().Caches[healthResult.ID].IsAvailable != isAvailable {
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/manager/manager.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
index c362b33..36e47c6 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
@@ -42,7 +42,6 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
// TODO investigate whether a unique client per cache to be polled is faster
sharedClient := &http.Client{
- Timeout: cfg.HTTPTimeout,
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go b/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
index dc8ca41..e3ba337 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
@@ -2,14 +2,16 @@ package manager
import (
"fmt"
+ "strings"
+ "sync"
+ "time"
+
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/poller"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/config"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
- "strings"
- "sync"
)
// CopyTrafficMonitorConfigMap returns a deep copy of the given TrafficMonitorConfigMap
@@ -92,6 +94,12 @@ func StartMonitorConfigManager(
return monitorConfig
}
+// trafficOpsHealthConnectionTimeoutToDuration takes the int from Traffic Ops, which is in milliseconds, and returns a time.Duration
+// TODO change Traffic Ops Client API to a time.Duration
+func trafficOpsHealthConnectionTimeoutToDuration(t int) time.Duration {
+ return time.Duration(t) * time.Millisecond
+}
+
// TODO timing, and determine if the case, or its internal `for`, should be put in a goroutine
// TODO determine if subscribers take action on change, and change to mutexed objects if not.
func monitorConfigListen(
@@ -107,9 +115,9 @@ func monitorConfigListen(
) {
for monitorConfig := range monitorConfigPollChan {
monitorConfigTS.Set(monitorConfig)
- healthUrls := map[string]string{}
- statUrls := map[string]string{}
- peerUrls := map[string]string{}
+ healthUrls := map[string]poller.PollConfig{}
+ statUrls := map[string]poller.PollConfig{}
+ peerUrls := map[string]poller.PollConfig{}
caches := map[string]string{}
for _, srv := range monitorConfig.TrafficServer {
@@ -137,10 +145,12 @@ func monitorConfigListen(
"application=", "application=plugin.remap",
)
url = r.Replace(url)
- healthUrls[srv.HostName] = url
+
+ connTimeout := trafficOpsHealthConnectionTimeoutToDuration(monitorConfig.Profile[srv.Profile].Parameters.HealthConnectionTimeout)
+ healthUrls[srv.HostName] = poller.PollConfig{URL: url, Timeout: connTimeout}
r = strings.NewReplacer("application=plugin.remap", "application=")
- url = r.Replace(url)
- statUrls[srv.HostName] = url
+ statUrl := r.Replace(url)
+ statUrls[srv.HostName] = poller.PollConfig{URL: statUrl, Timeout: connTimeout}
}
for _, srv := range monitorConfig.TrafficMonitor {
@@ -152,7 +162,7 @@ func monitorConfigListen(
}
// TODO: the URL should be config driven. -jse
url := fmt.Sprintf("http://%s:%d/publish/CrStates?raw", srv.IP, srv.Port)
- peerUrls[srv.HostName] = url
+ peerUrls[srv.HostName] = poller.PollConfig{URL: url} // TODO determine timeout.
}
statURLSubscriber <- poller.HttpPollerConfig{Urls: statUrls, Interval: cfg.CacheStatPollingInterval}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
index 7af7e2c..b3fc407 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
@@ -11,6 +11,7 @@ import (
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
+ to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
)
// StatHistory is a map of cache names, to an array of result history from each cache.
@@ -37,18 +38,12 @@ func (a StatHistory) Copy() StatHistory {
type StatHistoryThreadsafe struct {
statHistory *StatHistory
m *sync.RWMutex
- max uint64
-}
-
-// Max returns the max history to be stored for any cache
-func (h StatHistoryThreadsafe) Max() uint64 {
- return h.max
}
// NewStatHistoryThreadsafe returns a new StatHistory safe for multiple readers and a single writer.
-func NewStatHistoryThreadsafe(maxHistory uint64) StatHistoryThreadsafe {
+func NewStatHistoryThreadsafe() StatHistoryThreadsafe {
h := StatHistory{}
- return StatHistoryThreadsafe{m: &sync.RWMutex{}, statHistory: &h, max: maxHistory}
+ return StatHistoryThreadsafe{m: &sync.RWMutex{}, statHistory: &h}
}
// Get returns the StatHistory. Callers MUST NOT modify. If mutation is necessary, call StatHistory.Copy()
@@ -99,7 +94,7 @@ func StartStatHistoryManager(
cfg config.Config,
monitorConfig TrafficMonitorConfigMapThreadsafe,
) (StatHistoryThreadsafe, DurationMapThreadsafe, LastStatsThreadsafe, DSStatsReader, UnpolledCachesThreadsafe) {
- statHistory := NewStatHistoryThreadsafe(cfg.MaxStatHistory)
+ statHistory := NewStatHistoryThreadsafe()
lastStatDurations := NewDurationMapThreadsafe()
lastStatEndTimes := map[enum.CacheName]time.Time{}
lastStats := NewLastStatsThreadsafe()
@@ -122,14 +117,14 @@ func StartStatHistoryManager(
unpolledCaches.SetNewCaches(getNewCaches(localStates, monitorConfig))
case <-tick:
log.Warnf("StatHistoryManager flushing queued results\n")
- processStatResults(results, statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches)
+ processStatResults(results, statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches, monitorConfig.Get())
break innerLoop
default:
select {
case r := <-cacheStatChan:
results = append(results, r)
default:
- processStatResults(results, statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches)
+ processStatResults(results, statHistory, combinedStates.Get(), lastStats, toData.Get(), errorCount, dsStats, lastStatEndTimes, lastStatDurations, unpolledCaches, monitorConfig.Get())
break innerLoop
}
}
@@ -151,10 +146,11 @@ func processStatResults(
lastStatEndTimes map[enum.CacheName]time.Time,
lastStatDurationsThreadsafe DurationMapThreadsafe,
unpolledCaches UnpolledCachesThreadsafe,
+ mc to.TrafficMonitorConfigMap,
) {
statHistory := statHistoryThreadsafe.Get().Copy()
- maxStats := statHistoryThreadsafe.Max()
for _, result := range results {
+ maxStats := uint64(mc.Profile[mc.TrafficServer[string(result.ID)].Profile].Parameters.HistoryCount)
// TODO determine if we want to add results with errors, or just print the errors now and don't add them.
statHistory[result.ID] = pruneHistory(append(statHistory[result.ID], result), maxStats)
}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/57b17e08/traffic_monitor/experimental/traffic_monitor/peer/peer.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/peer/peer.go b/traffic_monitor/experimental/traffic_monitor/peer/peer.go
index b6e9873..01d5c52 100644
--- a/traffic_monitor/experimental/traffic_monitor/peer/peer.go
+++ b/traffic_monitor/experimental/traffic_monitor/peer/peer.go
@@ -3,6 +3,7 @@ package peer
import (
"encoding/json"
"io"
+ "time"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
)
@@ -29,7 +30,7 @@ type Result struct {
}
// Handle handles a response from a polled Traffic Monitor peer, parsing the data and forwarding it to the ResultChannel.
-func (handler Handler) Handle(id string, r io.Reader, err error, pollID uint64, pollFinished chan<- uint64) {
+func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, err error, pollID uint64, pollFinished chan<- uint64) {
result := Result{
ID: enum.TrafficMonitorName(id),
Available: false,