You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2020/08/06 18:57:19 UTC

[GitHub] [trafficcontrol] shamrickus commented on a change in pull request #4916: Multi interface health

shamrickus commented on a change in pull request #4916:
URL: https://github.com/apache/trafficcontrol/pull/4916#discussion_r466615443



##########
File path: traffic_monitor/threadsafe/resultstathistory.go
##########
@@ -55,55 +64,212 @@ func (h *ResultInfoHistory) Get() cache.ResultInfoHistory {
 	return *h.history
 }
 
-// Set sets the internal ResultInfoHistory. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
+// Set sets the internal ResultInfoHistory. This is only safe for one thread of
+// execution. This MUST NOT be called from multiple threads.
 func (h *ResultInfoHistory) Set(v cache.ResultInfoHistory) {
 	h.m.Lock()
 	*h.history = v
 	h.m.Unlock()
 }
 
-type ResultStatHistory struct{ *sync.Map } // map[tc.CacheName]map[interfaceName]ResultStatValHistory
+// ResultStatHistory is a thread-safe mapping of cache server hostnames to
+// CacheStatHistory objects containing statistics for those cache servers.
+type ResultStatHistory struct{ *sync.Map } // map[string]CacheStatHistory
 
+// NewResultStatHistory constructs a new, empty ResultStatHistory.
 func NewResultStatHistory() ResultStatHistory {
 	return ResultStatHistory{&sync.Map{}}
 }
 
