You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficcontrol.apache.org by ne...@apache.org on 2017/01/25 17:30:30 UTC
[19/20] incubator-trafficcontrol git commit: Move TM2
CalcAvailability to health package
Move TM2 CalcAvailability to health package
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/dea90b1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/dea90b1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/dea90b1e
Branch: refs/heads/master
Commit: dea90b1ebd8dd10e3080b6c238e476e817022b66
Parents: 7a748fe
Author: Robert Butts <ro...@gmail.com>
Authored: Tue Jan 24 11:52:36 2017 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Wed Jan 25 10:29:46 2017 -0700
----------------------------------------------------------------------
.../traffic_monitor/health/cache.go | 79 +++++++++++++++++---
.../traffic_monitor/health/event.go | 47 ++++++++++++
.../traffic_monitor/manager/datarequest.go | 4 +-
.../traffic_monitor/manager/health.go | 29 +------
.../traffic_monitor/manager/manager.go | 3 +-
.../traffic_monitor/manager/opsconfig.go | 3 +-
.../traffic_monitor/manager/peer.go | 9 +--
.../traffic_monitor/manager/stat.go | 43 +----------
.../traffic_monitor/threadsafe/events.go | 71 ------------------
9 files changed, 134 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/health/cache.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/health/cache.go b/traffic_monitor/experimental/traffic_monitor/health/cache.go
index aec1d8f..533652a 100644
--- a/traffic_monitor/experimental/traffic_monitor/health/cache.go
+++ b/traffic_monitor/experimental/traffic_monitor/health/cache.go
@@ -23,19 +23,17 @@ import (
"fmt"
"strconv"
"strings"
+ "time"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
+ "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
+ todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
)
-func setErr(newResult *cache.Result, err error) {
- newResult.Error = err
- newResult.Available = false
-}
-
// GetVitals Gets the vitals to decide health on in the right format
func GetVitals(newResult *cache.Result, prevResult *cache.Result, mc *to.TrafficMonitorConfigMap) {
if newResult.Error != nil {
@@ -161,16 +159,58 @@ func EvalCache(result cache.ResultInfo, resultStats cache.ResultStatValHistory,
continue
}
- if !InThreshold(threshold, resultStatNum) {
- return false, eventDesc(status, ExceedsThresholdMsg(stat, threshold, resultStatNum)), stat
+ if !inThreshold(threshold, resultStatNum) {
+ return false, eventDesc(status, exceedsThresholdMsg(stat, threshold, resultStatNum)), stat
}
}
return result.Available, eventDesc(status, availability), ""
}
+// CalcAvailability calculates the availability of the cache, from the given result. Availability is stored in `localCacheStatus` and `localStates`, and if the status changed an event is added to `events`. statResultHistory may be nil, for pollers which don't poll stats.
+// TODO add enum for poller names?
+func CalcAvailability(results []cache.Result, pollerName string, statResultHistory cache.ResultStatHistory, mc to.TrafficMonitorConfigMap, toData todata.TOData, localCacheStatusThreadsafe threadsafe.CacheAvailableStatus, localStates peer.CRStatesThreadsafe, events ThreadsafeEvents) {
+ localCacheStatuses := localCacheStatusThreadsafe.Get().Copy()
+ for _, result := range results {
+ statResults := cache.ResultStatValHistory(nil)
+ if statResultHistory != nil {
+ statResults = statResultHistory[result.ID]
+ }
+
+ isAvailable, whyAvailable, unavailableStat := EvalCache(cache.ToInfo(result), statResults, &mc)
+
+ // if the cache is now Available, and was previously unavailable due to a threshold, make sure this poller contains the stat which exceeded the threshold.
+ if previousStatus, hasPreviousStatus := localCacheStatuses[result.ID]; isAvailable && hasPreviousStatus && !previousStatus.Available && previousStatus.UnavailableStat != "" {
+ if !result.HasStat(previousStatus.UnavailableStat) {
+ return
+ }
+ }
+ localCacheStatuses[result.ID] = cache.AvailableStatus{
+ Available: isAvailable,
+ Status: mc.TrafficServer[string(result.ID)].Status,
+ Why: whyAvailable,
+ UnavailableStat: unavailableStat,
+ Poller: pollerName,
+ } // TODO move within localStates?
+
+ if available, ok := localStates.GetCache(result.ID); !ok || available.IsAvailable != isAvailable {
+ log.Infof("Changing state for %s was: %t now: %t because %s poller: %v error: %v", result.ID, available.IsAvailable, isAvailable, whyAvailable, pollerName, result.Error)
+ events.Add(Event{Time: time.Now(), Description: whyAvailable + " (" + pollerName + ")", Name: string(result.ID), Hostname: string(result.ID), Type: toData.ServerTypes[result.ID].String(), Available: isAvailable})
+ }
+
+ localStates.SetCache(result.ID, peer.IsAvailable{IsAvailable: isAvailable})
+ }
+ calculateDeliveryServiceState(toData.DeliveryServiceServers, localStates)
+ localCacheStatusThreadsafe.Set(localCacheStatuses)
+}
+
+func setErr(newResult *cache.Result, err error) {
+ newResult.Error = err
+ newResult.Available = false
+}
+
// ExceedsThresholdMsg returns a human-readable message for why the given value exceeds the threshold. It does NOT check whether the value actually exceeds the threshold; call `InThreshold` to check first.
-func ExceedsThresholdMsg(stat string, threshold to.HealthThreshold, val float64) string {
+func exceedsThresholdMsg(stat string, threshold to.HealthThreshold, val float64) string {
switch threshold.Comparator {
case "=":
return fmt.Sprintf("%s not equal (%f != %f)", stat, val, threshold.Val)
@@ -187,7 +227,7 @@ func ExceedsThresholdMsg(stat string, threshold to.HealthThreshold, val float64)
}
}
-func InThreshold(threshold to.HealthThreshold, val float64) bool {
+func inThreshold(threshold to.HealthThreshold, val float64) bool {
switch threshold.Comparator {
case "=":
return val == threshold.Val
@@ -208,3 +248,24 @@ func InThreshold(threshold to.HealthThreshold, val float64) bool {
func eventDesc(status enum.CacheStatus, message string) string {
return fmt.Sprintf("%s - %s", status, message)
}
+
+// calculateDeliveryServiceState calculates the state of delivery services from the new cache state data `cacheState` and the CRConfig data `deliveryServiceServers` and puts the calculated state in the outparam `deliveryServiceStates`
+func calculateDeliveryServiceState(deliveryServiceServers map[enum.DeliveryServiceName][]enum.CacheName, states peer.CRStatesThreadsafe) {
+ deliveryServices := states.GetDeliveryServices()
+ for deliveryServiceName, deliveryServiceState := range deliveryServices {
+ if _, ok := deliveryServiceServers[deliveryServiceName]; !ok {
+ // log.Errorf("CRConfig does not have delivery service %s, but traffic monitor poller does; skipping\n", deliveryServiceName)
+ continue
+ }
+ deliveryServiceState.IsAvailable = false
+ deliveryServiceState.DisabledLocations = []enum.CacheName{} // it's important this isn't nil, so it serialises to the JSON `[]` instead of `null`
+ for _, server := range deliveryServiceServers[deliveryServiceName] {
+ if available, _ := states.GetCache(server); available.IsAvailable {
+ deliveryServiceState.IsAvailable = true
+ } else {
+ deliveryServiceState.DisabledLocations = append(deliveryServiceState.DisabledLocations, server)
+ }
+ }
+ states.SetDeliveryService(deliveryServiceName, deliveryServiceState)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/health/event.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/health/event.go b/traffic_monitor/experimental/traffic_monitor/health/event.go
index 47563fa..3150da4 100644
--- a/traffic_monitor/experimental/traffic_monitor/health/event.go
+++ b/traffic_monitor/experimental/traffic_monitor/health/event.go
@@ -20,7 +20,10 @@ package health
*/
import (
+ "sync"
"time"
+
+ "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
)
// Event represents an event change in aggregated data. For example, a cache being marked as unavailable.
@@ -34,3 +37,47 @@ type Event struct {
Type string `json:"type"`
Available bool `json:"isAvailable"`
}
+
+// Events provides safe access for multiple goroutines readers and a single writer to a stored Events slice.
+type ThreadsafeEvents struct {
+ events *[]Event
+ m *sync.RWMutex
+ nextIndex *uint64
+ max uint64
+}
+
+func copyEvents(a []Event) []Event {
+ b := make([]Event, len(a), len(a))
+ copy(b, a)
+ return b
+}
+
+// NewEvents creates a new single-writer-multiple-reader Threadsafe object
+func NewThreadsafeEvents(maxEvents uint64) ThreadsafeEvents {
+ i := uint64(0)
+ return ThreadsafeEvents{m: &sync.RWMutex{}, events: &[]Event{}, nextIndex: &i, max: maxEvents}
+}
+
+// Get returns the internal slice of Events for reading. This MUST NOT be modified. If modification is necessary, copy the slice.
+func (o *ThreadsafeEvents) Get() []Event {
+ o.m.RLock()
+ defer o.m.RUnlock()
+ return *o.events
+}
+
+// Add adds the given event. This is threadsafe for one writer, multiple readers. This MUST NOT be called by multiple threads, as it non-atomically fetches and adds.
+func (o *ThreadsafeEvents) Add(e Event) {
+ // host="hostname", type=EDGE, available=true, msg="REPORTED - available"
+ log.Eventf(e.Time, "host=\"%s\", type=%s, available=%t, msg=\"%s\"", e.Hostname, e.Type, e.Available, e.Description)
+ o.m.Lock() // TODO test removing
+ events := copyEvents(*o.events)
+ e.Index = *o.nextIndex
+ events = append([]Event{e}, events...)
+ if len(events) > int(o.max) {
+ events = (events)[:o.max-1]
+ }
+ // o.m.Lock()
+ *o.events = events
+ *o.nextIndex++
+ o.m.Unlock()
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
index 30fcc47..eddf972 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
@@ -592,7 +592,7 @@ func srvDSStats(params url.Values, errorCount threadsafe.Uint, path string, toDa
return WrapErrCode(errorCount, path, bytes, err)
}
-func srvEventLog(events threadsafe.Events) ([]byte, error) {
+func srvEventLog(events health.ThreadsafeEvents) ([]byte, error) {
return json.Marshal(JSONEvents{Events: events.Get()})
}
@@ -704,7 +704,7 @@ func MakeDispatchMap(
statMaxKbpses threadsafe.CacheKbpses,
healthHistory threadsafe.ResultHistory,
dsStats threadsafe.DSStatsReader,
- events threadsafe.Events,
+ events health.ThreadsafeEvents,
staticAppData StaticAppData,
healthPollInterval time.Duration,
lastHealthDurations DurationMapThreadsafe,
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/manager/health.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/health.go b/traffic_monitor/experimental/traffic_monitor/manager/health.go
index 63248e8..e2657f8 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/health.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/health.go
@@ -86,7 +86,7 @@ func StartHealthResultManager(
fetchCount threadsafe.Uint,
errorCount threadsafe.Uint,
cfg config.Config,
- events threadsafe.Events,
+ events health.ThreadsafeEvents,
localCacheStatus threadsafe.CacheAvailableStatus,
) (DurationMapThreadsafe, threadsafe.ResultHistory) {
lastHealthDurations := NewDurationMapThreadsafe()
@@ -120,7 +120,7 @@ func healthResultManagerListen(
combinedStates peer.CRStatesThreadsafe,
fetchCount threadsafe.Uint,
errorCount threadsafe.Uint,
- events threadsafe.Events,
+ events health.ThreadsafeEvents,
localCacheStatus threadsafe.CacheAvailableStatus,
cfg config.Config,
) {
@@ -186,7 +186,7 @@ func processHealthResult(
combinedStates peer.CRStatesThreadsafe,
fetchCount threadsafe.Uint,
errorCount threadsafe.Uint,
- events threadsafe.Events,
+ events health.ThreadsafeEvents,
localCacheStatusThreadsafe threadsafe.CacheAvailableStatus,
lastHealthEndTimes map[enum.CacheName]time.Time,
healthHistory threadsafe.ResultHistory,
@@ -229,7 +229,7 @@ func processHealthResult(
healthHistoryCopy[healthResult.ID] = pruneHistory(append([]cache.Result{healthResult}, healthHistoryCopy[healthResult.ID]...), maxHistory)
}
- calcAvailability(results, "health", nil, monitorConfigCopy, toDataCopy, localCacheStatusThreadsafe, localStates, events)
+ health.CalcAvailability(results, "health", nil, monitorConfigCopy, toDataCopy, localCacheStatusThreadsafe, localStates, events)
healthHistory.Set(healthHistoryCopy)
// TODO determine if we should combineCrStates() here
@@ -244,24 +244,3 @@ func processHealthResult(
}
lastHealthDurationsThreadsafe.Set(lastHealthDurations)
}
-
-// calculateDeliveryServiceState calculates the state of delivery services from the new cache state data `cacheState` and the CRConfig data `deliveryServiceServers` and puts the calculated state in the outparam `deliveryServiceStates`
-func CalculateDeliveryServiceState(deliveryServiceServers map[enum.DeliveryServiceName][]enum.CacheName, states peer.CRStatesThreadsafe) {
- deliveryServices := states.GetDeliveryServices()
- for deliveryServiceName, deliveryServiceState := range deliveryServices {
- if _, ok := deliveryServiceServers[deliveryServiceName]; !ok {
- // log.Errorf("CRConfig does not have delivery service %s, but traffic monitor poller does; skipping\n", deliveryServiceName)
- continue
- }
- deliveryServiceState.IsAvailable = false
- deliveryServiceState.DisabledLocations = []enum.CacheName{} // it's important this isn't nil, so it serialises to the JSON `[]` instead of `null`
- for _, server := range deliveryServiceServers[deliveryServiceName] {
- if available, _ := states.GetCache(server); available.IsAvailable {
- deliveryServiceState.IsAvailable = true
- } else {
- deliveryServiceState.DisabledLocations = append(deliveryServiceState.DisabledLocations, server)
- }
- }
- states.SetDeliveryService(deliveryServiceName, deliveryServiceState)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/manager/manager.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
index 79c69af..974b3eb 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/poller"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/config"
+ "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/health"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
@@ -86,7 +87,7 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
go cacheStatPoller.Poll()
go peerPoller.Poll()
- events := threadsafe.NewEvents(cfg.MaxEvents)
+ events := health.NewThreadsafeEvents(cfg.MaxEvents)
cachesChanged := make(chan struct{})
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
index 29ffd01..8f7a0d2 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/poller"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/config"
+ "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/health"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/srvhttp"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
@@ -79,7 +80,7 @@ func StartOpsConfigManager(
healthHistory threadsafe.ResultHistory,
lastStats threadsafe.LastStats,
dsStats threadsafe.DSStatsReader,
- events threadsafe.Events,
+ events health.ThreadsafeEvents,
staticAppData StaticAppData,
healthPollInterval time.Duration,
lastHealthDurations DurationMapThreadsafe,
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/manager/peer.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/peer.go b/traffic_monitor/experimental/traffic_monitor/manager/peer.go
index 13b8411..3b4a209 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/peer.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/peer.go
@@ -31,7 +31,6 @@ import (
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/health"
"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
- "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
)
@@ -40,11 +39,11 @@ func StartPeerManager(
peerChan <-chan peer.Result,
localStates peer.CRStatesThreadsafe,
peerStates peer.CRStatesPeersThreadsafe,
- events threadsafe.Events,
+ events health.ThreadsafeEvents,
peerOptimistic bool,
toData todata.TODataThreadsafe,
cfg config.Config,
-) (peer.CRStatesThreadsafe, threadsafe.Events) {
+) (peer.CRStatesThreadsafe, health.ThreadsafeEvents) {
combinedStates := peer.NewCRStatesThreadsafe()
overrideMap := map[enum.CacheName]bool{}
@@ -59,14 +58,14 @@ func StartPeerManager(
return combinedStates, events
}
-func comparePeerState(events threadsafe.Events, result peer.Result, peerStates peer.CRStatesPeersThreadsafe) {
+func comparePeerState(events health.ThreadsafeEvents, result peer.Result, peerStates peer.CRStatesPeersThreadsafe) {
if result.Available != peerStates.GetPeerAvailability(result.ID) {
events.Add(health.Event{Time: result.Time, Unix: result.Time.Unix(), Description: util.JoinErrorsString(result.Errors), Name: result.ID.String(), Hostname: result.ID.String(), Type: "Peer", Available: result.Available})
}
}
// TODO JvD: add deliveryservice stuff
-func combineCrStates(events threadsafe.Events, peerOptimistic bool, peerStates peer.CRStatesPeersThreadsafe, localStates peer.Crstates, combinedStates peer.CRStatesThreadsafe, overrideMap map[enum.CacheName]bool, toData todata.TODataThreadsafe) {
+func combineCrStates(events health.ThreadsafeEvents, peerOptimistic bool, peerStates peer.CRStatesPeersThreadsafe, localStates peer.Crstates, combinedStates peer.CRStatesThreadsafe, overrideMap map[enum.CacheName]bool, toData todata.TODataThreadsafe) {
toDataCopy := toData.Get()
for cacheName, localCacheState := range localStates.Caches { // localStates gets pruned when servers are disabled, it's the source of truth
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/manager/stat.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/stat.go b/traffic_monitor/experimental/traffic_monitor/manager/stat.go
index a4a9b81..8e69bda 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/stat.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/stat.go
@@ -67,7 +67,7 @@ func StartStatHistoryManager(
errorCount threadsafe.Uint,
cfg config.Config,
monitorConfig TrafficMonitorConfigMapThreadsafe,
- events threadsafe.Events,
+ events health.ThreadsafeEvents,
) (threadsafe.ResultInfoHistory, threadsafe.ResultStatHistory, threadsafe.CacheKbpses, DurationMapThreadsafe, threadsafe.LastStats, threadsafe.DSStatsReader, threadsafe.UnpolledCaches, threadsafe.CacheAvailableStatus) {
statInfoHistory := threadsafe.NewResultInfoHistory()
statResultHistory := threadsafe.NewResultStatHistory()
@@ -141,7 +141,7 @@ func processStatResults(
precomputedData map[enum.CacheName]cache.PrecomputedData,
lastResults map[enum.CacheName]cache.Result,
localStates peer.CRStatesThreadsafe,
- events threadsafe.Events,
+ events health.ThreadsafeEvents,
localCacheStatusThreadsafe threadsafe.CacheAvailableStatus,
) {
if len(results) == 0 {
@@ -212,7 +212,7 @@ func processStatResults(
lastStats.Set(newLastStats)
}
- calcAvailability(results, "stat", statResultHistory, mc, toData, localCacheStatusThreadsafe, localStates, events)
+ health.CalcAvailability(results, "stat", statResultHistory, mc, toData, localCacheStatusThreadsafe, localStates, events)
endTime := time.Now()
lastStatDurations := lastStatDurationsThreadsafe.Get().Copy()
@@ -226,40 +226,3 @@ func processStatResults(
lastStatDurationsThreadsafe.Set(lastStatDurations)
unpolledCaches.SetPolled(results, lastStats.Get())
}
-
-// calcAvailability calculates the availability of the cache, from the given result. Availability is stored in `localCacheStatus` and `localStates`, and if the status changed an event is added to `events`. statResultHistory may be nil, for pollers which don't poll stats.
-// TODO add enum for poller names?
-func calcAvailability(results []cache.Result, pollerName string, statResultHistory cache.ResultStatHistory, mc to.TrafficMonitorConfigMap, toData todata.TOData, localCacheStatusThreadsafe threadsafe.CacheAvailableStatus, localStates peer.CRStatesThreadsafe, events threadsafe.Events) {
- localCacheStatuses := localCacheStatusThreadsafe.Get().Copy()
- for _, result := range results {
- statResults := cache.ResultStatValHistory(nil)
- if statResultHistory != nil {
- statResults = statResultHistory[result.ID]
- }
-
- isAvailable, whyAvailable, unavailableStat := health.EvalCache(cache.ToInfo(result), statResults, &mc)
-
- // if the cache is now Available, and was previously unavailable due to a threshold, make sure this poller contains the stat which exceeded the threshold.
- if previousStatus, hasPreviousStatus := localCacheStatuses[result.ID]; isAvailable && hasPreviousStatus && !previousStatus.Available && previousStatus.UnavailableStat != "" {
- if !result.HasStat(previousStatus.UnavailableStat) {
- return
- }
- }
- localCacheStatuses[result.ID] = cache.AvailableStatus{
- Available: isAvailable,
- Status: mc.TrafficServer[string(result.ID)].Status,
- Why: whyAvailable,
- UnavailableStat: unavailableStat,
- Poller: pollerName,
- } // TODO move within localStates?
-
- if available, ok := localStates.GetCache(result.ID); !ok || available.IsAvailable != isAvailable {
- log.Infof("Changing state for %s was: %t now: %t because %s poller: %v error: %v", result.ID, available.IsAvailable, isAvailable, whyAvailable, pollerName, result.Error)
- events.Add(health.Event{Time: time.Now(), Description: whyAvailable + " (" + pollerName + ")", Name: string(result.ID), Hostname: string(result.ID), Type: toData.ServerTypes[result.ID].String(), Available: isAvailable})
- }
-
- localStates.SetCache(result.ID, peer.IsAvailable{IsAvailable: isAvailable})
- }
- CalculateDeliveryServiceState(toData.DeliveryServiceServers, localStates)
- localCacheStatusThreadsafe.Set(localCacheStatuses)
-}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/dea90b1e/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go
deleted file mode 100644
index 9cd0320..0000000
--- a/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package threadsafe
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import (
- "sync"
-
- "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
- "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/health"
-)
-
-// Events provides safe access for multiple goroutines readers and a single writer to a stored Events slice.
-type Events struct {
- events *[]health.Event
- m *sync.RWMutex
- nextIndex *uint64
- max uint64
-}
-
-func copyEvents(a []health.Event) []health.Event {
- b := make([]health.Event, len(a), len(a))
- copy(b, a)
- return b
-}
-
-// NewEvents creates a new single-writer-multiple-reader Threadsafe object
-func NewEvents(maxEvents uint64) Events {
- i := uint64(0)
- return Events{m: &sync.RWMutex{}, events: &[]health.Event{}, nextIndex: &i, max: maxEvents}
-}
-
-// Get returns the internal slice of Events for reading. This MUST NOT be modified. If modification is necessary, copy the slice.
-func (o *Events) Get() []health.Event {
- o.m.RLock()
- defer o.m.RUnlock()
- return *o.events
-}
-
-// Add adds the given event. This is threadsafe for one writer, multiple readers. This MUST NOT be called by multiple threads, as it non-atomically fetches and adds.
-func (o *Events) Add(e health.Event) {
- // host="hostname", type=EDGE, available=true, msg="REPORTED - available"
- log.Eventf(e.Time, "host=\"%s\", type=%s, available=%t, msg=\"%s\"", e.Hostname, e.Type, e.Available, e.Description)
- o.m.Lock() // TODO test removing
- events := copyEvents(*o.events)
- e.Index = *o.nextIndex
- events = append([]health.Event{e}, events...)
- if len(events) > int(o.max) {
- events = (events)[:o.max-1]
- }
- // o.m.Lock()
- *o.events = events
- *o.nextIndex++
- o.m.Unlock()
-}