-func (h ResultStatHistory) LoadOrStore(cache tc.CacheName) map[string]ResultStatValHistory {
+// LoadOrStore returns the stored CacheStatHistory for the given cache server
+// hostname if it has already been stored. If it has not already been stored, a
+// new, empty CacheStatHistory object is created, stored under the given
+// hostname, and returned.
+func (h ResultStatHistory) LoadOrStore(hostname string) CacheStatHistory {
 	// TODO change to use sync.Pool?
-	v, loaded := h.Map.LoadOrStore(cache, NewResultStatValHistory())
-	if !loaded {
-		v = map[string]ResultStatValHistory{}
-	}
-	if rv, ok := v.(ResultStatValHistory); ok {
-		v = map[string]ResultStatValHistory{tc.CacheInterfacesAggregate: rv}
+	v, _ := h.Map.LoadOrStore(hostname, NewCacheStatHistory())
+	rv, ok := v.(CacheStatHistory)
+	if !ok {
+		log.Errorf("Failed to load or store stat history for '%s': invalid stored type.", hostname)
+		return NewCacheStatHistory()
 	}
-	return v.(map[string]ResultStatValHistory)
+
+	return rv
 }
 
-// Range behaves like sync.Map.Range. It calls f for every value in the map; if f returns false, the iteration is stopped.
-func (h ResultStatHistory) Range(f func(cache tc.CacheName, interfaceName string, val ResultStatValHistory) bool) {
+// Range behaves like sync.Map.Range. It calls f for every value in the map; if
+// f returns false, the iteration is stopped.
+func (h ResultStatHistory) Range(f func(cacheName string, val CacheStatHistory) bool) {
 	h.Map.Range(func(k, v interface{}) bool {
-		i, ok := v.(map[string]ResultStatValHistory)
+		i, ok := v.(CacheStatHistory)
 		if !ok {
-			log.Warnln("Cannot umarshal result stat val history")
+			log.Warnf("Non-CacheStatHistory object (%T) found in ResultStatHistory during Range.", v)
 			return true
 		}
-		for a, b := range i {
-			if !f(k.(tc.CacheName), a, b) {
-				return false
-			}
+		cacheName, ok := k.(string)
+		if !ok {
+			log.Warnf("Non-string object (%T) found as key in ResultStatHistory during Range.", k)
+			return true
 		}
-		return true
+		return f(cacheName, i)
 	})
 }
 
-// ResultStatValHistory is threadsafe for one writer. Specifically, because a CompareAndSwap is not provided, it's not possible to Load and Store without a race condition.
-// If multiple writers were necessary, it wouldn't be difficult to add a CompareAndSwap, internally storing an atomically-accessed pointer to the slice.
+// This is just a convenience structure used only for passing data about a
+// single statistic for a network interface into
+// compareAndAppendStatForInterface.

Review comment:
       struct name in godoc.

##########
File path: traffic_monitor/health/cache.go
##########
@@ -144,177 +191,151 @@ func EvalCache(result cache.ResultInfo, resultStats *threadsafe.ResultStatValHis
 		}
 
 		if !inThreshold(threshold, resultStatNum) {
-			return false, result.UsingIPv4, eventDesc(status, exceedsThresholdMsg(stat, threshold, resultStatNum)), stat
+			return false, eventDesc(status, exceedsThresholdMsg(stat, threshold, resultStatNum)), stat
 		}
 	}
 
-	return avail, result.UsingIPv4, eventDescVal, eventMsg
+	return avail, eventDescVal, eventMsg
 }
 
-// CalcAvailabilityWithStats calculates the availability of each cache in results.
-// statResultHistory may be nil, in which case stats won't be used to calculate availability.
-func CalcAvailability(results []cache.Result, pollerName string, statResultHistory *threadsafe.ResultStatHistory, mc tc.LegacyTrafficMonitorConfigMap, toData todata.TOData, localCacheStatusThreadsafe threadsafe.CacheAvailableStatus, localStates peer.CRStatesThreadsafe, events ThreadsafeEvents, protocol config.PollingProtocol) {
-	localCacheStatuses := localCacheStatusThreadsafe.Get().Copy()
-	statResults := (*threadsafe.ResultStatValHistory)(nil)
-	statResultsVal := (*map[string]threadsafe.ResultStatValHistory)(nil)
-	processAvailableTuple := func(tuple cache.AvailableTuple, serverInfo tc.LegacyTrafficServer) bool {
-		switch protocol {
-		case config.IPv4Only:
+// gets a function to process an availability tuple based on the protocol used.

Review comment:
       Missing function name for godocs

##########
File path: traffic_monitor/manager/opsconfig.go
##########
@@ -169,42 +164,26 @@ func StartOpsConfigManager(
 			backoff = util.NewConstantBackoff(util.ConstantBackoffDuration)
 		}
 		for {
-			realToSession, toAddr, err = to.LoginWithAgent(newOpsConfig.Url, newOpsConfig.Username, newOpsConfig.Password, newOpsConfig.Insecure, staticAppData.UserAgent, useCache, trafficOpsRequestTimeout)
+			err = toSession.Update(newOpsConfig.Url, newOpsConfig.Username, newOpsConfig.Password, newOpsConfig.Insecure, staticAppData.UserAgent, useCache, trafficOpsRequestTimeout)
 			if err != nil {
 				handleErr(fmt.Errorf("MonitorConfigPoller: error instantiating Session with traffic_ops (%v): %s\n", toAddr, err))
 				duration := backoff.BackoffDuration()
 				log.Errorf("retrying in %v\n", duration)
 				time.Sleep(duration)
 
 				if toSession.BackupFileExists() && (toLoginCount >= cfg.TrafficOpsDiskRetryMax) {
-					jar, err := cookiejar.New(nil)
-					if err != nil {
-						log.Errorf("Err getting cookiejar")
-						continue
-					}
-
-					realToSession = to.NewSession(newOpsConfig.Username, newOpsConfig.Password, newOpsConfig.Url, staticAppData.UserAgent, &http.Client{
-						Timeout: trafficOpsRequestTimeout,
-						Transport: &http.Transport{
-							TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
-						},
-						Jar: jar,
-					}, useCache)
-					toSession.Set(realToSession)
-					// At this point we have a valid 'dummy' session. This will allow us to pull from disk but will also retry when TO comes up

Review comment:
       Are you sure removing this won't break TM when TO is unaccessible but the backup file exists? 

##########
File path: traffic_monitor/threadsafe/resultstathistory.go
##########
@@ -253,32 +440,23 @@ func StatsMarshall(statResultHistory ResultStatHistory, statInfo cache.ResultInf
 		}
 
 		for i, resultInfo := range statInfo[id] {
-			if !filter.WithinStatHistoryMax(i + 1) {
+			if !filter.WithinStatHistoryMax(uint64(i) + 1) {
 				break
 			}
-			if _, ok := stats.Caches[id]; !ok {
-				stats.Caches[id] = map[string]map[string][]cache.ResultStatVal{}
-			}
 
 			t := resultInfo.Time
 
 			for stat, statValF := range computedStats {
 				if !filter.UseStat(stat) {
 					continue
 				}
-				if _, ok := stats.Caches[id][tc.CacheInterfacesAggregate]; !ok {
-					stats.Caches[id][tc.CacheInterfacesAggregate] = map[string][]cache.ResultStatVal{}
-				}
-				stats.Caches[id][tc.CacheInterfacesAggregate][stat] = append(stats.Caches[id][tc.CacheInterfacesAggregate][stat],
-					cache.ResultStatVal{Val: statValF(resultInfo, serverInfo, serverProfile, combinedStatesCache), Time: t, Span: 1})
 				// Need to actually handle interfaces, needs vitals to be refactored

Review comment:
       Same story as above

##########
File path: traffic_monitor/towrap/towrap.go
##########
@@ -124,59 +157,114 @@ func (h CRConfigHistoryThreadsafe) Add(i *CRConfigStat) {
 
 	(*h.hist)[*h.pos] = *i
 	*h.pos = (*h.pos + 1) % *h.limit
-	if *h.len < *h.limit {
-		*h.len++
+	if *h.length < *h.limit {
+		*h.length++
 	}
 }
 
+// Get retrieves the stored history of CRConfigStat entries.
 func (h CRConfigHistoryThreadsafe) Get() []CRConfigStat {
 	h.m.RLock()
 	defer h.m.RUnlock()
-	if *h.len < *h.limit {
-		return CopyCRConfigStat((*h.hist)[:*h.len])
+	if *h.length < *h.limit {
+		return CopyCRConfigStat((*h.hist)[:*h.length])
 	}
-	new := make([]CRConfigStat, *h.limit)
-	copy(new, (*h.hist)[*h.pos:])
-	copy(new[*h.len-*h.pos:], (*h.hist)[:*h.pos])
-	return new
-}
-
-func CopyCRConfigStat(old []CRConfigStat) []CRConfigStat {
-	new := make([]CRConfigStat, len(old))
-	copy(new, old)
-	return new
+	newStats := make([]CRConfigStat, *h.limit)
+	copy(newStats, (*h.hist)[*h.pos:])
+	copy(newStats[*h.length-*h.pos:], (*h.hist)[:*h.pos])
+	return newStats
 }
 
-type CRConfigStat struct {
-	ReqTime time.Time        `json:"request_time"`
-	ReqAddr string           `json:"request_address"`
-	Stats   tc.CRConfigStats `json:"stats"`
-	Err     error            `json:"error"`
+// Len gives the number of currently stored items in the buffer.
+//
+// An uninitialized buffer has zero length.
+func (h CRConfigHistoryThreadsafe) Len() uint64 {
+	if h.length == nil {
+		return 0
+	}
+	return *h.length
 }
 
-// TrafficOpsSessionThreadsafe provides access to the Traffic Ops client safe for multiple goroutines. This fulfills the ITrafficOpsSession interface.
+// TrafficOpsSessionThreadsafe provides access to the Traffic Ops client safe
+// for multiple goroutines. This fulfills the ITrafficOpsSession interface.
 type TrafficOpsSessionThreadsafe struct {
 	session            **client.Session // pointer-to-pointer, because we're given a pointer from the Traffic Ops package, and we don't want to copy it.
+	legacySession      **legacyClient.Session
 	m                  *sync.Mutex
 	lastCRConfig       ByteMapCache
 	crConfigHist       CRConfigHistoryThreadsafe
+	useLegacy          bool

Review comment:
       What is the point of this and `legacySession`? It doesn't appear to actually get used anywhere.

##########
File path: traffic_monitor/static/style.css
##########
@@ -44,11 +44,11 @@ tbody tr:nth-child(odd) {
 	background: #fff;
 }
 .error {
-	background-color: #f00;
+	background-color: #f00!important;
 }
 
 .warning {
-	background-color: #f80;

Review comment:
       Is this fixing a bug?

##########
File path: traffic_monitor/threadsafe/resultstathistory.go
##########
@@ -55,55 +64,212 @@ func (h *ResultInfoHistory) Get() cache.ResultInfoHistory {
 	return *h.history
 }
 
-// Set sets the internal ResultInfoHistory. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
+// Set sets the internal ResultInfoHistory. This is only safe for one thread of
+// execution. This MUST NOT be called from multiple threads.
 func (h *ResultInfoHistory) Set(v cache.ResultInfoHistory) {
 	h.m.Lock()
 	*h.history = v
 	h.m.Unlock()
 }
 
-type ResultStatHistory struct{ *sync.Map } // map[tc.CacheName]map[interfaceName]ResultStatValHistory
+// ResultStatHistory is a thread-safe mapping of cache server hostnames to
+// CacheStatHistory objects containing statistics for those cache servers.
+type ResultStatHistory struct{ *sync.Map } // map[string]CacheStatHistory
 
+// NewResultStatHistory constructs a new, empty ResultStatHistory.
 func NewResultStatHistory() ResultStatHistory {
 	return ResultStatHistory{&sync.Map{}}
 }
 
-func (h ResultStatHistory) LoadOrStore(cache tc.CacheName) map[string]ResultStatValHistory {
+// LoadOrStore returns the stored CacheStatHistory for the given cache server
+// hostname if it has already been stored. If it has not already been stored, a
+// new, empty CacheStatHistory object is created, stored under the given
+// hostname, and returned.
+func (h ResultStatHistory) LoadOrStore(hostname string) CacheStatHistory {
 	// TODO change to use sync.Pool?
-	v, loaded := h.Map.LoadOrStore(cache, NewResultStatValHistory())
-	if !loaded {
-		v = map[string]ResultStatValHistory{}
-	}
-	if rv, ok := v.(ResultStatValHistory); ok {
-		v = map[string]ResultStatValHistory{tc.CacheInterfacesAggregate: rv}
+	v, _ := h.Map.LoadOrStore(hostname, NewCacheStatHistory())
+	rv, ok := v.(CacheStatHistory)
+	if !ok {
+		log.Errorf("Failed to load or store stat history for '%s': invalid stored type.", hostname)
+		return NewCacheStatHistory()
 	}
-	return v.(map[string]ResultStatValHistory)
+
+	return rv
 }
 
-// Range behaves like sync.Map.Range. It calls f for every value in the map; if f returns false, the iteration is stopped.
-func (h ResultStatHistory) Range(f func(cache tc.CacheName, interfaceName string, val ResultStatValHistory) bool) {
+// Range behaves like sync.Map.Range. It calls f for every value in the map; if
+// f returns false, the iteration is stopped.
+func (h ResultStatHistory) Range(f func(cacheName string, val CacheStatHistory) bool) {
 	h.Map.Range(func(k, v interface{}) bool {
-		i, ok := v.(map[string]ResultStatValHistory)
+		i, ok := v.(CacheStatHistory)
 		if !ok {
-			log.Warnln("Cannot umarshal result stat val history")
+			log.Warnf("Non-CacheStatHistory object (%T) found in ResultStatHistory during Range.", v)
 			return true
 		}
-		for a, b := range i {
-			if !f(k.(tc.CacheName), a, b) {
-				return false
-			}
+		cacheName, ok := k.(string)
+		if !ok {
+			log.Warnf("Non-string object (%T) found as key in ResultStatHistory during Range.", k)
+			return true
 		}
-		return true
+		return f(cacheName, i)
 	})
 }
 
-// ResultStatValHistory is threadsafe for one writer. Specifically, because a CompareAndSwap is not provided, it's not possible to Load and Store without a race condition.
-// If multiple writers were necessary, it wouldn't be difficult to add a CompareAndSwap, internally storing an atomically-accessed pointer to the slice.
+// This is just a convenience structure used only for passing data about a
+// single statistic for a network interface into
+// compareAndAppendStatForInterface.
+type interfaceStat struct {
+	InterfaceName string
+	Stat          interface{}
+	StatName      string
+	Time          time.Time
+}
+
+// This is a little helper function used to compare a single stat for a single
+// network interface to its historical values and do the appropriate appending
+// and management of the history to ensure it never exceeds `limit`.
+func compareAndAppendStatForInterface(history []cache.ResultStatVal, errs strings.Builder, limit uint64, stat interfaceStat) []cache.ResultStatVal {
+	if history == nil {
+		history = make([]cache.ResultStatVal, 0, limit)
+	}
+
+	ok, err := newStatEqual(history, stat.Stat)
+	if err != nil {
+		errs.WriteString(stat.InterfaceName)
+		errs.Write([]byte(": cannot add stat "))
+		errs.WriteString(stat.StatName)
+		errs.Write([]byte(": "))
+		errs.WriteString(err.Error())
+		errs.Write([]byte("; "))

Review comment:
       Any reason not to use `errs.Write([]bytes(fmt.Sprintf(...))` instead?

##########
File path: traffic_monitor/threadsafe/resultstathistory.go
##########
@@ -216,27 +375,55 @@ func StatsMarshall(statResultHistory ResultStatHistory, statInfo cache.ResultInf
 			continue
 		}
 
-		cacheStatResultHistory := statResultHistory.LoadOrStore(id)
-		for interfaceName, interfaceHistory := range cacheStatResultHistory {
+		cacheId := string(id)
+
+		cacheStatResultHistory := statResultHistory.LoadOrStore(cacheId)
+		if _, ok := stats.Caches[cacheId]; !ok {
+			stats.Caches[cacheId] = cache.StatsCache{
+				Interfaces: make(map[string]map[string][]cache.ResultStatVal),
+				Stats:      make(map[string][]cache.ResultStatVal),
+			}
+		}
+
+		cacheStatResultHistory.Stats.Range(func(stat string, vals []cache.ResultStatVal) bool {
+			stat = "ats." + stat // legacy reasons
+			if !filter.UseStat(stat) {
+				return true
+			}
+
+			var historyCount uint64 = 1
+			for _, val := range vals {
+				if !filter.WithinStatHistoryMax(historyCount) {
+					break
+				}
+				if _, ok := stats.Caches[cacheId].Stats[stat]; !ok {
+					stats.Caches[cacheId].Stats[stat] = []cache.ResultStatVal{val}
+				} else {
+					stats.Caches[cacheId].Stats[stat] = append(stats.Caches[cacheId].Stats[stat], val)
+				}
+				historyCount += val.Span
+			}
+
+			return true
+		})
+
+		for interfaceName, interfaceHistory := range cacheStatResultHistory.Interfaces {
 			interfaceHistory.Range(func(stat string, vals []cache.ResultStatVal) bool {
-				stat = "ats." + stat // TM1 prefixes ATS stats with 'ats.'
-				if !filter.UseStat(stat) {
+				if !filter.UseInterfaceStat(stat) {
 					return true
 				}
-				historyCount := 1
-				if _, ok := stats.Caches[id]; !ok {
-					stats.Caches[id] = map[string]map[string][]cache.ResultStatVal{}
-				}
+
+				var historyCount uint64 = 1
 				for _, val := range vals {
 					if !filter.WithinStatHistoryMax(historyCount) {
 						break
 					}
-					if _, ok := stats.Caches[id][interfaceName]; !ok {
-						stats.Caches[id][interfaceName] = map[string][]cache.ResultStatVal{}
+					if _, ok := stats.Caches[cacheId].Interfaces[interfaceName]; !ok {
+						stats.Caches[cacheId].Interfaces[interfaceName] = map[string][]cache.ResultStatVal{}
 					}
-					stats.Caches[id][interfaceName][stat] = append(stats.Caches[id][interfaceName][stat], val)
+					stats.Caches[cacheId].Interfaces[interfaceName][stat] = append(stats.Caches[cacheId].Interfaces[interfaceName][stat], val)
 					// Todo add for each interface?

Review comment:
       This comment (which was mine originally) no longer makes any sense




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org