You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficcontrol.apache.org by ne...@apache.org on 2016/12/08 18:46:03 UTC

[01/20] incubator-trafficcontrol git commit: Move TM2 Apache license header for consistency

Repository: incubator-trafficcontrol
Updated Branches:
  refs/heads/master f25489da5 -> 0a17ffe06


Move TM2 Apache license header for consistency


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/11d6ffb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/11d6ffb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/11d6ffb5

Branch: refs/heads/master
Commit: 11d6ffb5b79f6cfde10ab42bddcb21daa21dd0a3
Parents: 6eeb732
Author: Robert Butts <ro...@gmail.com>
Authored: Thu Dec 8 10:01:25 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 traffic_monitor/experimental/traffic_monitor/cache/data.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/11d6ffb5/traffic_monitor/experimental/traffic_monitor/cache/data.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/data.go b/traffic_monitor/experimental/traffic_monitor/cache/data.go
index 6c02906..59b752d 100644
--- a/traffic_monitor/experimental/traffic_monitor/cache/data.go
+++ b/traffic_monitor/experimental/traffic_monitor/cache/data.go
@@ -1,9 +1,5 @@
 package cache
 
-import (
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
-)
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -23,6 +19,10 @@ import (
  * under the License.
  */
 
+import (
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
+)
+
 // CacheAvailableStatusReported is the status string returned by caches set to "reported" in Traffic Ops.
 // TODO put somewhere more generic
 const AvailableStatusReported = "REPORTED"


[12/20] incubator-trafficcontrol git commit: Moved TM2 manager threadsafe types to separate dir

Posted by ne...@apache.org.
Moved TM2 manager threadsafe types to separate dir


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/005d7f35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/005d7f35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/005d7f35

Branch: refs/heads/master
Commit: 005d7f35476ec620ac2b200382257ab6bf1a8d56
Parents: d661da2
Author: Robert Butts <ro...@gmail.com>
Authored: Wed Nov 30 08:34:05 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../manager/cacheavailablestatus.go             |  74 ----------
 .../traffic_monitor/manager/datarequest.go      |  53 +++----
 .../traffic_monitor/manager/dsstats.go          |  58 --------
 .../traffic_monitor/manager/events.go           |  79 ----------
 .../traffic_monitor/manager/healthresult.go     |  43 +++---
 .../traffic_monitor/manager/lastkbpsstats.go    |  52 -------
 .../traffic_monitor/manager/manager.go          |   9 +-
 .../traffic_monitor/manager/opsconfig.go        |  21 +--
 .../traffic_monitor/manager/polledcaches.go     | 146 ------------------
 .../traffic_monitor/manager/resulthistory.go    |  72 ---------
 .../traffic_monitor/manager/stathistory.go      |  25 ++--
 .../traffic_monitor/manager/uintman.go          |  52 -------
 .../threadsafe/cacheavailablestatus.go          |  52 +++++++
 .../traffic_monitor/threadsafe/dsstats.go       |  57 +++++++
 .../traffic_monitor/threadsafe/events.go        |  67 +++++++++
 .../traffic_monitor/threadsafe/lastkbpsstats.go |  51 +++++++
 .../traffic_monitor/threadsafe/polledcaches.go  | 147 +++++++++++++++++++
 .../traffic_monitor/threadsafe/resulthistory.go |  54 +++++++
 .../traffic_monitor/threadsafe/uint.go          |  51 +++++++
 19 files changed, 557 insertions(+), 606 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/cacheavailablestatus.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/cacheavailablestatus.go b/traffic_monitor/experimental/traffic_monitor/manager/cacheavailablestatus.go
deleted file mode 100644
index 67860b1..0000000
--- a/traffic_monitor/experimental/traffic_monitor/manager/cacheavailablestatus.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package manager
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import (
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
-	"sync"
-)
-
-// CacheAvailableStatusReported is the status string returned by caches set to "reported" in Traffic Ops.
-// TODO put somewhere more generic
-const CacheAvailableStatusReported = "REPORTED"
-
-// CacheAvailableStatus is the available status of the given cache. It includes a boolean available/unavailable flag, and a descriptive string.
-type CacheAvailableStatus struct {
-	Available bool
-	Status    string
-}
-
-// CacheAvailableStatuses is the available status of each cache.
-type CacheAvailableStatuses map[enum.CacheName]CacheAvailableStatus
-
-// CacheAvailableStatusThreadsafe wraps a map of cache available statuses to be safe for multiple reader goroutines and one writer.
-type CacheAvailableStatusThreadsafe struct {
-	caches *CacheAvailableStatuses
-	m      *sync.RWMutex
-}
-
-// Copy copies this CacheAvailableStatuses. It does not modify, and thus is safe for multiple reader goroutines.
-func (a CacheAvailableStatuses) Copy() CacheAvailableStatuses {
-	b := CacheAvailableStatuses(map[enum.CacheName]CacheAvailableStatus{})
-	for k, v := range a {
-		b[k] = v
-	}
-	return b
-}
-
-// NewCacheAvailableStatusThreadsafe creates and returns a new CacheAvailableStatusThreadsafe, initializing internal pointer values.
-func NewCacheAvailableStatusThreadsafe() CacheAvailableStatusThreadsafe {
-	c := CacheAvailableStatuses(map[enum.CacheName]CacheAvailableStatus{})
-	return CacheAvailableStatusThreadsafe{m: &sync.RWMutex{}, caches: &c}
-}
-
-// Get returns the internal map of cache statuses. The returned map MUST NOT be modified. If modification is necessary, copy.
-func (o *CacheAvailableStatusThreadsafe) Get() CacheAvailableStatuses {
-	o.m.RLock()
-	defer o.m.RUnlock()
-	return *o.caches
-}
-
-// Set sets the internal map of cache availability. This MUST NOT be called by multiple goroutines.
-func (o *CacheAvailableStatusThreadsafe) Set(v CacheAvailableStatuses) {
-	o.m.Lock()
-	*o.caches = v
-	o.m.Unlock()
-}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
index 346814e..478f9b4 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
@@ -37,6 +37,7 @@ import (
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/srvhttp"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
 	todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
 	towrap "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopswrapper"
 	to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
@@ -44,7 +45,7 @@ import (
 
 // JSONEvents represents the structure we wish to serialize to JSON, for Events.
 type JSONEvents struct {
-	Events []Event `json:"events"`
+	Events []cache.Event `json:"events"`
 }
 
 // CacheState represents the available state of a cache.
@@ -441,7 +442,7 @@ func NewPeerStateFilter(params url.Values, cacheTypes map[enum.CacheName]enum.Ca
 }
 
 // HandleErr takes an error, and the request type it came from, and logs. It is ok to call with a nil error, in which case this is a no-op.
-func HandleErr(errorCount UintThreadsafe, reqPath string, err error) {
+func HandleErr(errorCount threadsafe.Uint, reqPath string, err error) {
 	if err == nil {
 		return
 	}
@@ -450,7 +451,7 @@ func HandleErr(errorCount UintThreadsafe, reqPath string, err error) {
 }
 
 // WrapErrCode takes the body, err, and log context (errorCount, reqPath). It logs and deals with any error, and returns the appropriate bytes and response code for the `srvhttp`. It notably returns InternalServerError status on any error, for security reasons.
-func WrapErrCode(errorCount UintThreadsafe, reqPath string, body []byte, err error) ([]byte, int) {
+func WrapErrCode(errorCount threadsafe.Uint, reqPath string, body []byte, err error) ([]byte, int) {
 	if err == nil {
 		return body, http.StatusOK
 	}
@@ -466,7 +467,7 @@ func WrapBytes(f func() []byte) http.HandlerFunc {
 }
 
 // WrapErr takes a function which returns bytes and an error, and wraps it as a http.HandlerFunc. If the error is nil, the bytes are written with Status OK. Else, the error is logged, and InternalServerError is returned as the response code. If you need to return a different response code (for example, StatusBadRequest), call wrapRespCode.
-func WrapErr(errorCount UintThreadsafe, f func() ([]byte, error)) http.HandlerFunc {
+func WrapErr(errorCount threadsafe.Uint, f func() ([]byte, error)) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		bytes, err := f()
 		_, code := WrapErrCode(errorCount, r.URL.EscapedPath(), bytes, err)
@@ -509,13 +510,13 @@ func srvTRConfig(opsConfig OpsConfigThreadsafe, toSession towrap.ITrafficOpsSess
 	return toSession.CRConfigRaw(cdnName)
 }
 
-func makeWrapAll(errorCount UintThreadsafe, unpolledCaches UnpolledCachesThreadsafe) func(http.HandlerFunc) http.HandlerFunc {
+func makeWrapAll(errorCount threadsafe.Uint, unpolledCaches threadsafe.UnpolledCaches) func(http.HandlerFunc) http.HandlerFunc {
 	return func(f http.HandlerFunc) http.HandlerFunc {
 		return wrapUnpolledCheck(unpolledCaches, errorCount, f)
 	}
 }
 
-func makeCrConfigHandler(wrapper func(http.HandlerFunc) http.HandlerFunc, errorCount UintThreadsafe, opsConfig OpsConfigThreadsafe, toSession towrap.ITrafficOpsSession) http.HandlerFunc {
+func makeCrConfigHandler(wrapper func(http.HandlerFunc) http.HandlerFunc, errorCount threadsafe.Uint, opsConfig OpsConfigThreadsafe, toSession towrap.ITrafficOpsSession) http.HandlerFunc {
 	return wrapper(WrapErr(errorCount, func() ([]byte, error) {
 		return srvTRConfig(opsConfig, toSession)
 	}))
@@ -537,7 +538,7 @@ func srvTRStateSelf(localStates peer.CRStatesThreadsafe) ([]byte, error) {
 }
 
 // TODO remove error params, handle by returning an error? How, since we need to return a non-standard code?
-func srvCacheStats(params url.Values, errorCount UintThreadsafe, errContext string, toData todata.TODataThreadsafe, statHistory ResultHistoryThreadsafe) ([]byte, int) {
+func srvCacheStats(params url.Values, errorCount threadsafe.Uint, errContext string, toData todata.TODataThreadsafe, statHistory threadsafe.ResultHistory) ([]byte, int) {
 	filter, err := NewCacheStatFilter(params, toData.Get().ServerTypes)
 	if err != nil {
 		HandleErr(errorCount, errContext, err)
@@ -547,7 +548,7 @@ func srvCacheStats(params url.Values, errorCount UintThreadsafe, errContext stri
 	return WrapErrCode(errorCount, errContext, bytes, err)
 }
 
-func srvDSStats(params url.Values, errorCount UintThreadsafe, errContext string, toData todata.TODataThreadsafe, dsStats DSStatsReader) ([]byte, int) {
+func srvDSStats(params url.Values, errorCount threadsafe.Uint, errContext string, toData todata.TODataThreadsafe, dsStats threadsafe.DSStatsReader) ([]byte, int) {
 	filter, err := NewDSStatFilter(params, toData.Get().DeliveryServiceTypes)
 	if err != nil {
 		HandleErr(errorCount, errContext, err)
@@ -557,11 +558,11 @@ func srvDSStats(params url.Values, errorCount UintThreadsafe, errContext string,
 	return WrapErrCode(errorCount, errContext, bytes, err)
 }
 
-func srvEventLog(events EventsThreadsafe) ([]byte, error) {
+func srvEventLog(events threadsafe.Events) ([]byte, error) {
 	return json.Marshal(JSONEvents{Events: events.Get()})
 }
 
-func srvPeerStates(params url.Values, errorCount UintThreadsafe, errContext string, toData todata.TODataThreadsafe, peerStates peer.CRStatesPeersThreadsafe) ([]byte, int) {
+func srvPeerStates(params url.Values, errorCount threadsafe.Uint, errContext string, toData todata.TODataThreadsafe, peerStates peer.CRStatesPeersThreadsafe) ([]byte, int) {
 	filter, err := NewPeerStateFilter(params, toData.Get().ServerTypes)
 	if err != nil {
 		HandleErr(errorCount, errContext, err)
@@ -575,7 +576,7 @@ func srvStatSummary() ([]byte, int) {
 	return nil, http.StatusNotImplemented
 }
 
-func srvStats(staticAppData StaticAppData, healthPollInterval time.Duration, lastHealthDurations DurationMapThreadsafe, fetchCount UintThreadsafe, healthIteration UintThreadsafe, errorCount UintThreadsafe) ([]byte, error) {
+func srvStats(staticAppData StaticAppData, healthPollInterval time.Duration, lastHealthDurations DurationMapThreadsafe, fetchCount threadsafe.Uint, healthIteration threadsafe.Uint, errorCount threadsafe.Uint) ([]byte, error) {
 	return getStats(staticAppData, healthPollInterval, lastHealthDurations.Get(), fetchCount.Get(), healthIteration.Get(), errorCount.Get())
 }
 
@@ -614,11 +615,11 @@ func srvAPIVersion(staticAppData StaticAppData) []byte {
 func srvAPITrafficOpsURI(opsConfig OpsConfigThreadsafe) []byte {
 	return []byte(opsConfig.Get().Url)
 }
-func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory ResultHistoryThreadsafe, healthHistory ResultHistoryThreadsafe, lastHealthDurations DurationMapThreadsafe, localStates peer.CRStatesThreadsafe, lastStats LastStatsThreadsafe, localCacheStatus CacheAvailableStatusThreadsafe) ([]byte, error) {
+func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory threadsafe.ResultHistory, healthHistory threadsafe.ResultHistory, lastHealthDurations DurationMapThreadsafe, localStates peer.CRStatesThreadsafe, lastStats threadsafe.LastStats, localCacheStatus threadsafe.CacheAvailableStatus) ([]byte, error) {
 	return json.Marshal(createCacheStatuses(toData.Get().ServerTypes, statHistory.Get(), healthHistory.Get(), lastHealthDurations.Get(), localStates.Get().Caches, lastStats.Get(), localCacheStatus))
 }
 
-func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats LastStatsThreadsafe) []byte {
+func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats threadsafe.LastStats) []byte {
 	kbpsStats := lastStats.Get()
 	sum := float64(0.0)
 	for _, data := range kbpsStats.Caches {
@@ -626,7 +627,7 @@ func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats LastStatsThre
 	}
 	return []byte(fmt.Sprintf("%f", sum))
 }
-func srvAPIBandwidthCapacityKbps(statHistoryThs ResultHistoryThreadsafe) []byte {
+func srvAPIBandwidthCapacityKbps(statHistoryThs threadsafe.ResultHistory) []byte {
 	statHistory := statHistoryThs.Get()
 	cap := int64(0)
 	for _, results := range statHistory {
@@ -639,7 +640,7 @@ func srvAPIBandwidthCapacityKbps(statHistoryThs ResultHistoryThreadsafe) []byte
 }
 
 // WrapUnpolledCheck wraps an http.HandlerFunc, returning ServiceUnavailable if any caches are unpolled; else, calling the wrapped func.
-func wrapUnpolledCheck(unpolledCaches UnpolledCachesThreadsafe, errorCount UintThreadsafe, f http.HandlerFunc) http.HandlerFunc {
+func wrapUnpolledCheck(unpolledCaches threadsafe.UnpolledCaches, errorCount threadsafe.Uint, f http.HandlerFunc) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		if unpolledCaches.Any() {
 			HandleErr(errorCount, r.URL.EscapedPath(), fmt.Errorf("service still starting, some caches unpolled"))
@@ -658,20 +659,20 @@ func MakeDispatchMap(
 	localStates peer.CRStatesThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	statHistory ResultHistoryThreadsafe,
-	healthHistory ResultHistoryThreadsafe,
-	dsStats DSStatsReader,
-	events EventsThreadsafe,
+	statHistory threadsafe.ResultHistory,
+	healthHistory threadsafe.ResultHistory,
+	dsStats threadsafe.DSStatsReader,
+	events threadsafe.Events,
 	staticAppData StaticAppData,
 	healthPollInterval time.Duration,
 	lastHealthDurations DurationMapThreadsafe,
-	fetchCount UintThreadsafe,
-	healthIteration UintThreadsafe,
-	errorCount UintThreadsafe,
+	fetchCount threadsafe.Uint,
+	healthIteration threadsafe.Uint,
+	errorCount threadsafe.Uint,
 	toData todata.TODataThreadsafe,
-	localCacheStatus CacheAvailableStatusThreadsafe,
-	lastStats LastStatsThreadsafe,
-	unpolledCaches UnpolledCachesThreadsafe,
+	localCacheStatus threadsafe.CacheAvailableStatus,
+	lastStats threadsafe.LastStats,
+	unpolledCaches threadsafe.UnpolledCaches,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 ) map[string]http.HandlerFunc {
 
@@ -794,7 +795,7 @@ func createCacheStatuses(
 	lastHealthDurations map[enum.CacheName]time.Duration,
 	cacheStates map[enum.CacheName]peer.IsAvailable,
 	lastStats ds.LastStats,
-	localCacheStatusThreadsafe CacheAvailableStatusThreadsafe,
+	localCacheStatusThreadsafe threadsafe.CacheAvailableStatus,
 ) map[enum.CacheName]CacheStatus {
 	conns := createCacheConnections(statHistory)
 	statii := map[enum.CacheName]CacheStatus{}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/dsstats.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/dsstats.go b/traffic_monitor/experimental/traffic_monitor/manager/dsstats.go
deleted file mode 100644
index 23a3fef..0000000
--- a/traffic_monitor/experimental/traffic_monitor/manager/dsstats.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package manager
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import (
-	ds "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservice"
-	dsdata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservicedata"
-	"sync"
-)
-
-// DSStatsThreadsafe wraps a deliveryservice.Stats object to be safe for multiple reader goroutines and a single writer.
-type DSStatsThreadsafe struct {
-	dsStats *ds.Stats
-	m       *sync.RWMutex
-}
-
-// DSStatsReader permits reading of a dsdata.Stats object, but not writing. This is designed so a Stats object can safely be passed to multiple goroutines, without worry one may unsafely write.
-type DSStatsReader interface {
-	Get() dsdata.StatsReadonly
-}
-
-// NewDSStatsThreadsafe returns a deliveryservice.Stats object wrapped to be safe for multiple readers and a single writer.
-func NewDSStatsThreadsafe() DSStatsThreadsafe {
-	s := ds.NewStats()
-	return DSStatsThreadsafe{m: &sync.RWMutex{}, dsStats: &s}
-}
-
-// Get returns a Stats object safe for reading by multiple goroutines
-func (o *DSStatsThreadsafe) Get() dsdata.StatsReadonly {
-	o.m.RLock()
-	defer o.m.RUnlock()
-	return *o.dsStats
-}
-
-// Set sets the internal Stats object. This MUST NOT be called by multiple goroutines.
-func (o *DSStatsThreadsafe) Set(newDsStats ds.Stats) {
-	o.m.Lock()
-	*o.dsStats = newDsStats
-	o.m.Unlock()
-}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/events.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/events.go b/traffic_monitor/experimental/traffic_monitor/manager/events.go
deleted file mode 100644
index 0a35783..0000000
--- a/traffic_monitor/experimental/traffic_monitor/manager/events.go
+++ /dev/null
@@ -1,79 +0,0 @@
-package manager
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import (
-	"sync"
-
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
-)
-
-// Event represents an event change in aggregated data. For example, a cache being marked as unavailable.
-type Event struct {
-	Index       uint64         `json:"index"`
-	Time        int64          `json:"time"`
-	Description string         `json:"description"`
-	Name        enum.CacheName `json:"name"`
-	Hostname    enum.CacheName `json:"hostname"`
-	Type        string         `json:"type"`
-	Available   bool           `json:"isAvailable"`
-}
-
-// EventsThreadsafe provides safe access for multiple goroutines readers and a single writer to a stored Events slice.
-type EventsThreadsafe struct {
-	events    *[]Event
-	m         *sync.RWMutex
-	nextIndex *uint64
-	max       uint64
-}
-
-func copyEvents(a []Event) []Event {
-	b := make([]Event, len(a), len(a))
-	copy(b, a)
-	return b
-}
-
-// NewEventsThreadsafe creates a new single-writer-multiple-reader Threadsafe object
-func NewEventsThreadsafe(maxEvents uint64) EventsThreadsafe {
-	i := uint64(0)
-	return EventsThreadsafe{m: &sync.RWMutex{}, events: &[]Event{}, nextIndex: &i, max: maxEvents}
-}
-
-// Get returns the internal slice of Events for reading. This MUST NOT be modified. If modification is necessary, copy the slice.
-func (o *EventsThreadsafe) Get() []Event {
-	o.m.RLock()
-	defer o.m.RUnlock()
-	return *o.events
-}
-
-// Add adds the given event. This is threadsafe for one writer, multiple readers. This MUST NOT be called by multiple threads, as it non-atomically fetches and adds.
-func (o *EventsThreadsafe) Add(e Event) {
-	events := copyEvents(*o.events)
-	e.Index = *o.nextIndex
-	events = append([]Event{e}, events...)
-	if len(events) > int(o.max) {
-		events = (events)[:o.max-1]
-	}
-	o.m.Lock()
-	*o.events = events
-	*o.nextIndex++
-	o.m.Unlock()
-}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index b8c46f7..aac3cf6 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -29,6 +29,7 @@ import (
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/health"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
 	todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
 )
 
@@ -79,18 +80,18 @@ func StartHealthResultManager(
 	cacheHealthChan <-chan cache.Result,
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
-	statHistory ResultHistoryThreadsafe,
+	statHistory threadsafe.ResultHistory,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	fetchCount UintThreadsafe,
-	errorCount UintThreadsafe,
+	fetchCount threadsafe.Uint,
+	errorCount threadsafe.Uint,
 	cfg config.Config,
-) (DurationMapThreadsafe, EventsThreadsafe, CacheAvailableStatusThreadsafe, ResultHistoryThreadsafe) {
+) (DurationMapThreadsafe, threadsafe.Events, threadsafe.CacheAvailableStatus, threadsafe.ResultHistory) {
 	lastHealthDurations := NewDurationMapThreadsafe()
-	events := NewEventsThreadsafe(cfg.MaxEvents)
-	localCacheStatus := NewCacheAvailableStatusThreadsafe()
-	healthHistory := NewResultHistoryThreadsafe()
+	events := threadsafe.NewEvents(cfg.MaxEvents)
+	localCacheStatus := threadsafe.NewCacheAvailableStatus()
+	healthHistory := threadsafe.NewResultHistory()
 	go healthResultManagerListen(
 		cacheHealthChan,
 		toData,
@@ -115,15 +116,15 @@ func healthResultManagerListen(
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
 	lastHealthDurations DurationMapThreadsafe,
-	statHistory ResultHistoryThreadsafe,
-	healthHistory ResultHistoryThreadsafe,
+	statHistory threadsafe.ResultHistory,
+	healthHistory threadsafe.ResultHistory,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	fetchCount UintThreadsafe,
-	errorCount UintThreadsafe,
-	events EventsThreadsafe,
-	localCacheStatus CacheAvailableStatusThreadsafe,
+	fetchCount threadsafe.Uint,
+	errorCount threadsafe.Uint,
+	events threadsafe.Events,
+	localCacheStatus threadsafe.CacheAvailableStatus,
 	cfg config.Config,
 ) {
 	lastHealthEndTimes := map[enum.CacheName]time.Time{}
@@ -192,16 +193,16 @@ func processHealthResult(
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
 	lastHealthDurationsThreadsafe DurationMapThreadsafe,
-	statHistory ResultHistoryThreadsafe,
+	statHistory threadsafe.ResultHistory,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	fetchCount UintThreadsafe,
-	errorCount UintThreadsafe,
-	events EventsThreadsafe,
-	localCacheStatusThreadsafe CacheAvailableStatusThreadsafe,
+	fetchCount threadsafe.Uint,
+	errorCount threadsafe.Uint,
+	events threadsafe.Events,
+	localCacheStatusThreadsafe threadsafe.CacheAvailableStatus,
 	lastHealthEndTimes map[enum.CacheName]time.Time,
-	healthHistory ResultHistoryThreadsafe,
+	healthHistory threadsafe.ResultHistory,
 	results []cache.Result,
 	cfg config.Config,
 ) {
@@ -231,10 +232,10 @@ func processHealthResult(
 		isAvailable, whyAvailable := health.EvalCache(healthResult, &monitorConfigCopy)
 		if available, ok := localStates.GetCache(healthResult.ID); !ok || available.IsAvailable != isAvailable {
 			log.Infof("Changing state for %s was: %t now: %t because %s error: %v", healthResult.ID, prevResult.Available, isAvailable, whyAvailable, healthResult.Error)
-			events.Add(Event{Time: time.Now().Unix(), Description: whyAvailable, Name: healthResult.ID, Hostname: healthResult.ID, Type: toDataCopy.ServerTypes[healthResult.ID].String(), Available: isAvailable})
+			events.Add(cache.Event{Time: time.Now().Unix(), Description: whyAvailable, Name: healthResult.ID, Hostname: healthResult.ID, Type: toDataCopy.ServerTypes[healthResult.ID].String(), Available: isAvailable})
 		}
 
-		localCacheStatus[healthResult.ID] = CacheAvailableStatus{Available: isAvailable, Status: monitorConfigCopy.TrafficServer[string(healthResult.ID)].Status} // TODO move within localStates?
+		localCacheStatus[healthResult.ID] = cache.AvailableStatus{Available: isAvailable, Status: monitorConfigCopy.TrafficServer[string(healthResult.ID)].Status} // TODO move within localStates?
 		localStates.SetCache(healthResult.ID, peer.IsAvailable{IsAvailable: isAvailable})
 	}
 	calculateDeliveryServiceState(toDataCopy.DeliveryServiceServers, localStates)

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/lastkbpsstats.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/lastkbpsstats.go b/traffic_monitor/experimental/traffic_monitor/manager/lastkbpsstats.go
deleted file mode 100644
index cb5811d..0000000
--- a/traffic_monitor/experimental/traffic_monitor/manager/lastkbpsstats.go
+++ /dev/null
@@ -1,52 +0,0 @@
-package manager
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import (
-	ds "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservice"
-	"sync"
-)
-
-// LastStatsThreadsafe wraps a deliveryservice.LastStats object to be safe for multiple readers and one writer.
-type LastStatsThreadsafe struct {
-	stats *ds.LastStats
-	m     *sync.RWMutex
-}
-
-// NewLastStatsThreadsafe returns a wrapped a deliveryservice.LastStats object safe for multiple readers and one writer.
-func NewLastStatsThreadsafe() LastStatsThreadsafe {
-	s := ds.NewLastStats()
-	return LastStatsThreadsafe{m: &sync.RWMutex{}, stats: &s}
-}
-
-// Get returns the last KBPS stats object. Callers MUST NOT modify the object. It is not threadsafe for writing. If the object must be modified, callers must call LastStats.Copy() and modify the copy.
-func (o *LastStatsThreadsafe) Get() ds.LastStats {
-	o.m.RLock()
-	defer o.m.RUnlock()
-	return *o.stats
-}
-
-// Set sets the internal LastStats object. This MUST NOT be called by multiple goroutines.
-func (o *LastStatsThreadsafe) Set(s ds.LastStats) {
-	o.m.Lock()
-	*o.stats = s
-	o.m.Unlock()
-}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/manager.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
index ffb2c3a..dcb9bee 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/config"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
 	todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
 	towrap "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopswrapper"
 	//	to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
@@ -66,9 +67,9 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
 
 	localStates := peer.NewCRStatesThreadsafe()     // this is the local state as discoverer by this traffic_monitor
 	peerStates := peer.NewCRStatesPeersThreadsafe() // each peer's last state is saved in this map
-	fetchCount := NewUintThreadsafe()               // note this is the number of individual caches fetched from, not the number of times all the caches were polled.
-	healthIteration := NewUintThreadsafe()
-	errorCount := NewUintThreadsafe()
+	fetchCount := threadsafe.NewUint()              // note this is the number of individual caches fetched from, not the number of times all the caches were polled.
+	healthIteration := threadsafe.NewUint()
+	errorCount := threadsafe.NewUint()
 
 	toData := todata.NewThreadsafe()
 
@@ -158,7 +159,7 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
 }
 
 // healthTickListener listens for health ticks, and writes to the health iteration variable. Does not return.
-func healthTickListener(cacheHealthTick <-chan uint64, healthIteration UintThreadsafe) {
+func healthTickListener(cacheHealthTick <-chan uint64, healthIteration threadsafe.Uint) {
 	for i := range cacheHealthTick {
 		healthIteration.Set(i)
 	}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
index efe43c7..81906ed 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/config"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/srvhttp"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
 	todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
 	towrap "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopswrapper"
 	to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
@@ -72,19 +73,19 @@ func StartOpsConfigManager(
 	localStates peer.CRStatesThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	statHistory ResultHistoryThreadsafe,
-	healthHistory ResultHistoryThreadsafe,
-	lastStats LastStatsThreadsafe,
-	dsStats DSStatsReader,
-	events EventsThreadsafe,
+	statHistory threadsafe.ResultHistory,
+	healthHistory threadsafe.ResultHistory,
+	lastStats threadsafe.LastStats,
+	dsStats threadsafe.DSStatsReader,
+	events threadsafe.Events,
 	staticAppData StaticAppData,
 	healthPollInterval time.Duration,
 	lastHealthDurations DurationMapThreadsafe,
-	fetchCount UintThreadsafe,
-	healthIteration UintThreadsafe,
-	errorCount UintThreadsafe,
-	localCacheStatus CacheAvailableStatusThreadsafe,
-	unpolledCaches UnpolledCachesThreadsafe,
+	fetchCount threadsafe.Uint,
+	healthIteration threadsafe.Uint,
+	errorCount threadsafe.Uint,
+	localCacheStatus threadsafe.CacheAvailableStatus,
+	unpolledCaches threadsafe.UnpolledCaches,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	cfg config.Config,
 ) OpsConfigThreadsafe {

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go b/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go
deleted file mode 100644
index 6613a04..0000000
--- a/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go
+++ /dev/null
@@ -1,146 +0,0 @@
-package manager
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import (
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
-	"sync"
-)
-
-// UnpolledCachesThreadsafe is a structure containing a map of caches which have yet to be polled, which is threadsafe for multiple readers and one writer.
-// This could be made lock-free, if the performance was necessary
-type UnpolledCachesThreadsafe struct {
-	unpolledCaches *map[enum.CacheName]struct{}
-	allCaches      *map[enum.CacheName]struct{}
-	initialized    *bool
-	m              *sync.RWMutex
-}
-
-// NewUnpolledCachesThreadsafe returns a new UnpolledCachesThreadsafe object.
-func NewUnpolledCachesThreadsafe() UnpolledCachesThreadsafe {
-	b := false
-	return UnpolledCachesThreadsafe{
-		m:              &sync.RWMutex{},
-		unpolledCaches: &map[enum.CacheName]struct{}{},
-		allCaches:      &map[enum.CacheName]struct{}{},
-		initialized:    &b,
-	}
-}
-
-// UnpolledCaches returns a map of caches not yet polled. Callers MUST NOT modify. If mutation is necessary, copy the map
-func (t *UnpolledCachesThreadsafe) UnpolledCaches() map[enum.CacheName]struct{} {
-	t.m.RLock()
-	defer t.m.RUnlock()
-	return *t.unpolledCaches
-}
-
-// setUnpolledCaches sets the internal unpolled caches map. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
-func (t *UnpolledCachesThreadsafe) setUnpolledCaches(v map[enum.CacheName]struct{}) {
-	t.m.Lock()
-	*t.initialized = true
-	*t.unpolledCaches = v
-	t.m.Unlock()
-}
-
-// SetNewCaches takes a list of new caches, which may overlap with the existing caches, diffs them, removes any `unpolledCaches` which aren't in the new list, and sets the list of `polledCaches` (which is only used by this func) to the `newCaches`. This is threadsafe with one writer, along with `setUnpolledCaches`.
-func (t *UnpolledCachesThreadsafe) SetNewCaches(newCaches map[enum.CacheName]struct{}) {
-	unpolledCaches := copyCaches(t.UnpolledCaches())
-	allCaches := copyCaches(*t.allCaches) // not necessary to lock `allCaches`, as the single-writer is the only thing that accesses it.
-	for cache := range unpolledCaches {
-		if _, ok := newCaches[cache]; !ok {
-			delete(unpolledCaches, cache)
-		}
-	}
-	for cache := range allCaches {
-		if _, ok := newCaches[cache]; !ok {
-			delete(allCaches, cache)
-		}
-	}
-	for cache := range newCaches {
-		if _, ok := allCaches[cache]; !ok {
-			unpolledCaches[cache] = struct{}{}
-			allCaches[cache] = struct{}{}
-		}
-	}
-	*t.allCaches = allCaches
-	t.setUnpolledCaches(unpolledCaches)
-}
-
-// Any returns whether there are any caches marked as not polled. Also returns true if SetNewCaches() has never been called (assuming there exist caches, if this hasn't been initialized, we couldn't have polled any of them).
-func (t *UnpolledCachesThreadsafe) Any() bool {
-	t.m.Lock()
-	defer t.m.Unlock()
-	return !(*t.initialized) || len(*t.unpolledCaches) > 0
-}
-
-// copyCaches performs a deep copy of the given map.
-func copyCaches(a map[enum.CacheName]struct{}) map[enum.CacheName]struct{} {
-	b := map[enum.CacheName]struct{}{}
-	for k := range a {
-		b[k] = struct{}{}
-	}
-	return b
-}
-
-// SetPolled sets cache which have been polled. This is used to determine when the app has fully started up, and we can start serving. Serving Traffic Router with caches as 'down' which simply haven't been polled yet would be bad. Therefore, a cache is set as 'polled' if it has received different bandwidths from two different ATS ticks, OR if the cache is marked as down (and thus we won't get a bandwidth).
-// This is threadsafe for one writer, along with `Set`.
-// This is fast if there are no unpolled caches. Moreover, its speed is a function of the number of unpolled caches, not the number of caches total.
-func (t *UnpolledCachesThreadsafe) SetPolled(results []cache.Result, lastStatsThreadsafe LastStatsThreadsafe) {
-	unpolledCaches := copyCaches(t.UnpolledCaches())
-	numUnpolledCaches := len(unpolledCaches)
-	if numUnpolledCaches == 0 {
-		return
-	}
-	lastStats := lastStatsThreadsafe.Get()
-	for cache := range unpolledCaches {
-	innerLoop:
-		for _, result := range results {
-			if result.ID != cache {
-				continue
-			}
-
-			if !result.Available || result.Error != nil {
-				log.Infof("polled %v\n", cache)
-				delete(unpolledCaches, cache)
-				break innerLoop
-			}
-		}
-		lastStat, ok := lastStats.Caches[cache]
-		if !ok {
-			continue
-		}
-		if lastStat.Bytes.PerSec != 0 {
-			log.Infof("polled %v\n", cache)
-			delete(unpolledCaches, cache)
-		}
-	}
-
-	if len(unpolledCaches) == numUnpolledCaches {
-		return
-	}
-	t.setUnpolledCaches(unpolledCaches)
-	if len(unpolledCaches) != 0 {
-		log.Infof("remaining unpolled %v\n", unpolledCaches)
-	} else {
-		log.Infof("all caches polled, ready to serve!\n")
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/resulthistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/resulthistory.go b/traffic_monitor/experimental/traffic_monitor/manager/resulthistory.go
deleted file mode 100644
index f6ccc25..0000000
--- a/traffic_monitor/experimental/traffic_monitor/manager/resulthistory.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package manager
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import (
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
-	"sync"
-)
-
-// ResultHistory is a map of cache names, to an array of result history from each cache.
-type ResultHistory map[enum.CacheName][]cache.Result
-
-func copyResult(a []cache.Result) []cache.Result {
-	b := make([]cache.Result, len(a), len(a))
-	copy(b, a)
-	return b
-}
-
-// Copy copies returns a deep copy of this ResultHistory
-func (a ResultHistory) Copy() ResultHistory {
-	b := ResultHistory{}
-	for k, v := range a {
-		b[k] = copyResult(v)
-	}
-	return b
-}
-
-// ResultHistoryThreadsafe provides safe access for multiple goroutines readers and a single writer to a stored ResultHistory object.
-// This could be made lock-free, if the performance was necessary
-// TODO add separate locks for Caches and Deliveryservice maps?
-type ResultHistoryThreadsafe struct {
-	resultHistory *ResultHistory
-	m             *sync.RWMutex
-}
-
-// NewResultHistoryThreadsafe returns a new ResultHistory safe for multiple readers and a single writer.
-func NewResultHistoryThreadsafe() ResultHistoryThreadsafe {
-	h := ResultHistory{}
-	return ResultHistoryThreadsafe{m: &sync.RWMutex{}, resultHistory: &h}
-}
-
-// Get returns the ResultHistory. Callers MUST NOT modify. If mutation is necessary, call ResultHistory.Copy()
-func (h *ResultHistoryThreadsafe) Get() ResultHistory {
-	h.m.RLock()
-	defer h.m.RUnlock()
-	return *h.resultHistory
-}
-
-// Set sets the internal ResultHistory. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
-func (h *ResultHistoryThreadsafe) Set(v ResultHistory) {
-	h.m.Lock()
-	*h.resultHistory = v
-	h.m.Unlock()
-}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
index ffad8bc..809ceae 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
@@ -28,6 +28,7 @@ import (
 	ds "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservice"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe"
 	todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata"
 	to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
 )
@@ -62,16 +63,16 @@ func StartStatHistoryManager(
 	combinedStates peer.CRStatesThreadsafe,
 	toData todata.TODataThreadsafe,
 	cachesChanged <-chan struct{},
-	errorCount UintThreadsafe,
+	errorCount threadsafe.Uint,
 	cfg config.Config,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
-) (ResultHistoryThreadsafe, DurationMapThreadsafe, LastStatsThreadsafe, DSStatsReader, UnpolledCachesThreadsafe) {
-	statHistory := NewResultHistoryThreadsafe()
+) (threadsafe.ResultHistory, DurationMapThreadsafe, threadsafe.LastStats, threadsafe.DSStatsReader, threadsafe.UnpolledCaches) {
+	statHistory := threadsafe.NewResultHistory()
 	lastStatDurations := NewDurationMapThreadsafe()
 	lastStatEndTimes := map[enum.CacheName]time.Time{}
-	lastStats := NewLastStatsThreadsafe()
-	dsStats := NewDSStatsThreadsafe()
-	unpolledCaches := NewUnpolledCachesThreadsafe()
+	lastStats := threadsafe.NewLastStats()
+	dsStats := threadsafe.NewDSStats()
+	unpolledCaches := threadsafe.NewUnpolledCaches()
 	tickInterval := cfg.StatFlushInterval
 	go func() {
 
@@ -109,15 +110,15 @@ func StartStatHistoryManager(
 // processStatResults processes the given results, creating and setting DSStats, LastStats, and other stats. Note this is NOT threadsafe, and MUST NOT be called from multiple threads.
 func processStatResults(
 	results []cache.Result,
-	statHistoryThreadsafe ResultHistoryThreadsafe,
+	statHistoryThreadsafe threadsafe.ResultHistory,
 	combinedStates peer.Crstates,
-	lastStats LastStatsThreadsafe,
+	lastStats threadsafe.LastStats,
 	toData todata.TOData,
-	errorCount UintThreadsafe,
-	dsStats DSStatsThreadsafe,
+	errorCount threadsafe.Uint,
+	dsStats threadsafe.DSStats,
 	lastStatEndTimes map[enum.CacheName]time.Time,
 	lastStatDurationsThreadsafe DurationMapThreadsafe,
-	unpolledCaches UnpolledCachesThreadsafe,
+	unpolledCaches threadsafe.UnpolledCaches,
 	mc to.TrafficMonitorConfigMap,
 ) {
 	statHistory := statHistoryThreadsafe.Get().Copy()
@@ -159,5 +160,5 @@ func processStatResults(
 		result.PollFinished <- result.PollID
 	}
 	lastStatDurationsThreadsafe.Set(lastStatDurations)
-	unpolledCaches.SetPolled(results, lastStats)
+	unpolledCaches.SetPolled(results, lastStats.Get())
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/manager/uintman.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/uintman.go b/traffic_monitor/experimental/traffic_monitor/manager/uintman.go
deleted file mode 100644
index 903229a..0000000
--- a/traffic_monitor/experimental/traffic_monitor/manager/uintman.go
+++ /dev/null
@@ -1,52 +0,0 @@
-package manager
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import (
-	"sync/atomic"
-)
-
-// UintThreadsafe provides safe access for multiple goroutines readers and a single writer to a stored uint.
-type UintThreadsafe struct {
-	val *uint64
-}
-
-// NewUintThreadsafe returns a new single-writer-multiple-reader threadsafe uint
-func NewUintThreadsafe() UintThreadsafe {
-	v := uint64(0)
-	return UintThreadsafe{val: &v}
-}
-
-// Get gets the internal uint. This is safe for multiple readers
-func (u *UintThreadsafe) Get() uint64 {
-	return atomic.LoadUint64(u.val)
-}
-
-// Set sets the internal uint. This MUST NOT be called by multiple goroutines.
-func (u *UintThreadsafe) Set(v uint64) {
-	atomic.StoreUint64(u.val, v)
-}
-
-// Inc increments the internal uint64.
-// TODO make sure everything using this uses the value it returns, not a separate Get
-func (u *UintThreadsafe) Inc() uint64 {
-	return atomic.AddUint64(u.val, 1)
-}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/threadsafe/cacheavailablestatus.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/cacheavailablestatus.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/cacheavailablestatus.go
new file mode 100644
index 0000000..edf7b37
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/cacheavailablestatus.go
@@ -0,0 +1,52 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
+	"sync"
+)
+
+// CacheAvailableStatus wraps a map of cache available statuses to be safe for multiple reader goroutines and one writer.
+type CacheAvailableStatus struct {
+	caches *cache.AvailableStatuses
+	m      *sync.RWMutex
+}
+
+// NewCacheAvailableStatus creates and returns a new CacheAvailableStatus, initializing internal pointer values.
+func NewCacheAvailableStatus() CacheAvailableStatus {
+	c := cache.AvailableStatuses(map[enum.CacheName]cache.AvailableStatus{})
+	return CacheAvailableStatus{m: &sync.RWMutex{}, caches: &c}
+}
+
+// Get returns the internal map of cache statuses. The returned map MUST NOT be modified. If modification is necessary, copy.
+func (o *CacheAvailableStatus) Get() cache.AvailableStatuses {
+	o.m.RLock()
+	defer o.m.RUnlock()
+	return *o.caches
+}
+
+// Set sets the internal map of cache availability. This MUST NOT be called by multiple goroutines.
+func (o *CacheAvailableStatus) Set(v cache.AvailableStatuses) {
+	o.m.Lock()
+	*o.caches = v
+	o.m.Unlock()
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/threadsafe/dsstats.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/dsstats.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/dsstats.go
new file mode 100644
index 0000000..b8a7ffa
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/dsstats.go
@@ -0,0 +1,57 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	ds "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservice"
+	dsdata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservicedata"
+	"sync"
+)
+
+// DSStats wraps a deliveryservice.Stats object to be safe for multiple reader goroutines and a single writer.
+type DSStats struct {
+	dsStats *ds.Stats
+	m       *sync.RWMutex
+}
+
+// DSStatsReader permits reading of a dsdata.Stats object, but not writing. This is designed so a Stats object can safely be passed to multiple goroutines, without worry one may unsafely write.
+type DSStatsReader interface {
+	Get() dsdata.StatsReadonly
+}
+
+// NewDSStats returns a deliveryservice.Stats object wrapped to be safe for multiple readers and a single writer.
+func NewDSStats() DSStats {
+	s := ds.NewStats()
+	return DSStats{m: &sync.RWMutex{}, dsStats: &s}
+}
+
+// Get returns a Stats object safe for reading by multiple goroutines
+func (o *DSStats) Get() dsdata.StatsReadonly {
+	o.m.RLock()
+	defer o.m.RUnlock()
+	return *o.dsStats
+}
+
+// Set sets the internal Stats object. This MUST NOT be called by multiple goroutines.
+func (o *DSStats) Set(newDsStats ds.Stats) {
+	o.m.Lock()
+	*o.dsStats = newDsStats
+	o.m.Unlock()
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go
new file mode 100644
index 0000000..6f92481
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go
@@ -0,0 +1,67 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	"sync"
+
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+)
+
+// Events provides safe access for multiple goroutines readers and a single writer to a stored Events slice.
+type Events struct {
+	events    *[]cache.Event
+	m         *sync.RWMutex
+	nextIndex *uint64
+	max       uint64
+}
+
+func copyEvents(a []cache.Event) []cache.Event {
+	b := make([]cache.Event, len(a), len(a))
+	copy(b, a)
+	return b
+}
+
+// NewEvents creates a new single-writer-multiple-reader Threadsafe object
+func NewEvents(maxEvents uint64) Events {
+	i := uint64(0)
+	return Events{m: &sync.RWMutex{}, events: &[]cache.Event{}, nextIndex: &i, max: maxEvents}
+}
+
+// Get returns the internal slice of Events for reading. This MUST NOT be modified. If modification is necessary, copy the slice.
+func (o *Events) Get() []cache.Event {
+	o.m.RLock()
+	defer o.m.RUnlock()
+	return *o.events
+}
+
+// Add adds the given event. This is threadsafe for one writer, multiple readers. This MUST NOT be called by multiple threads, as it non-atomically fetches and adds.
+func (o *Events) Add(e cache.Event) {
+	events := copyEvents(*o.events)
+	e.Index = *o.nextIndex
+	events = append([]cache.Event{e}, events...)
+	if len(events) > int(o.max) {
+		events = (events)[:o.max-1]
+	}
+	o.m.Lock()
+	*o.events = events
+	*o.nextIndex++
+	o.m.Unlock()
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/threadsafe/lastkbpsstats.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/lastkbpsstats.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/lastkbpsstats.go
new file mode 100644
index 0000000..42c080d
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/lastkbpsstats.go
@@ -0,0 +1,51 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	ds "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservice"
+	"sync"
+)
+
+// LastStats wraps a deliveryservice.LastStats object to be safe for multiple readers and one writer.
+type LastStats struct {
+	stats *ds.LastStats
+	m     *sync.RWMutex
+}
+
+// NewLastStats returns a wrapped a deliveryservice.LastStats object safe for multiple readers and one writer.
+func NewLastStats() LastStats {
+	s := ds.NewLastStats()
+	return LastStats{m: &sync.RWMutex{}, stats: &s}
+}
+
+// Get returns the last KBPS stats object. Callers MUST NOT modify the object. It is not threadsafe for writing. If the object must be modified, callers must call LastStats.Copy() and modify the copy.
+func (o *LastStats) Get() ds.LastStats {
+	o.m.RLock()
+	defer o.m.RUnlock()
+	return *o.stats
+}
+
+// Set sets the internal LastStats object. This MUST NOT be called by multiple goroutines.
+func (o *LastStats) Set(s ds.LastStats) {
+	o.m.Lock()
+	*o.stats = s
+	o.m.Unlock()
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/threadsafe/polledcaches.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/polledcaches.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/polledcaches.go
new file mode 100644
index 0000000..978021f
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/polledcaches.go
@@ -0,0 +1,147 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	"sync"
+
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+	ds "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservice"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
+)
+
+// UnpolledCaches is a structure containing a map of caches which have yet to be polled, which is threadsafe for multiple readers and one writer.
+// This could be made lock-free, if the performance was necessary
+type UnpolledCaches struct {
+	unpolledCaches *map[enum.CacheName]struct{}
+	allCaches      *map[enum.CacheName]struct{}
+	initialized    *bool
+	m              *sync.RWMutex
+}
+
+// NewUnpolledCaches returns a new UnpolledCaches object.
+func NewUnpolledCaches() UnpolledCaches {
+	b := false
+	return UnpolledCaches{
+		m:              &sync.RWMutex{},
+		unpolledCaches: &map[enum.CacheName]struct{}{},
+		allCaches:      &map[enum.CacheName]struct{}{},
+		initialized:    &b,
+	}
+}
+
+// UnpolledCaches returns a map of caches not yet polled. Callers MUST NOT modify. If mutation is necessary, copy the map
+func (t *UnpolledCaches) UnpolledCaches() map[enum.CacheName]struct{} {
+	t.m.RLock()
+	defer t.m.RUnlock()
+	return *t.unpolledCaches
+}
+
+// setUnpolledCaches sets the internal unpolled caches map. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
+func (t *UnpolledCaches) setUnpolledCaches(v map[enum.CacheName]struct{}) {
+	t.m.Lock()
+	*t.initialized = true
+	*t.unpolledCaches = v
+	t.m.Unlock()
+}
+
+// SetNewCaches takes a list of new caches, which may overlap with the existing caches, diffs them, removes any `unpolledCaches` which aren't in the new list, and sets the list of `polledCaches` (which is only used by this func) to the `newCaches`. This is threadsafe with one writer, along with `setUnpolledCaches`.
+func (t *UnpolledCaches) SetNewCaches(newCaches map[enum.CacheName]struct{}) {
+	unpolledCaches := copyCaches(t.UnpolledCaches())
+	allCaches := copyCaches(*t.allCaches) // not necessary to lock `allCaches`, as the single-writer is the only thing that accesses it.
+	for cache := range unpolledCaches {
+		if _, ok := newCaches[cache]; !ok {
+			delete(unpolledCaches, cache)
+		}
+	}
+	for cache := range allCaches {
+		if _, ok := newCaches[cache]; !ok {
+			delete(allCaches, cache)
+		}
+	}
+	for cache := range newCaches {
+		if _, ok := allCaches[cache]; !ok {
+			unpolledCaches[cache] = struct{}{}
+			allCaches[cache] = struct{}{}
+		}
+	}
+	*t.allCaches = allCaches
+	t.setUnpolledCaches(unpolledCaches)
+}
+
+// Any returns whether there are any caches marked as not polled. Also returns true if SetNewCaches() has never been called (assuming there exist caches, if this hasn't been initialized, we couldn't have polled any of them).
+func (t *UnpolledCaches) Any() bool {
+	t.m.Lock()
+	defer t.m.Unlock()
+	return !(*t.initialized) || len(*t.unpolledCaches) > 0
+}
+
+// copyCaches performs a deep copy of the given map.
+func copyCaches(a map[enum.CacheName]struct{}) map[enum.CacheName]struct{} {
+	b := map[enum.CacheName]struct{}{}
+	for k := range a {
+		b[k] = struct{}{}
+	}
+	return b
+}
+
+// SetPolled sets cache which have been polled. This is used to determine when the app has fully started up, and we can start serving. Serving Traffic Router with caches as 'down' which simply haven't been polled yet would be bad. Therefore, a cache is set as 'polled' if it has received different bandwidths from two different ATS ticks, OR if the cache is marked as down (and thus we won't get a bandwidth).
+// This is threadsafe for one writer, along with `Set`.
+// This is fast if there are no unpolled caches. Moreover, its speed is a function of the number of unpolled caches, not the number of caches total.
+func (t *UnpolledCaches) SetPolled(results []cache.Result, lastStats ds.LastStats) {
+	unpolledCaches := copyCaches(t.UnpolledCaches())
+	numUnpolledCaches := len(unpolledCaches)
+	if numUnpolledCaches == 0 {
+		return
+	}
+	for cache := range unpolledCaches {
+	innerLoop:
+		for _, result := range results {
+			if result.ID != cache {
+				continue
+			}
+
+			if !result.Available || result.Error != nil {
+				log.Infof("polled %v\n", cache)
+				delete(unpolledCaches, cache)
+				break innerLoop
+			}
+		}
+		lastStat, ok := lastStats.Caches[cache]
+		if !ok {
+			continue
+		}
+		if lastStat.Bytes.PerSec != 0 {
+			log.Infof("polled %v\n", cache)
+			delete(unpolledCaches, cache)
+		}
+	}
+
+	if len(unpolledCaches) == numUnpolledCaches {
+		return
+	}
+	t.setUnpolledCaches(unpolledCaches)
+	if len(unpolledCaches) != 0 {
+		log.Infof("remaining unpolled %v\n", unpolledCaches)
+	} else {
+		log.Infof("all caches polled, ready to serve!\n")
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/threadsafe/resulthistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/resulthistory.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/resulthistory.go
new file mode 100644
index 0000000..f3f82e0
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/resulthistory.go
@@ -0,0 +1,54 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	"sync"
+
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+)
+
+// ResultHistory provides safe access for multiple goroutines readers and a single writer to a stored ResultHistory object.
+// This could be made lock-free, if the performance was necessary
+// TODO add separate locks for Caches and Deliveryservice maps?
+type ResultHistory struct {
+	resultHistory *cache.ResultHistory
+	m             *sync.RWMutex
+}
+
+// NewResultHistory returns a new ResultHistory safe for multiple readers and a single writer.
+func NewResultHistory() ResultHistory {
+	h := cache.ResultHistory{}
+	return ResultHistory{m: &sync.RWMutex{}, resultHistory: &h}
+}
+
+// Get returns the ResultHistory. Callers MUST NOT modify. If mutation is necessary, call ResultHistory.Copy()
+func (h *ResultHistory) Get() cache.ResultHistory {
+	h.m.RLock()
+	defer h.m.RUnlock()
+	return *h.resultHistory
+}
+
+// Set sets the internal ResultHistory. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
+func (h *ResultHistory) Set(v cache.ResultHistory) {
+	h.m.Lock()
+	*h.resultHistory = v
+	h.m.Unlock()
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/005d7f35/traffic_monitor/experimental/traffic_monitor/threadsafe/uint.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/uint.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/uint.go
new file mode 100644
index 0000000..fcaa3a4
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/uint.go
@@ -0,0 +1,51 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	"sync/atomic"
+)
+
+// Uint provides safe access for multiple goroutines readers and a single writer to a stored uint.
+type Uint struct {
+	val *uint64
+}
+
+// NewUint returns a new single-writer-multiple-reader threadsafe uint
+func NewUint() Uint {
+	v := uint64(0)
+	return Uint{val: &v}
+}
+
+// Get gets the internal uint. This is safe for multiple readers
+func (u *Uint) Get() uint64 {
+	return atomic.LoadUint64(u.val)
+}
+
+// Set sets the internal uint. This MUST NOT be called by multiple goroutines.
+func (u *Uint) Set(v uint64) {
+	atomic.StoreUint64(u.val, v)
+}
+
+// Inc increments the internal uint64.
+// TODO make sure everything using this uses the value it returns, not a separate Get
+func (u *Uint) Inc() uint64 {
+	return atomic.AddUint64(u.val, 1)
+}


[16/20] incubator-trafficcontrol git commit: Fix TM2 data race

Posted by ne...@apache.org.
Fix TM2 data race


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/e41a745a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/e41a745a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/e41a745a

Branch: refs/heads/master
Commit: e41a745a31ecc77c117b3bcf29eb2654f960ecff
Parents: 2f23f29
Author: Robert Butts <ro...@gmail.com>
Authored: Mon Nov 28 11:37:57 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../traffic_monitor/manager/healthresult.go     |  3 +--
 .../traffic_monitor/manager/peer.go             | 24 +++++++++----------
 .../traffic_monitor/peer/crstates.go            | 25 ++++----------------
 3 files changed, 17 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/e41a745a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index cba6809..67f844a 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -273,7 +273,6 @@ func calculateDeliveryServiceState(deliveryServiceServers map[enum.DeliveryServi
 				deliveryServiceState.DisabledLocations = append(deliveryServiceState.DisabledLocations, server)
 			}
 		}
-		deliveryServices[deliveryServiceName] = deliveryServiceState
+		states.SetDeliveryService(deliveryServiceName, deliveryServiceState)
 	}
-	states.SetDeliveryServices(deliveryServices)
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/e41a745a/traffic_monitor/experimental/traffic_monitor/manager/peer.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/peer.go b/traffic_monitor/experimental/traffic_monitor/manager/peer.go
index 7232919..5a94e70 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/peer.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/peer.go
@@ -8,9 +8,9 @@ package manager
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package manager
  * under the License.
  */
 
-
 import (
 	"sort"
 
@@ -38,7 +37,7 @@ func StartPeerManager(
 	go func() {
 		for crStatesResult := range peerChan {
 			peerStates.Set(crStatesResult.ID, crStatesResult.PeerStats)
-			combinedStates.Set(combineCrStates(peerStates.Get(), localStates.Get()))
+			combineCrStates(peerStates.Get(), localStates.Get(), combinedStates)
 			crStatesResult.PollFinished <- crStatesResult.PollID
 		}
 	}()
@@ -46,20 +45,20 @@ func StartPeerManager(
 }
 
 // TODO JvD: add deliveryservice stuff
-func combineCrStates(peerStates map[enum.TrafficMonitorName]peer.Crstates, localStates peer.Crstates) peer.Crstates {
-	combinedStates := peer.NewCrstates()
+func combineCrStates(peerStates map[enum.TrafficMonitorName]peer.Crstates, localStates peer.Crstates, combinedStates peer.CRStatesThreadsafe) {
 	for cacheName, localCacheState := range localStates.Caches { // localStates gets pruned when servers are disabled, it's the source of truth
 		downVotes := 0 // TODO JvD: change to use parameter when deciding to be optimistic or pessimistic.
+		available := false
 		if localCacheState.IsAvailable {
 			// log.Infof(cacheName, " is available locally - setting to IsAvailable: true")
-			combinedStates.Caches[cacheName] = peer.IsAvailable{IsAvailable: true} // we don't care about the peers, we got a "good one", and we're optimistic
+			available = true // we don't care about the peers, we got a "good one", and we're optimistic
 		} else {
 			downVotes++ // localStates says it's not happy
 			for _, peerCrStates := range peerStates {
 				if peerCrStates.Caches[cacheName].IsAvailable {
 					// log.Infoln(cacheName, "- locally we think it's down, but", peerName, "says IsAvailable: ", peerCrStates.Caches[cacheName].IsAvailable, "trusting the peer.")
-					combinedStates.Caches[cacheName] = peer.IsAvailable{IsAvailable: true} // we don't care about the peers, we got a "good one", and we're optimistic
-					break                                                                  // one peer that thinks we're good is all we need.
+					available = true // we don't care about the peers, we got a "good one", and we're optimistic
+					break            // one peer that thinks we're good is all we need.
 				} else {
 					// log.Infoln(cacheName, "- locally we think it's down, and", peerName, "says IsAvailable: ", peerCrStates.Caches[cacheName].IsAvailable, "down voting")
 					downVotes++ // peerStates for this peer doesn't like it
@@ -68,8 +67,9 @@ func combineCrStates(peerStates map[enum.TrafficMonitorName]peer.Crstates, local
 		}
 		if downVotes > len(peerStates) {
 			// log.Infoln(cacheName, "-", downVotes, "down votes, setting to IsAvailable: false")
-			combinedStates.Caches[cacheName] = peer.IsAvailable{IsAvailable: false}
+			available = false
 		}
+		combinedStates.SetCache(cacheName, peer.IsAvailable{IsAvailable: available})
 	}
 
 	for deliveryServiceName, localDeliveryService := range localStates.Deliveryservice {
@@ -90,10 +90,8 @@ func combineCrStates(peerStates map[enum.TrafficMonitorName]peer.Crstates, local
 			}
 			deliveryService.DisabledLocations = intersection(deliveryService.DisabledLocations, peerDeliveryService.DisabledLocations)
 		}
-		combinedStates.Deliveryservice[deliveryServiceName] = deliveryService
+		combinedStates.SetDeliveryService(deliveryServiceName, deliveryService)
 	}
-
-	return combinedStates
 }
 
 // CacheNameSlice is a slice of cache names, which fulfills the `sort.Interface` interface.

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/e41a745a/traffic_monitor/experimental/traffic_monitor/peer/crstates.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/peer/crstates.go b/traffic_monitor/experimental/traffic_monitor/peer/crstates.go
index 3dc3667..65b7eb3 100644
--- a/traffic_monitor/experimental/traffic_monitor/peer/crstates.go
+++ b/traffic_monitor/experimental/traffic_monitor/peer/crstates.go
@@ -8,9 +8,9 @@ package peer
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package peer
  * under the License.
  */
 
-
 import (
 	"encoding/json"
 	"sync"
@@ -146,41 +145,27 @@ func (t *CRStatesThreadsafe) GetDeliveryService(name enum.DeliveryServiceName) D
 	return t.crStates.Deliveryservice[name]
 }
 
-// Set sets the internal Crstates data. This MUST NOT be called by multiple goroutines.
-func (t *CRStatesThreadsafe) Set(newCRStates Crstates) {
-	t.m.Lock()
-	*t.crStates = newCRStates
-	t.m.Unlock()
-}
-
-// SetCache sets the internal availability data for a particular cache. This MUST NOT be called by multiple goroutines.
+// SetCache sets the internal availability data for a particular cache.
 func (t *CRStatesThreadsafe) SetCache(cacheName enum.CacheName, available IsAvailable) {
 	t.m.Lock()
 	t.crStates.Caches[cacheName] = available
 	t.m.Unlock()
 }
 
-// DeleteCache deletes the given cache from the internal data. This MUST NOT be called by multiple goroutines.
+// DeleteCache deletes the given cache from the internal data.
 func (t *CRStatesThreadsafe) DeleteCache(name enum.CacheName) {
 	t.m.Lock()
 	delete(t.crStates.Caches, name)
 	t.m.Unlock()
 }
 
-// SetDeliveryService sets the availability data for the given delivery service. This MUST NOT be called by multiple goroutines.
+// SetDeliveryService sets the availability data for the given delivery service.
 func (t *CRStatesThreadsafe) SetDeliveryService(name enum.DeliveryServiceName, ds Deliveryservice) {
 	t.m.Lock()
 	t.crStates.Deliveryservice[name] = ds
 	t.m.Unlock()
 }
 
-// SetDeliveryServices sets the availability data for all delivery service. This MUST NOT be called by multiple goroutines.
-func (t *CRStatesThreadsafe) SetDeliveryServices(deliveryServices map[enum.DeliveryServiceName]Deliveryservice) {
-	t.m.Lock()
-	t.crStates.Deliveryservice = deliveryServices
-	t.m.Unlock()
-}
-
 // DeleteDeliveryService deletes the given delivery service from the internal data. This MUST NOT be called by multiple goroutines.
 func (t *CRStatesThreadsafe) DeleteDeliveryService(name enum.DeliveryServiceName) {
 	t.m.Lock()


[05/20] incubator-trafficcontrol git commit: Fix statHistory to be newest-first

Posted by ne...@apache.org.
Fix statHistory to be newest-first

Fixes statHistory to be newest-first, and fixes readers to expect the
same. It was oldest-first, with inconsistent reader expectations.

Fixes total bandwidth to include mids, which appears to match TM 1.0.

Fixes a result error check to check the real Error, instead of
PrecomputedData's Errors, and fixes the Result PrecomptuedData
member to not be anonymous, requiring explicit access, to prevent
this kind of accident again.

Fixes srvhttp to include the proper content type header for files.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/12f48d32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/12f48d32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/12f48d32

Branch: refs/heads/master
Commit: 12f48d327e41bc4273be6c392c6d2399d545d55e
Parents: f25489d
Author: Robert Butts <ro...@gmail.com>
Authored: Mon Nov 21 10:41:30 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/traffic_monitor/cache/cache.go | 25 ++++++++++----------
 .../traffic_monitor/deliveryservice/stat.go     |  7 +++---
 .../traffic_monitor/manager/datarequest.go      | 17 +++++++------
 .../traffic_monitor/manager/polledcaches.go     |  8 +++----
 .../traffic_monitor/manager/stathistory.go      |  9 ++++---
 .../traffic_monitor/srvhttp/srvhttp.go          | 11 +++++----
 6 files changed, 37 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/12f48d32/traffic_monitor/experimental/traffic_monitor/cache/cache.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/cache.go b/traffic_monitor/experimental/traffic_monitor/cache/cache.go
index bcc3449..8d973d5 100644
--- a/traffic_monitor/experimental/traffic_monitor/cache/cache.go
+++ b/traffic_monitor/experimental/traffic_monitor/cache/cache.go
@@ -8,9 +8,9 @@ package cache
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package cache
  * under the License.
  */
 
-
 import (
 	"encoding/json"
 	"fmt"
@@ -73,16 +72,16 @@ type PrecomputedData struct {
 
 // Result is the data result returned by a cache.
 type Result struct {
-	ID           enum.CacheName
-	Error        error
-	Astats       Astats
-	Time         time.Time
-	RequestTime  time.Duration
-	Vitals       Vitals
-	PollID       uint64
-	PollFinished chan<- uint64
-	PrecomputedData
-	Available bool
+	ID              enum.CacheName
+	Error           error
+	Astats          Astats
+	Time            time.Time
+	RequestTime     time.Duration
+	Vitals          Vitals
+	PollID          uint64
+	PollFinished    chan<- uint64
+	PrecomputedData PrecomputedData
+	Available       bool
 }
 
 // Vitals is the vitals data returned from a cache.

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/12f48d32/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go b/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go
index ebd883f..d47350e 100644
--- a/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go
+++ b/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go
@@ -8,9 +8,9 @@ package deliveryservice
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package deliveryservice
  * under the License.
  */
 
-
 import (
 	"fmt"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
@@ -437,7 +436,7 @@ func CreateStats(statHistory map[enum.CacheName][]cache.Result, toData todata.TO
 			log.Warnf("server %s not in CRConfig, skipping\n", server)
 			continue
 		}
-		result := history[len(history)-1]
+		result := history[0]
 
 		// TODO check result.PrecomputedData.Errors
 		for ds, resultStat := range result.PrecomputedData.DeliveryServiceStats {

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/12f48d32/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
index 750264a..f4e993f 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
@@ -8,9 +8,9 @@ package manager
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package manager
  * under the License.
  */
 
-
 import (
 	"encoding/json"
 	"fmt"
@@ -611,13 +610,13 @@ func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory StatHistoryTh
 }
 
 func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats LastStatsThreadsafe) []byte {
-	serverTypes := toData.Get().ServerTypes
+	// serverTypes := toData.Get().ServerTypes
 	kbpsStats := lastStats.Get()
 	sum := float64(0.0)
-	for cache, data := range kbpsStats.Caches {
-		if serverTypes[cache] != enum.CacheTypeEdge {
-			continue
-		}
+	for _, data := range kbpsStats.Caches {
+		// if serverTypes[cache] != enum.CacheTypeEdge {
+		// 	continue
+		// }
 		sum += data.Bytes.PerSec / ds.BytesPerKilobit
 	}
 	return []byte(fmt.Sprintf("%f", sum))
@@ -629,7 +628,7 @@ func srvAPIBandwidthCapacityKbps(statHistoryThs StatHistoryThreadsafe) []byte {
 		if len(results) == 0 {
 			continue
 		}
-		cap += results[0].MaxKbps
+		cap += results[0].PrecomputedData.MaxKbps
 	}
 	return []byte(fmt.Sprintf("%d", cap))
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/12f48d32/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go b/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go
index 3f353c4..6613a04 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/polledcaches.go
@@ -8,9 +8,9 @@ package manager
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package manager
  * under the License.
  */
 
-
 import (
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
@@ -118,7 +117,8 @@ func (t *UnpolledCachesThreadsafe) SetPolled(results []cache.Result, lastStatsTh
 			if result.ID != cache {
 				continue
 			}
-			if !result.Available || len(result.Errors) > 0 {
+
+			if !result.Available || result.Error != nil {
 				log.Infof("polled %v\n", cache)
 				delete(unpolledCaches, cache)
 				break innerLoop

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/12f48d32/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
index c938974..57c942b 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
@@ -8,9 +8,9 @@ package manager
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package manager
  * under the License.
  */
 
-
 import (
 	"sync"
 	"time"
@@ -82,7 +81,7 @@ func (h *StatHistoryThreadsafe) Set(v StatHistory) {
 
 func pruneHistory(history []cache.Result, limit uint64) []cache.Result {
 	if uint64(len(history)) > limit {
-		history = history[1:]
+		history = history[:limit-1]
 	}
 	return history
 }
@@ -172,7 +171,7 @@ func processStatResults(
 	for _, result := range results {
 		maxStats := uint64(mc.Profile[mc.TrafficServer[string(result.ID)].Profile].Parameters.HistoryCount)
 		// TODO determine if we want to add results with errors, or just print the errors now and don't add them.
-		statHistory[result.ID] = pruneHistory(append(statHistory[result.ID], result), maxStats)
+		statHistory[result.ID] = pruneHistory(append([]cache.Result{result}, statHistory[result.ID]...), maxStats)
 	}
 	statHistoryThreadsafe.Set(statHistory)
 

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/12f48d32/traffic_monitor/experimental/traffic_monitor/srvhttp/srvhttp.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/srvhttp/srvhttp.go b/traffic_monitor/experimental/traffic_monitor/srvhttp/srvhttp.go
index 84fdf78..0b9e261 100644
--- a/traffic_monitor/experimental/traffic_monitor/srvhttp/srvhttp.go
+++ b/traffic_monitor/experimental/traffic_monitor/srvhttp/srvhttp.go
@@ -8,9 +8,9 @@ package srvhttp
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package srvhttp
  * under the License.
  */
 
-
 import (
 	"fmt"
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
@@ -149,11 +148,13 @@ func (s Server) handleSortableFunc() (http.HandlerFunc, error) {
 }
 
 func (s Server) handleFile(name string) (http.HandlerFunc, error) {
-	index, err := ioutil.ReadFile(name)
+	bytes, err := ioutil.ReadFile(name)
 	if err != nil {
 		return nil, err
 	}
+	contentType := http.DetectContentType(bytes)
 	return func(w http.ResponseWriter, req *http.Request) {
-		fmt.Fprintf(w, "%s", index)
+		w.Header().Set("Content-Type", contentType)
+		fmt.Fprintf(w, "%s", bytes)
 	}, nil
 }


[03/20] incubator-trafficcontrol git commit: Add TM2 Apache license headers

Posted by ne...@apache.org.
Add TM2 Apache license headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/eaf0641f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/eaf0641f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/eaf0641f

Branch: refs/heads/master
Commit: eaf0641ff0c6e3962b71f24580f3f5a3fb22a969
Parents: afa1a4c
Author: Robert Butts <ro...@gmail.com>
Authored: Thu Dec 8 09:46:00 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/common/poller/heap.go           | 19 +++++++++++++++++++
 .../experimental/common/poller/threadsleep.go    | 19 +++++++++++++++++++
 .../common/poller/threadsleep_linux.go           | 19 +++++++++++++++++++
 3 files changed, 57 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/eaf0641f/traffic_monitor/experimental/common/poller/heap.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/heap.go b/traffic_monitor/experimental/common/poller/heap.go
index c66a7c3..2933b0e 100644
--- a/traffic_monitor/experimental/common/poller/heap.go
+++ b/traffic_monitor/experimental/common/poller/heap.go
@@ -1,5 +1,24 @@
 package poller
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import (
 	"fmt"
 	"sync"

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/eaf0641f/traffic_monitor/experimental/common/poller/threadsleep.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/threadsleep.go b/traffic_monitor/experimental/common/poller/threadsleep.go
index b8b1233..d4a67cc 100644
--- a/traffic_monitor/experimental/common/poller/threadsleep.go
+++ b/traffic_monitor/experimental/common/poller/threadsleep.go
@@ -1,5 +1,24 @@
 // +build !linux
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package poller
 
 import (

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/eaf0641f/traffic_monitor/experimental/common/poller/threadsleep_linux.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/threadsleep_linux.go b/traffic_monitor/experimental/common/poller/threadsleep_linux.go
index 9037027..ba7c379 100644
--- a/traffic_monitor/experimental/common/poller/threadsleep_linux.go
+++ b/traffic_monitor/experimental/common/poller/threadsleep_linux.go
@@ -1,5 +1,24 @@
 // +build linux
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package poller
 
 import (


[06/20] incubator-trafficcontrol git commit: Change TM2 StatHistoryThreadsafe to be generic

Posted by ne...@apache.org.
Change TM2 StatHistoryThreadsafe to be generic


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/0a350bc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/0a350bc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/0a350bc7

Branch: refs/heads/master
Commit: 0a350bc7e429370f36314cfc1c6f7af339bdd53d
Parents: 12f48d3
Author: Robert Butts <ro...@gmail.com>
Authored: Mon Nov 21 11:10:30 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../traffic_monitor/manager/datarequest.go      | 31 ++++++---
 .../traffic_monitor/manager/healthresult.go     | 11 ++-
 .../traffic_monitor/manager/opsconfig.go        |  7 +-
 .../traffic_monitor/manager/resulthistory.go    | 72 ++++++++++++++++++++
 .../traffic_monitor/manager/stathistory.go      | 53 +-------------
 5 files changed, 103 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0a350bc7/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
index f4e993f..2530240 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
@@ -61,12 +61,14 @@ type APIPeerStates struct {
 // CacheStatus contains summary stat data about the given cache.
 // TODO make fields nullable, so error fields can be omitted, letting API callers still get updates for unerrored fields
 type CacheStatus struct {
-	Type                  *string  `json:"type,omitempty"`
-	Status                *string  `json:"status,omitempty"`
-	LoadAverage           *float64 `json:"load_average,omitempty"`
-	QueryTimeMilliseconds *int64   `json:"query_time_ms,omitempty"`
-	BandwidthKbps         *float64 `json:"bandwidth_kbps,omitempty"`
-	ConnectionCount       *int64   `json:"connection_count,omitempty"`
+	Type                   *string  `json:"type,omitempty"`
+	Status                 *string  `json:"status,omitempty"`
+	LoadAverage            *float64 `json:"load_average,omitempty"`
+	QueryTimeMilliseconds  *int64   `json:"query_time_ms,omitempty"`
+	HealthTimeMilliseconds *int64   `json:"health_time_ms,omitempty"`
+	StatTimeMilliseconds   *int64   `json:"stat_time_ms,omitempty"`
+	BandwidthKbps          *float64 `json:"bandwidth_kbps,omitempty"`
+	ConnectionCount        *int64   `json:"connection_count,omitempty"`
 }
 
 // CacheStatFilter fulfills the cache.Filter interface, for filtering stats. See the `NewCacheStatFilter` documentation for details on which query parameters are used to filter.
@@ -528,7 +530,7 @@ func srvTRStateSelf(localStates peer.CRStatesThreadsafe) ([]byte, error) {
 }
 
 // TODO remove error params, handle by returning an error? How, since we need to return a non-standard code?
-func srvCacheStats(params url.Values, errorCount UintThreadsafe, errContext string, toData todata.TODataThreadsafe, statHistory StatHistoryThreadsafe) ([]byte, int) {
+func srvCacheStats(params url.Values, errorCount UintThreadsafe, errContext string, toData todata.TODataThreadsafe, statHistory ResultHistoryThreadsafe) ([]byte, int) {
 	filter, err := NewCacheStatFilter(params, toData.Get().ServerTypes)
 	if err != nil {
 		HandleErr(errorCount, errContext, err)
@@ -605,7 +607,7 @@ func srvAPIVersion(staticAppData StaticAppData) []byte {
 func srvAPITrafficOpsURI(opsConfig OpsConfigThreadsafe) []byte {
 	return []byte(opsConfig.Get().Url)
 }
-func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory StatHistoryThreadsafe, lastHealthDurations DurationMapThreadsafe, localStates peer.CRStatesThreadsafe, lastStats LastStatsThreadsafe, localCacheStatus CacheAvailableStatusThreadsafe) ([]byte, error) {
+func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory ResultHistoryThreadsafe, lastHealthDurations DurationMapThreadsafe, localStates peer.CRStatesThreadsafe, lastStats LastStatsThreadsafe, localCacheStatus CacheAvailableStatusThreadsafe) ([]byte, error) {
 	return json.Marshal(createCacheStatuses(toData.Get().ServerTypes, statHistory.Get(), lastHealthDurations.Get(), localStates.Get().Caches, lastStats.Get(), localCacheStatus))
 }
 
@@ -621,7 +623,7 @@ func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats LastStatsThre
 	}
 	return []byte(fmt.Sprintf("%f", sum))
 }
-func srvAPIBandwidthCapacityKbps(statHistoryThs StatHistoryThreadsafe) []byte {
+func srvAPIBandwidthCapacityKbps(statHistoryThs ResultHistoryThreadsafe) []byte {
 	statHistory := statHistoryThs.Get()
 	cap := int64(0)
 	for _, results := range statHistory {
@@ -653,7 +655,7 @@ func MakeDispatchMap(
 	localStates peer.CRStatesThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	statHistory StatHistoryThreadsafe,
+	statHistory ResultHistoryThreadsafe,
 	dsStats DSStatsReader,
 	events EventsThreadsafe,
 	staticAppData StaticAppData,
@@ -813,7 +815,14 @@ func createCacheStatuses(
 		}
 
 		cacheTypeStr := string(cacheType)
-		statii[cacheName] = CacheStatus{Type: &cacheTypeStr, LoadAverage: loadAverage, QueryTimeMilliseconds: queryTime, BandwidthKbps: kbps, ConnectionCount: connections, Status: status}
+		statii[cacheName] = CacheStatus{
+			Type:                  &cacheTypeStr,
+			LoadAverage:           loadAverage,
+			QueryTimeMilliseconds: queryTime,
+			BandwidthKbps:         kbps,
+			ConnectionCount:       connections,
+			Status:                status,
+		}
 	}
 	return statii
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0a350bc7/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index 023c570..8b81519 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -8,9 +8,9 @@ package manager
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package manager
  * under the License.
  */
 
-
 import (
 	"sync"
 	"time"
@@ -80,7 +79,7 @@ func StartHealthResultManager(
 	cacheHealthChan <-chan cache.Result,
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
-	statHistory StatHistoryThreadsafe,
+	statHistory ResultHistoryThreadsafe,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
@@ -114,7 +113,7 @@ func healthResultManagerListen(
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
 	lastHealthDurations DurationMapThreadsafe,
-	statHistory StatHistoryThreadsafe,
+	statHistory ResultHistoryThreadsafe,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
@@ -191,7 +190,7 @@ func processHealthResult(
 	toData todata.TODataThreadsafe,
 	localStates peer.CRStatesThreadsafe,
 	lastHealthDurationsThreadsafe DurationMapThreadsafe,
-	statHistory StatHistoryThreadsafe,
+	statHistory ResultHistoryThreadsafe,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0a350bc7/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
index f525181..9f70cd7 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
@@ -8,9 +8,9 @@ package manager
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package manager
  * under the License.
  */
 
-
 import (
 	"fmt"
 	"sync"
@@ -73,7 +72,7 @@ func StartOpsConfigManager(
 	localStates peer.CRStatesThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
-	statHistory StatHistoryThreadsafe,
+	statHistory ResultHistoryThreadsafe,
 	lastStats LastStatsThreadsafe,
 	dsStats DSStatsReader,
 	events EventsThreadsafe,

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0a350bc7/traffic_monitor/experimental/traffic_monitor/manager/resulthistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/resulthistory.go b/traffic_monitor/experimental/traffic_monitor/manager/resulthistory.go
new file mode 100644
index 0000000..f6ccc25
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/manager/resulthistory.go
@@ -0,0 +1,72 @@
+package manager
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
+	"sync"
+)
+
+// ResultHistory is a map of cache names, to an array of result history from each cache.
+type ResultHistory map[enum.CacheName][]cache.Result
+
+func copyResult(a []cache.Result) []cache.Result {
+	b := make([]cache.Result, len(a), len(a))
+	copy(b, a)
+	return b
+}
+
+// Copy copies returns a deep copy of this ResultHistory
+func (a ResultHistory) Copy() ResultHistory {
+	b := ResultHistory{}
+	for k, v := range a {
+		b[k] = copyResult(v)
+	}
+	return b
+}
+
+// ResultHistoryThreadsafe provides safe access for multiple goroutines readers and a single writer to a stored ResultHistory object.
+// This could be made lock-free, if the performance was necessary
+// TODO add separate locks for Caches and Deliveryservice maps?
+type ResultHistoryThreadsafe struct {
+	resultHistory *ResultHistory
+	m             *sync.RWMutex
+}
+
+// NewResultHistoryThreadsafe returns a new ResultHistory safe for multiple readers and a single writer.
+func NewResultHistoryThreadsafe() ResultHistoryThreadsafe {
+	h := ResultHistory{}
+	return ResultHistoryThreadsafe{m: &sync.RWMutex{}, resultHistory: &h}
+}
+
+// Get returns the ResultHistory. Callers MUST NOT modify. If mutation is necessary, call ResultHistory.Copy()
+func (h *ResultHistoryThreadsafe) Get() ResultHistory {
+	h.m.RLock()
+	defer h.m.RUnlock()
+	return *h.resultHistory
+}
+
+// Set sets the internal ResultHistory. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
+func (h *ResultHistoryThreadsafe) Set(v ResultHistory) {
+	h.m.Lock()
+	*h.resultHistory = v
+	h.m.Unlock()
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0a350bc7/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
index 57c942b..ffad8bc 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go
@@ -20,7 +20,6 @@ package manager
  */
 
 import (
-	"sync"
 	"time"
 
 	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log"
@@ -33,52 +32,6 @@ import (
 	to "github.com/apache/incubator-trafficcontrol/traffic_ops/client"
 )
 
-// StatHistory is a map of cache names, to an array of result history from each cache.
-type StatHistory map[enum.CacheName][]cache.Result
-
-func copyStat(a []cache.Result) []cache.Result {
-	b := make([]cache.Result, len(a), len(a))
-	copy(b, a)
-	return b
-}
-
-// Copy copies returns a deep copy of this StatHistory
-func (a StatHistory) Copy() StatHistory {
-	b := StatHistory{}
-	for k, v := range a {
-		b[k] = copyStat(v)
-	}
-	return b
-}
-
-// StatHistoryThreadsafe provides safe access for multiple goroutines readers and a single writer to a stored StatHistory object.
-// This could be made lock-free, if the performance was necessary
-// TODO add separate locks for Caches and Deliveryservice maps?
-type StatHistoryThreadsafe struct {
-	statHistory *StatHistory
-	m           *sync.RWMutex
-}
-
-// NewStatHistoryThreadsafe returns a new StatHistory safe for multiple readers and a single writer.
-func NewStatHistoryThreadsafe() StatHistoryThreadsafe {
-	h := StatHistory{}
-	return StatHistoryThreadsafe{m: &sync.RWMutex{}, statHistory: &h}
-}
-
-// Get returns the StatHistory. Callers MUST NOT modify. If mutation is necessary, call StatHistory.Copy()
-func (h *StatHistoryThreadsafe) Get() StatHistory {
-	h.m.RLock()
-	defer h.m.RUnlock()
-	return *h.statHistory
-}
-
-// Set sets the internal StatHistory. This is only safe for one thread of execution. This MUST NOT be called from multiple threads.
-func (h *StatHistoryThreadsafe) Set(v StatHistory) {
-	h.m.Lock()
-	*h.statHistory = v
-	h.m.Unlock()
-}
-
 func pruneHistory(history []cache.Result, limit uint64) []cache.Result {
 	if uint64(len(history)) > limit {
 		history = history[:limit-1]
@@ -112,8 +65,8 @@ func StartStatHistoryManager(
 	errorCount UintThreadsafe,
 	cfg config.Config,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
-) (StatHistoryThreadsafe, DurationMapThreadsafe, LastStatsThreadsafe, DSStatsReader, UnpolledCachesThreadsafe) {
-	statHistory := NewStatHistoryThreadsafe()
+) (ResultHistoryThreadsafe, DurationMapThreadsafe, LastStatsThreadsafe, DSStatsReader, UnpolledCachesThreadsafe) {
+	statHistory := NewResultHistoryThreadsafe()
 	lastStatDurations := NewDurationMapThreadsafe()
 	lastStatEndTimes := map[enum.CacheName]time.Time{}
 	lastStats := NewLastStatsThreadsafe()
@@ -156,7 +109,7 @@ func StartStatHistoryManager(
 // processStatResults processes the given results, creating and setting DSStats, LastStats, and other stats. Note this is NOT threadsafe, and MUST NOT be called from multiple threads.
 func processStatResults(
 	results []cache.Result,
-	statHistoryThreadsafe StatHistoryThreadsafe,
+	statHistoryThreadsafe ResultHistoryThreadsafe,
 	combinedStates peer.Crstates,
 	lastStats LastStatsThreadsafe,
 	toData todata.TOData,


[14/20] incubator-trafficcontrol git commit: Add TM2 heap_test.go

Posted by ne...@apache.org.
Add TM2 heap_test.go


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/34c0b4f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/34c0b4f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/34c0b4f8

Branch: refs/heads/master
Commit: 34c0b4f84ea6a472071d03db39ed5fe6b018fdca
Parents: c3ab47d
Author: Robert Butts <ro...@gmail.com>
Authored: Wed Dec 7 14:06:02 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/common/poller/heap_test.go     | 143 +++++++++++++++++++
 1 file changed, 143 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/34c0b4f8/traffic_monitor/experimental/common/poller/heap_test.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/heap_test.go b/traffic_monitor/experimental/common/poller/heap_test.go
new file mode 100644
index 0000000..ada593d
--- /dev/null
+++ b/traffic_monitor/experimental/common/poller/heap_test.go
@@ -0,0 +1,143 @@
+package poller
+
+import (
+	"fmt"
+	"math/rand"
+	"testing"
+	"time"
+)
+
+func TestHeap(t *testing.T) {
+	h := &Heap{}
+
+	num := 100
+	for i := 0; i < num; i++ {
+		h.Push(HeapPollInfo{
+			Info: HTTPPollInfo{
+				Interval: time.Second * time.Duration(8),
+				ID:       fmt.Sprintf("%v", i),
+			},
+			Next: time.Now().Add(time.Second * time.Duration(i)), // time.Duration((i%2)*-1)
+		})
+	}
+
+	for i := 0; i < num; i++ {
+		val, ok := h.Pop()
+		if !ok {
+			t.Errorf("expected pop ID %v got empty heap", i)
+		} else if val.Info.ID != fmt.Sprintf("%v", i) {
+			t.Errorf("expected pop ID %v got %v next %v", i, val.Info.ID, val.Next)
+		}
+	}
+}
+
+func TestHeapRandom(t *testing.T) {
+	h := &Heap{}
+
+	num := 10
+	for i := 0; i < num; i++ {
+		h.Push(HeapPollInfo{
+			Info: HTTPPollInfo{
+				Interval: time.Second * time.Duration(8),
+				ID:       fmt.Sprintf("%v", i),
+			},
+			Next: time.Now().Add(time.Duration(rand.Int63())),
+		})
+	}
+
+	previousTime := time.Now()
+	for i := 0; i < num; i++ {
+		val, ok := h.Pop()
+		if !ok {
+			t.Errorf("expected pop ID %v got empty heap", i)
+		} else if previousTime.After(val.Next) {
+			t.Errorf("heap pop %v < previous %v expected >", val.Next, previousTime)
+		}
+		previousTime = val.Next
+	}
+}
+
+func TestHeapRandomPopping(t *testing.T) {
+	h := &Heap{}
+
+	randInfo := func(id int) HeapPollInfo {
+		return HeapPollInfo{
+			Info: HTTPPollInfo{
+				Interval: time.Second * time.Duration(8),
+				ID:       fmt.Sprintf("%v", id),
+			},
+			Next: time.Now().Add(time.Duration(rand.Int63())),
+		}
+	}
+
+	num := 10
+	for i := 0; i < num; i++ {
+		h.Push(randInfo(i))
+	}
+
+	previousTime := time.Now()
+	for i := 0; i < num/2; i++ {
+		val, ok := h.Pop()
+		if !ok {
+			t.Errorf("expected pop ID %v got empty heap", i)
+		} else if previousTime.After(val.Next) {
+			t.Errorf("heap pop %v < previous %v expected >", val.Next, previousTime)
+		}
+		previousTime = val.Next
+	}
+
+	for i := 0; i < num; i++ {
+		h.Push(randInfo(i))
+	}
+	val, ok := h.Pop()
+	if !ok {
+		t.Errorf("expected pop, got empty heap")
+	} else {
+		previousTime = val.Next
+	}
+
+	for i := 0; i < num; i++ {
+		val, ok := h.Pop()
+		if !ok {
+			t.Errorf("expected pop ID %v got empty heap", i)
+		} else if previousTime.After(val.Next) {
+			t.Errorf("heap pop %v < previous %v expected >", val.Next, previousTime)
+		}
+		previousTime = val.Next
+	}
+
+	for i := 0; i < num; i++ {
+		h.Push(randInfo(i))
+	}
+	val, ok = h.Pop()
+	if !ok {
+		t.Errorf("expected pop, got empty heap")
+	} else {
+		previousTime = val.Next
+	}
+
+	for i := 0; i < num; i++ {
+		val, ok := h.Pop()
+		if !ok {
+			t.Errorf("expected pop ID %v got empty heap", i)
+		} else if previousTime.After(val.Next) {
+			t.Errorf("heap pop %v < previous %v expected >", val.Next, previousTime)
+		}
+		previousTime = val.Next
+	}
+
+	for i := 0; i < num/2-2; i++ { // -2 for the two we manually popped in order to get the max
+		val, ok := h.Pop()
+		if !ok {
+			t.Errorf("expected pop ID %v got empty heap", i)
+		} else if previousTime.After(val.Next) {
+			t.Errorf("heap pop %v < previous %v expected >", val.Next, previousTime)
+		}
+		previousTime = val.Next
+	}
+
+	val, ok = h.Pop()
+	if ok {
+		t.Errorf("expected empty, got %+v", val)
+	}
+}


[07/20] incubator-trafficcontrol git commit: Add TM2 health times to API and GUI

Posted by ne...@apache.org.
Add TM2 health times to API and GUI


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/c1db6adf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/c1db6adf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/c1db6adf

Branch: refs/heads/master
Commit: c1db6adfbc17c150a41621cc3c16c91856e48254
Parents: 0a350bc
Author: Robert Butts <ro...@gmail.com>
Authored: Tue Nov 22 09:05:57 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/traffic_monitor/index.html     |  28 ++++-
 .../traffic_monitor/manager/datarequest.go      | 124 +++++++++++++++----
 .../traffic_monitor/manager/healthresult.go     |  18 +--
 .../traffic_monitor/manager/manager.go          |   8 +-
 .../traffic_monitor/manager/opsconfig.go        |   2 +
 5 files changed, 144 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c1db6adf/traffic_monitor/experimental/traffic_monitor/index.html
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/index.html b/traffic_monitor/experimental/traffic_monitor/index.html
index 43d1ab4..847982e 100644
--- a/traffic_monitor/experimental/traffic_monitor/index.html
+++ b/traffic_monitor/experimental/traffic_monitor/index.html
@@ -1,3 +1,5 @@
+<!DOCTYPE html>
+
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -17,7 +19,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-<!DOCTYPE html>
+
 <html>
 	<head>
 		<!-- <script src="sorttable.js"></script> -->
@@ -261,8 +263,12 @@ under the License.
 						 row.insertCell(2).id = row.id + "-status";
 						 row.insertCell(3).id = row.id + "-load-average";
 						 row.insertCell(4).id = row.id + "-query-time";
-						 row.insertCell(5).id = row.id + "-bandwidth";
-						 row.insertCell(6).id = row.id + "-connection-count";
+						 row.insertCell(5).id = row.id + "-health-time";
+						 row.insertCell(6).id = row.id + "-stat-time";
+						 row.insertCell(7).id = row.id + "-health-span";
+						 row.insertCell(8).id = row.id + "-stat-span";
+						 row.insertCell(9).id = row.id + "-bandwidth";
+						 row.insertCell(10).id = row.id + "-connection-count";
 						 document.getElementById(row.id + "-server").textContent = server;
 					 }
 
@@ -293,6 +299,18 @@ under the License.
 					 if (jdata[server].hasOwnProperty("query_time_ms")) {
 						 document.getElementById("cache-states-" + server + "-query-time").textContent = jdata[server].query_time_ms;
 					 }
+					 if (jdata[server].hasOwnProperty("health_time_ms")) {
+						 document.getElementById("cache-states-" + server + "-health-time").textContent = jdata[server].health_time_ms;
+					 }
+					 if (jdata[server].hasOwnProperty("stat_time_ms")) {
+						 document.getElementById("cache-states-" + server + "-stat-time").textContent = jdata[server].stat_time_ms;
+					 }
+					 if (jdata[server].hasOwnProperty("health_span_ms")) {
+						 document.getElementById("cache-states-" + server + "-health-span").textContent = jdata[server].health_span_ms;
+					 }
+					 if (jdata[server].hasOwnProperty("stat_span_ms")) {
+						 document.getElementById("cache-states-" + server + "-stat-span").textContent = jdata[server].stat_span_ms;
+					 }
 					 if (jdata[server].hasOwnProperty("bandwidth_kbps")) {
 						 document.getElementById("cache-states-" + server + "-bandwidth").textContent = (jdata[server].bandwidth_kbps / kilobitsInMegabit).toFixed(2);
 					 }
@@ -454,6 +472,10 @@ under the License.
 					<th>Status</th>
 					<th>Load Average</th>
 					<th>Query Time (ms)</th>
+					<th>Health Time (ms)</th>
+					<th>Stat Time (ms)</th>
+					<th>Health Span (ms)</th>
+					<th>Stat Span (ms)</th>
 					<th>Bandwidth (mbps)</th>
 					<th>Connection Count</th>
 				</tr>

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c1db6adf/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
index 2530240..a329057 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
@@ -61,12 +61,19 @@ type APIPeerStates struct {
 // CacheStatus contains summary stat data about the given cache.
 // TODO make fields nullable, so error fields can be omitted, letting API callers still get updates for unerrored fields
 type CacheStatus struct {
-	Type                   *string  `json:"type,omitempty"`
-	Status                 *string  `json:"status,omitempty"`
-	LoadAverage            *float64 `json:"load_average,omitempty"`
-	QueryTimeMilliseconds  *int64   `json:"query_time_ms,omitempty"`
-	HealthTimeMilliseconds *int64   `json:"health_time_ms,omitempty"`
-	StatTimeMilliseconds   *int64   `json:"stat_time_ms,omitempty"`
+	Type        *string  `json:"type,omitempty"`
+	Status      *string  `json:"status,omitempty"`
+	LoadAverage *float64 `json:"load_average,omitempty"`
+	// QueryTimeMilliseconds is the time it took this app to perform a complete query and process the data, end-to-end, for the latest health query.
+	QueryTimeMilliseconds *int64 `json:"query_time_ms,omitempty"`
+	// HealthTimeMilliseconds is the time it took to make the HTTP request and get back the full response, for the latest health query.
+	HealthTimeMilliseconds *int64 `json:"health_time_ms,omitempty"`
+	// StatTimeMilliseconds is the time it took to make the HTTP request and get back the full response, for the latest stat query.
+	StatTimeMilliseconds *int64 `json:"stat_time_ms,omitempty"`
+	// StatSpanMilliseconds is the length of time between completing the most recent two stat queries. This can be used as a rough gauge of the end-to-end query processing time.
+	StatSpanMilliseconds *int64 `json:"stat_span_ms,omitempty"`
+	// HealthSpanMilliseconds is the length of time between completing the most recent two health queries. This can be used as a rough gauge of the end-to-end query processing time.
+	HealthSpanMilliseconds *int64   `json:"health_span_ms,omitempty"`
 	BandwidthKbps          *float64 `json:"bandwidth_kbps,omitempty"`
 	ConnectionCount        *int64   `json:"connection_count,omitempty"`
 }
@@ -607,8 +614,8 @@ func srvAPIVersion(staticAppData StaticAppData) []byte {
 func srvAPITrafficOpsURI(opsConfig OpsConfigThreadsafe) []byte {
 	return []byte(opsConfig.Get().Url)
 }
-func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory ResultHistoryThreadsafe, lastHealthDurations DurationMapThreadsafe, localStates peer.CRStatesThreadsafe, lastStats LastStatsThreadsafe, localCacheStatus CacheAvailableStatusThreadsafe) ([]byte, error) {
-	return json.Marshal(createCacheStatuses(toData.Get().ServerTypes, statHistory.Get(), lastHealthDurations.Get(), localStates.Get().Caches, lastStats.Get(), localCacheStatus))
+func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory ResultHistoryThreadsafe, healthHistory ResultHistoryThreadsafe, lastHealthDurations DurationMapThreadsafe, localStates peer.CRStatesThreadsafe, lastStats LastStatsThreadsafe, localCacheStatus CacheAvailableStatusThreadsafe) ([]byte, error) {
+	return json.Marshal(createCacheStatuses(toData.Get().ServerTypes, statHistory.Get(), healthHistory.Get(), lastHealthDurations.Get(), localStates.Get().Caches, lastStats.Get(), localCacheStatus))
 }
 
 func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats LastStatsThreadsafe) []byte {
@@ -656,6 +663,7 @@ func MakeDispatchMap(
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
 	statHistory ResultHistoryThreadsafe,
+	healthHistory ResultHistoryThreadsafe,
 	dsStats DSStatsReader,
 	events EventsThreadsafe,
 	staticAppData StaticAppData,
@@ -721,7 +729,7 @@ func MakeDispatchMap(
 			return srvAPITrafficOpsURI(opsConfig)
 		})),
 		"/api/cache-statuses": wrap(WrapErr(errorCount, func() ([]byte, error) {
-			return srvAPICacheStates(toData, statHistory, lastHealthDurations, localStates, lastStats, localCacheStatus)
+			return srvAPICacheStates(toData, statHistory, healthHistory, lastHealthDurations, localStates, lastStats, localCacheStatus)
 		})),
 		"/api/bandwidth-kbps": wrap(WrapBytes(func() []byte {
 			return srvAPIBandwidthKbps(toData, lastStats)
@@ -732,9 +740,61 @@ func MakeDispatchMap(
 	}
 }
 
+// latestResultTimeMS returns the length of time in milliseconds that it took to request the most recent non-errored result.
+func latestResultTimeMS(cacheName enum.CacheName, history map[enum.CacheName][]cache.Result) (int64, error) {
+	results, ok := history[cacheName]
+	if !ok {
+		return 0, fmt.Errorf("cache %v has no history", cacheName)
+	}
+	if len(results) == 0 {
+		return 0, fmt.Errorf("cache %v history empty", cacheName)
+	}
+	result := cache.Result{}
+	foundResult := false
+	for _, r := range results {
+		if r.Error == nil {
+			result = r
+			foundResult = true
+			break
+		}
+	}
+	if !foundResult {
+		return 0, fmt.Errorf("cache %v No unerrored result", cacheName)
+	}
+	return int64(result.RequestTime / time.Millisecond), nil
+}
+
+func latestQueryTimeMS(cacheName enum.CacheName, lastDurations map[enum.CacheName]time.Duration) (int64, error) {
+	queryTime, ok := lastDurations[cacheName]
+	if !ok {
+		return 0, fmt.Errorf("cache %v not in last durations\n", cacheName)
+	}
+	return int64(queryTime / time.Millisecond), nil
+}
+
+// resultSpanMS returns the length of time between the most recent two results. That is, how long could the cache have been down before we would have noticed it? Note this returns the time between the most recent two results, irrespective if they errored.
+func resultSpanMS(cacheName enum.CacheName, history map[enum.CacheName][]cache.Result) (int64, error) {
+	results, ok := history[cacheName]
+	if !ok {
+		return 0, fmt.Errorf("cache %v has no history", cacheName)
+	}
+	if len(results) == 0 {
+		return 0, fmt.Errorf("cache %v history empty", cacheName)
+	}
+	if len(results) < 2 {
+		return 0, fmt.Errorf("cache %v history only has one result, can't compute span between results", cacheName)
+	}
+
+	latestResult := results[0]
+	penultimateResult := results[1]
+	span := latestResult.Time.Sub(penultimateResult.Time)
+	return int64(span / time.Millisecond), nil
+}
+
 func createCacheStatuses(
 	cacheTypes map[enum.CacheName]enum.CacheType,
 	statHistory map[enum.CacheName][]cache.Result,
+	healthHistory map[enum.CacheName][]cache.Result,
 	lastHealthDurations map[enum.CacheName]time.Duration,
 	cacheStates map[enum.CacheName]peer.IsAvailable,
 	lastStats ds.LastStats,
@@ -774,13 +834,29 @@ func createCacheStatuses(
 			}
 		}
 
-		var queryTime *int64
-		queryTimeVal, ok := lastHealthDurations[cacheName]
-		if !ok {
-			log.Warnf("cache not in last health durations cache %s\n", cacheName)
-		} else {
-			queryTimeInt := int64(queryTimeVal / time.Millisecond)
-			queryTime = &queryTimeInt
+		healthQueryTime, err := latestQueryTimeMS(cacheName, lastHealthDurations)
+		if err != nil {
+			log.Warnf("Error getting cache %v health query time: %v\n", cacheName, err)
+		}
+
+		statTime, err := latestResultTimeMS(cacheName, statHistory)
+		if err != nil {
+			log.Warnf("Error getting cache %v stat result time: %v\n", cacheName, err)
+		}
+
+		healthTime, err := latestResultTimeMS(cacheName, healthHistory)
+		if err != nil {
+			log.Warnf("Error getting cache %v health result time: %v\n", cacheName, err)
+		}
+
+		statSpan, err := resultSpanMS(cacheName, statHistory)
+		if err != nil {
+			log.Warnf("Error getting cache %v stat span: %v\n", cacheName, err)
+		}
+
+		healthSpan, err := resultSpanMS(cacheName, healthHistory)
+		if err != nil {
+			log.Warnf("Error getting cache %v health span: %v\n", cacheName, err)
 		}
 
 		var kbps *float64
@@ -816,12 +892,16 @@ func createCacheStatuses(
 
 		cacheTypeStr := string(cacheType)
 		statii[cacheName] = CacheStatus{
-			Type:                  &cacheTypeStr,
-			LoadAverage:           loadAverage,
-			QueryTimeMilliseconds: queryTime,
-			BandwidthKbps:         kbps,
-			ConnectionCount:       connections,
-			Status:                status,
+			Type:                   &cacheTypeStr,
+			LoadAverage:            loadAverage,
+			QueryTimeMilliseconds:  &healthQueryTime,
+			StatTimeMilliseconds:   &statTime,
+			HealthTimeMilliseconds: &healthTime,
+			StatSpanMilliseconds:   &statSpan,
+			HealthSpanMilliseconds: &healthSpan,
+			BandwidthKbps:          kbps,
+			ConnectionCount:        connections,
+			Status:                 status,
 		}
 	}
 	return statii

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c1db6adf/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index 8b81519..e27dbbf 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -74,7 +74,7 @@ func (o *DurationMapThreadsafe) Set(d DurationMap) {
 // Note this polls the brief stat endpoint from ATS Astats, not the full stats.
 // This poll should be quicker and less computationally expensive for ATS, but
 // doesn't include all stat data needed for e.g. delivery service calculations.4
-// Returns the last health durations, events, and the local cache statuses.
+// Returns the last health durations, events, the local cache statuses, and the health result history.
 func StartHealthResultManager(
 	cacheHealthChan <-chan cache.Result,
 	toData todata.TODataThreadsafe,
@@ -86,16 +86,18 @@ func StartHealthResultManager(
 	fetchCount UintThreadsafe,
 	errorCount UintThreadsafe,
 	cfg config.Config,
-) (DurationMapThreadsafe, EventsThreadsafe, CacheAvailableStatusThreadsafe) {
+) (DurationMapThreadsafe, EventsThreadsafe, CacheAvailableStatusThreadsafe, ResultHistoryThreadsafe) {
 	lastHealthDurations := NewDurationMapThreadsafe()
 	events := NewEventsThreadsafe(cfg.MaxEvents)
 	localCacheStatus := NewCacheAvailableStatusThreadsafe()
+	healthHistory := NewResultHistoryThreadsafe()
 	go healthResultManagerListen(
 		cacheHealthChan,
 		toData,
 		localStates,
 		lastHealthDurations,
 		statHistory,
+		healthHistory,
 		monitorConfig,
 		peerStates,
 		combinedStates,
@@ -105,7 +107,7 @@ func StartHealthResultManager(
 		localCacheStatus,
 		cfg,
 	)
-	return lastHealthDurations, events, localCacheStatus
+	return lastHealthDurations, events, localCacheStatus, healthHistory
 }
 
 func healthResultManagerListen(
@@ -114,6 +116,7 @@ func healthResultManagerListen(
 	localStates peer.CRStatesThreadsafe,
 	lastHealthDurations DurationMapThreadsafe,
 	statHistory ResultHistoryThreadsafe,
+	healthHistory ResultHistoryThreadsafe,
 	monitorConfig TrafficMonitorConfigMapThreadsafe,
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
@@ -124,7 +127,6 @@ func healthResultManagerListen(
 	cfg config.Config,
 ) {
 	lastHealthEndTimes := map[enum.CacheName]time.Time{}
-	healthHistory := map[enum.CacheName][]cache.Result{}
 	// This reads at least 1 value from the cacheHealthChan. Then, we loop, and try to read from the channel some more. If there's nothing to read, we hit `default` and process. If there is stuff to read, we read it, then inner-loop trying to read more. If we're continuously reading and the channel is never empty, and we hit the tick time, process anyway even though the channel isn't empty, to prevent never processing (starvation).
 	for {
 		var results []cache.Result
@@ -199,7 +201,7 @@ func processHealthResult(
 	events EventsThreadsafe,
 	localCacheStatusThreadsafe CacheAvailableStatusThreadsafe,
 	lastHealthEndTimes map[enum.CacheName]time.Time,
-	healthHistory map[enum.CacheName][]cache.Result,
+	healthHistory ResultHistoryThreadsafe,
 	results []cache.Result,
 	cfg config.Config,
 ) {
@@ -209,11 +211,12 @@ func processHealthResult(
 	toDataCopy := toData.Get() // create a copy, so the same data used for all processing of this cache health result
 	localCacheStatus := localCacheStatusThreadsafe.Get().Copy()
 	monitorConfigCopy := monitorConfig.Get()
+	healthHistoryCopy := healthHistory.Get().Copy()
 	for _, healthResult := range results {
 		log.Debugf("poll %v %v healthresultman start\n", healthResult.PollID, time.Now())
 		fetchCount.Inc()
 		var prevResult cache.Result
-		healthResultHistory := healthHistory[healthResult.ID]
+		healthResultHistory := healthHistoryCopy[healthResult.ID]
 		if len(healthResultHistory) != 0 {
 			prevResult = healthResultHistory[len(healthResultHistory)-1]
 		}
@@ -223,7 +226,7 @@ func processHealthResult(
 		}
 
 		maxHistory := uint64(monitorConfigCopy.Profile[monitorConfigCopy.TrafficServer[string(healthResult.ID)].Profile].Parameters.HistoryCount)
-		healthHistory[healthResult.ID] = pruneHistory(append(healthHistory[healthResult.ID], healthResult), maxHistory)
+		healthHistoryCopy[healthResult.ID] = pruneHistory(append([]cache.Result{healthResult}, healthHistoryCopy[healthResult.ID]...), maxHistory)
 
 		isAvailable, whyAvailable := health.EvalCache(healthResult, &monitorConfigCopy)
 		if localStates.Get().Caches[healthResult.ID].IsAvailable != isAvailable {
@@ -237,6 +240,7 @@ func processHealthResult(
 		calculateDeliveryServiceState(toDataCopy.DeliveryServiceServers, localStates)
 		log.Debugf("poll %v %v calculateDeliveryServiceState end\n", healthResult.PollID, time.Now())
 	}
+	healthHistory.Set(healthHistoryCopy)
 	localCacheStatusThreadsafe.Set(localCacheStatus)
 	// TODO determine if we should combineCrStates() here
 

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c1db6adf/traffic_monitor/experimental/traffic_monitor/manager/manager.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
index 49fa13a..ffb2c3a 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
@@ -8,9 +8,9 @@ package manager
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package manager
  * under the License.
  */
 
-
 import (
 	"crypto/tls"
 	"net/http"
@@ -116,7 +115,7 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
 		monitorConfig,
 	)
 
-	lastHealthDurations, events, localCacheStatus := StartHealthResultManager(
+	lastHealthDurations, events, localCacheStatus, healthHistory := StartHealthResultManager(
 		cacheHealthHandler.ResultChannel,
 		toData,
 		localStates,
@@ -139,6 +138,7 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
 		peerStates,
 		combinedStates,
 		statHistory,
+		healthHistory,
 		lastKbpsStats,
 		dsStats,
 		events,

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c1db6adf/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
index 9f70cd7..efe43c7 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/opsconfig.go
@@ -73,6 +73,7 @@ func StartOpsConfigManager(
 	peerStates peer.CRStatesPeersThreadsafe,
 	combinedStates peer.CRStatesThreadsafe,
 	statHistory ResultHistoryThreadsafe,
+	healthHistory ResultHistoryThreadsafe,
 	lastStats LastStatsThreadsafe,
 	dsStats DSStatsReader,
 	events EventsThreadsafe,
@@ -131,6 +132,7 @@ func StartOpsConfigManager(
 				peerStates,
 				combinedStates,
 				statHistory,
+				healthHistory,
 				dsStats,
 				events,
 				staticAppData,


[10/20] incubator-trafficcontrol git commit: Remove TM2 unused code

Posted by ne...@apache.org.
Remove TM2 unused code


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/0c6d88e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/0c6d88e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/0c6d88e4

Branch: refs/heads/master
Commit: 0c6d88e4055cdc31a8d9f6c7d859ad300faa9fb8
Parents: eaf0641
Author: Robert Butts <ro...@gmail.com>
Authored: Thu Dec 8 09:47:01 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/common/poller/heap.go            | 18 ------------------
 1 file changed, 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0c6d88e4/traffic_monitor/experimental/common/poller/heap.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/heap.go b/traffic_monitor/experimental/common/poller/heap.go
index 2933b0e..b0152b3 100644
--- a/traffic_monitor/experimental/common/poller/heap.go
+++ b/traffic_monitor/experimental/common/poller/heap.go
@@ -25,14 +25,6 @@ import (
 	"time"
 )
 
-// type HTTPPollInfo struct {
-// 	Interval time.Duration
-// 	Timeout  time.Duration
-// 	ID       string
-// 	URL      string
-// 	Handler  handler.Handler
-// }
-
 type HeapPollInfo struct {
 	Info HTTPPollInfo
 	Next time.Time
@@ -110,7 +102,6 @@ func (h *Heap) Pop() (HeapPollInfo, bool) {
 }
 
 // Pop gets the latest time from the heap. Implements Algorithms MAX-HEAP-INSERT.
-// TODO make threadsafe
 func (h *Heap) Push(key HeapPollInfo) {
 	h.m.Lock()
 	defer h.m.Unlock()
@@ -120,12 +111,3 @@ func (h *Heap) Push(key HeapPollInfo) {
 	h.info = append(h.info, HeapPollInfo{Next: time.Unix(1<<63-1, 0)})
 	h.increaseKey(len(h.info)-1, key)
 }
-
-// debug
-func (h *Heap) Sprint() string {
-	s := ""
-	for i, info := range h.info {
-		s += fmt.Sprintf("%v %v|", i, info.Next)
-	}
-	return s
-}


[11/20] incubator-trafficcontrol git commit: Add TM2 InsomniacPoll to workaround Sleep bug

Posted by ne...@apache.org.
Add TM2 InsomniacPoll to workaround Sleep bug

Adds Traffic Monitor 2.0 InsomniacPoll, and a config setting for it,
which uses a single poller goroutine which busywaits and dispatches
pollers when necessary, and never sleeps. The regular Sleep Poll
runs a goroutine per poll, each of which sleeps for the poll interval.

This works around an issue wherein Sleep gets progressively slower.
E.g. a 8s poll takes 9s after a day, and 20s after a week. This issue
has been observed on OpenStack with CentOS 6.5 and kernel 2.6.32.
Users who don't experience the Sleep issue should definitely use
the original Sleep poll (via config setting http_poll_no_sleep),
as it's more efficient, idiomatic, faster, and safer. If and when
the Sleep issue is diagnosed and fixed, the InsomniacPoll should
be removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/9153d3e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/9153d3e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/9153d3e9

Branch: refs/heads/master
Commit: 9153d3e9aa407e2ccea84aaeed9cc708f2ace3bb
Parents: 005d7f3
Author: Robert Butts <ro...@gmail.com>
Authored: Mon Dec 5 09:14:04 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/common/poller/heap.go          | 112 ++++++++
 .../experimental/common/poller/poller.go        | 265 ++++++++++++++-----
 .../experimental/traffic_monitor/cache/data.go  |  75 ++++++
 .../traffic_monitor/config/config.go            |   7 +-
 .../traffic_monitor/manager/manager.go          |   6 +-
 .../traffic_monitor-example-config.json         |   3 +-
 .../experimental/traffic_monitor/version.go     |   7 +-
 7 files changed, 398 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/9153d3e9/traffic_monitor/experimental/common/poller/heap.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/heap.go b/traffic_monitor/experimental/common/poller/heap.go
new file mode 100644
index 0000000..c66a7c3
--- /dev/null
+++ b/traffic_monitor/experimental/common/poller/heap.go
@@ -0,0 +1,112 @@
+package poller
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+// type HTTPPollInfo struct {
+// 	Interval time.Duration
+// 	Timeout  time.Duration
+// 	ID       string
+// 	URL      string
+// 	Handler  handler.Handler
+// }
+
+type HeapPollInfo struct {
+	Info HTTPPollInfo
+	Next time.Time
+}
+
+// Heap implements a Heap from Introduction to Algorithms (Cormen et al). A Heap allows fase access of the maximum object, in this case the latest Next time, and O(log(n)) insert. This Heap is specifically designed to be used as a Priority Queue.
+type Heap struct {
+	m        sync.Mutex
+	info     []HeapPollInfo
+	PollerID int64
+}
+
+func left(i int) int {
+	return 2*i + 1
+}
+
+func right(i int) int {
+	return 2*i + 2
+}
+
+// TODO benchmark directly replacing this, to see if Go inlines the function call
+func parent(i int) int {
+	return (i - 1) / 2
+}
+
+func (h *Heap) heapify(i int) {
+	l := left(i)
+	r := right(i)
+	var largest int
+	if l < len(h.info) && h.info[i].Next.After(h.info[l].Next) {
+		largest = l
+	} else {
+		largest = i
+	}
+
+	if r < len(h.info) && h.info[largest].Next.After(h.info[r].Next) {
+		largest = r
+	}
+
+	if largest != i {
+		h.info[i], h.info[largest] = h.info[largest], h.info[i]
+		h.heapify(largest)
+	}
+}
+
+func (h *Heap) increaseKey(i int, key HeapPollInfo) {
+	if h.info[i].Next.After(key.Next) {
+		panic("Poll.Heap.increaseKey got key smaller than index")
+	}
+
+	h.info[i] = key
+
+	for i > 0 && h.info[parent(i)].Next.After(h.info[i].Next) {
+		h.info[i], h.info[parent(i)] = h.info[parent(i)], h.info[i]
+		i = parent(i)
+	}
+}
+
+// Pop gets the latest time from the heap. Implements Algorithms HEAP-EXTRACT-MAX.
+// Returns the info with the latest time, and false if the heap is empty.
+func (h *Heap) Pop() (HeapPollInfo, bool) {
+	h.m.Lock()
+	defer h.m.Unlock()
+	if len(h.info) == 0 {
+		return HeapPollInfo{}, false
+	}
+	max := h.info[0]
+	h.info[0] = h.info[len(h.info)-1]
+	h.info = h.info[:len(h.info)-1]
+	h.heapify(0)
+	if max.Info.ID == "odol-atsec-jac-04" {
+		fmt.Printf("httpPoll %v Heap.Pop id %v next %v\n", h.PollerID, max.Info.ID, max.Next)
+	}
+	return max, true
+}
+
+// Pop gets the latest time from the heap. Implements Algorithms MAX-HEAP-INSERT.
+// TODO make threadsafe
+func (h *Heap) Push(key HeapPollInfo) {
+	h.m.Lock()
+	defer h.m.Unlock()
+	if key.Info.ID == "odol-atsec-jac-04" {
+		fmt.Printf("httpPoll %v Heap.Push id %v next %v\n", h.PollerID, key.Info.ID, key.Next)
+	}
+	h.info = append(h.info, HeapPollInfo{Next: time.Unix(1<<63-1, 0)})
+	h.increaseKey(len(h.info)-1, key)
+}
+
+// debug
+func (h *Heap) Sprint() string {
+	s := ""
+	for i, info := range h.info {
+		s += fmt.Sprintf("%v %v|", i, info.Next)
+	}
+	return s
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/9153d3e9/traffic_monitor/experimental/common/poller/poller.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/poller.go b/traffic_monitor/experimental/common/poller/poller.go
index e8e401a..7c86b57 100644
--- a/traffic_monitor/experimental/common/poller/poller.go
+++ b/traffic_monitor/experimental/common/poller/poller.go
@@ -20,6 +20,7 @@ package poller
  */
 
 import (
+	"fmt"
 	"io/ioutil"
 	"math/rand"
 	"net/http"
@@ -57,11 +58,20 @@ type PollConfig struct {
 type HttpPollerConfig struct {
 	Urls     map[string]PollConfig
 	Interval time.Duration
+	// noSleep indicates to use the InsomniacPoller. Note this is only used with the initial Poll call, which decides which Poller mechanism to use. After that, this is ignored when the HttpPollerConfig is passed over the ConfigChannel.
+	noSleep bool
 }
 
 // Creates and returns a new HttpPoller.
-// If tick is false, HttpPoller.TickChan() will return nil
-func NewHTTP(interval time.Duration, tick bool, httpClient *http.Client, counters fetcher.Counters, fetchHandler handler.Handler) HttpPoller {
+// If tick is false, HttpPoller.TickChan() will return nil. If noSleep is true, the poller will busywait instead of sleeping, and use a single goroutine which dispatches polls instead of a goroutine per poll.
+func NewHTTP(
+	interval time.Duration,
+	tick bool,
+	httpClient *http.Client,
+	counters fetcher.Counters,
+	fetchHandler handler.Handler,
+	noSleep bool,
+) HttpPoller {
 	var tickChan chan uint64
 	if tick {
 		tickChan = make(chan uint64)
@@ -71,6 +81,7 @@ func NewHTTP(interval time.Duration, tick bool, httpClient *http.Client, counter
 		ConfigChannel: make(chan HttpPollerConfig),
 		Config: HttpPollerConfig{
 			Interval: interval,
+			noSleep:  noSleep,
 		},
 		FetcherTemplate: fetcher.HttpFetcher{
 			Handler:  fetchHandler,
@@ -135,7 +146,25 @@ func (p MonitorConfigPoller) Poll() {
 
 var debugPollNum uint64
 
+type HTTPPollInfo struct {
+	Interval time.Duration
+	Timeout  time.Duration
+	ID       string
+	URL      string
+	Handler  handler.Handler
+}
+
 func (p HttpPoller) Poll() {
+	if p.Config.noSleep {
+		log.Infof("HttpPoller using InsomniacPoll\n")
+		p.InsomniacPoll()
+	} else {
+		log.Infof("HttpPoller using SleepPoll\n")
+		p.SleepPoll()
+	}
+}
+
+func (p HttpPoller) SleepPoll() {
 	// iterationCount := uint64(0)
 	// iterationCount++ // on tick<:
 	// case p.TickChan <- iterationCount:
@@ -157,68 +186,151 @@ func (p HttpPoller) Poll() {
 				fetcher.Client = &c // copy the client, so we don't change other fetchers.
 				fetcher.Client.Timeout = info.Timeout
 			}
-			go pollHttp(info.Interval, info.ID, info.URL, fetcher, kill)
+			go sleepPoller(info.Interval, info.ID, info.URL, fetcher, kill)
 		}
 		p.Config = newConfig
 	}
 }
 
-type HTTPPollInfo struct {
-	Interval time.Duration
-	Timeout  time.Duration
-	ID       string
-	URL      string
-	Handler  handler.Handler
+// TODO iterationCount and/or p.TickChan?
+func sleepPoller(interval time.Duration, id string, url string, fetcher fetcher.Fetcher, die <-chan struct{}) {
+	pollSpread := time.Duration(rand.Float64()*float64(interval/time.Nanosecond)) * time.Nanosecond
+	time.Sleep(pollSpread)
+	tick := time.NewTicker(interval)
+	lastTime := time.Now()
+	for {
+		select {
+		case now := <-tick.C:
+			tick.Stop()                     // old ticker MUST call Stop() to release resources. Else, memory leak.
+			tick = time.NewTicker(interval) // recreate timer, to avoid Go's "smoothing" nonsense
+			realInterval := now.Sub(lastTime)
+			if realInterval > interval+(time.Millisecond*100) {
+				instr.TimerFail.Inc()
+				log.Infof("Intended Duration: %v Actual Duration: %v\n", interval, realInterval)
+			}
+			lastTime = time.Now()
+
+			pollId := atomic.AddUint64(&debugPollNum, 1)
+			pollFinishedChan := make(chan uint64)
+			log.Debugf("poll %v %v start\n", pollId, time.Now())
+			go fetcher.Fetch(id, url, pollId, pollFinishedChan) // TODO persist fetcher, with its own die chan?
+			<-pollFinishedChan
+		case <-die:
+			return
+		}
+	}
 }
 
-// diffConfigs takes the old and new configs, and returns a list of deleted IDs, and a list of new polls to do
-func diffConfigs(old HttpPollerConfig, new HttpPollerConfig) ([]string, []HTTPPollInfo) {
-	deletions := []string{}
-	additions := []HTTPPollInfo{}
+// InsomniacPoll polls using a single thread, which never sleeps. This exists to work around a bug observed in OpenStack CentOS 6.5 kernel 2.6.32 wherin sleep gets progressively slower. This should be removed and Poll() changed to call SleepPoll() when the bug is tracked down and fixed for production.
+func (p HttpPoller) InsomniacPoll() {
+	// iterationCount := uint64(0)
+	// iterationCount++ // on tick<:
+	// case p.TickChan <- iterationCount:
+	killChan := make(chan struct{})
+	pollRunning := false // TODO find less awkward way to not kill the first loop
+	pollerId := rand.Int63()
+	for newCfg := range p.ConfigChannel {
+		// TODO add a more efficient function than diffConfigs for this func, since we only need to know whether anything changed
+		deletions, additions := diffConfigs(p.Config, newCfg)
+		if len(deletions) == 0 && len(additions) == 0 {
+			continue
+		}
 
-	if old.Interval != new.Interval {
-		for id, _ := range old.Urls {
-			deletions = append(deletions, id)
+		fmt.Printf("HttpPoller.InsomniacPoll got newCfg\n")
+		if pollRunning {
+			killChan <- struct{}{}
 		}
-		for id, pollCfg := range new.Urls {
-			additions = append(additions, HTTPPollInfo{
-				Interval: new.Interval,
+		pollRunning = true
+
+		polls := []HTTPPollInfo{}
+		fmt.Printf("HttpPoller.InsomniacPoll creating polls\n")
+		for id, pollCfg := range newCfg.Urls {
+			polls = append(polls, HTTPPollInfo{
+				Interval: newCfg.Interval,
 				ID:       id,
 				URL:      pollCfg.URL,
 				Timeout:  pollCfg.Timeout,
 			})
 		}
-		return deletions, additions
+		fmt.Printf("HttpPoller.InsomniacPoll created polls, going httpPoll\n")
+		go insomniacPoller(pollerId, polls, p.FetcherTemplate, killChan)
+		p.Config = newCfg
 	}
+}
 
-	for id, oldPollCfg := range old.Urls {
-		newPollCfg, newIdExists := new.Urls[id]
-		if !newIdExists {
-			deletions = append(deletions, id)
-		} else if newPollCfg != oldPollCfg {
-			deletions = append(deletions, id)
-			additions = append(additions, HTTPPollInfo{
-				Interval: new.Interval,
-				ID:       id,
-				URL:      newPollCfg.URL,
-				Timeout:  newPollCfg.Timeout,
-			})
+func insomniacPoller(pollerId int64, polls []HTTPPollInfo, fetcherTemplate fetcher.HttpFetcher, die <-chan struct{}) {
+	fmt.Printf("httpPoll %v called\n", pollerId)
+	heap := Heap{PollerID: pollerId}
+	start := time.Now()
+	fetchers := map[string]fetcher.Fetcher{}
+	fmt.Printf("httpPoll %v adding to heap\n", pollerId)
+	for _, p := range polls {
+		spread := time.Duration(rand.Float64()*float64(p.Interval/time.Nanosecond)) * time.Nanosecond
+		heap.Push(HeapPollInfo{Info: p, Next: start.Add(spread)})
+
+		fetcher := fetcherTemplate
+		if p.Timeout != 0 { // if the timeout isn't explicitly set, use the template value.
+			c := *fetcher.Client
+			fetcher.Client = &c // copy the client, so we don't change other fetchers.
+			fetcher.Client.Timeout = p.Timeout
 		}
+		fetchers[p.ID] = fetcher
+	}
+	fmt.Printf("httpPoll %v added to heap\n", pollerId)
+	mustDie := func() bool {
+		select {
+		case <-die:
+			return true
+		default:
+		}
+		return false
 	}
 
-	for id, newPollCfg := range new.Urls {
-		_, oldIdExists := old.Urls[id]
-		if !oldIdExists {
-			additions = append(additions, HTTPPollInfo{
-				Interval: new.Interval,
-				ID:       id,
-				URL:      newPollCfg.URL,
-				Timeout:  newPollCfg.Timeout,
-			})
+	timeMax := func(a time.Time, b time.Time) time.Time {
+		if a.After(b) {
+			return a
 		}
+		return b
 	}
 
-	return deletions, additions
+	poll := func(p HeapPollInfo) {
+		start := time.Now()
+		pollId := atomic.AddUint64(&debugPollNum, 1)
+		// TODO change pollFinishedChan to callback, for performance
+		pollFinishedChan := make(chan uint64)
+
+		go fetchers[p.Info.ID].Fetch(p.Info.ID, p.Info.URL, pollId, pollFinishedChan) // TODO persist fetcher, with its own die chan?
+		<-pollFinishedChan
+		now := time.Now()
+		p.Next = timeMax(start.Add(p.Info.Interval), now)
+		if p.Info.ID == "odol-atsec-jac-04" {
+			fmt.Printf("httpPoll %v heaping id %v next %v start %v interval %v now %v add %v\n", pollerId, p.Info.ID, p.Next, start, p.Info.Interval, now, start.Add(p.Info.Interval))
+		}
+		heap.Push(p)
+	}
+
+	fmt.Printf("httpPoll %v starting main loop\n", pollerId)
+	for {
+		if mustDie() {
+			fmt.Printf("httpPoll %v dying\n", pollerId)
+			return
+		}
+		p, ok := heap.Pop()
+		if !ok {
+			fmt.Printf("httpPoll %v empty heap, busylooping\n", pollerId)
+			continue //busywait because we fear to sleep. TODO sleep?
+		}
+		if p.Info.ID == "odol-atsec-jac-04" {
+			fmt.Printf("httpPoll %v popped id %v p.Next %v now %v\n", pollerId, p.Info.ID, p.Next, time.Now())
+		}
+		for p.Next.After(time.Now()) {
+			// busywait, because sleeping gets progressively slower for unknown reasons.
+		}
+		if p.Info.ID == "odol-atsec-jac-04" {
+			fmt.Printf("httpPoll %v polling %v next %v now %v\n", pollerId, p.Info.ID, p.Next, time.Now())
+		}
+		go poll(p)
+	}
 }
 
 func (p FilePoller) Poll() {
@@ -253,31 +365,52 @@ func (p FilePoller) Poll() {
 	}
 }
 
-// TODO iterationCount and/or p.TickChan?
-func pollHttp(interval time.Duration, id string, url string, fetcher fetcher.Fetcher, die <-chan struct{}) {
-	pollSpread := time.Duration(rand.Float64()*float64(interval/time.Nanosecond)) * time.Nanosecond
-	time.Sleep(pollSpread)
-	tick := time.NewTicker(interval)
-	lastTime := time.Now()
-	for {
-		select {
-		case now := <-tick.C:
-			tick.Stop()                     // old ticker MUST call Stop() to release resources. Else, memory leak.
-			tick = time.NewTicker(interval) // recreate timer, to avoid Go's "smoothing" nonsense
-			realInterval := now.Sub(lastTime)
-			if realInterval > interval+(time.Millisecond*100) {
-				instr.TimerFail.Inc()
-				log.Infof("Intended Duration: %v Actual Duration: %v\n", interval, realInterval)
-			}
-			lastTime = time.Now()
+// diffConfigs takes the old and new configs, and returns a list of deleted IDs, and a list of new polls to do
+func diffConfigs(old HttpPollerConfig, new HttpPollerConfig) ([]string, []HTTPPollInfo) {
+	deletions := []string{}
+	additions := []HTTPPollInfo{}
 
-			pollId := atomic.AddUint64(&debugPollNum, 1)
-			pollFinishedChan := make(chan uint64)
-			log.Debugf("poll %v %v start\n", pollId, time.Now())
-			go fetcher.Fetch(id, url, pollId, pollFinishedChan) // TODO persist fetcher, with its own die chan?
-			<-pollFinishedChan
-		case <-die:
-			return
+	if old.Interval != new.Interval {
+		for id, _ := range old.Urls {
+			deletions = append(deletions, id)
+		}
+		for id, pollCfg := range new.Urls {
+			additions = append(additions, HTTPPollInfo{
+				Interval: new.Interval,
+				ID:       id,
+				URL:      pollCfg.URL,
+				Timeout:  pollCfg.Timeout,
+			})
+		}
+		return deletions, additions
+	}
+
+	for id, oldPollCfg := range old.Urls {
+		newPollCfg, newIdExists := new.Urls[id]
+		if !newIdExists {
+			deletions = append(deletions, id)
+		} else if newPollCfg != oldPollCfg {
+			deletions = append(deletions, id)
+			additions = append(additions, HTTPPollInfo{
+				Interval: new.Interval,
+				ID:       id,
+				URL:      newPollCfg.URL,
+				Timeout:  newPollCfg.Timeout,
+			})
 		}
 	}
+
+	for id, newPollCfg := range new.Urls {
+		_, oldIdExists := old.Urls[id]
+		if !oldIdExists {
+			additions = append(additions, HTTPPollInfo{
+				Interval: new.Interval,
+				ID:       id,
+				URL:      newPollCfg.URL,
+				Timeout:  newPollCfg.Timeout,
+			})
+		}
+	}
+
+	return deletions, additions
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/9153d3e9/traffic_monitor/experimental/traffic_monitor/cache/data.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/data.go b/traffic_monitor/experimental/traffic_monitor/cache/data.go
new file mode 100644
index 0000000..6c02906
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/cache/data.go
@@ -0,0 +1,75 @@
+package cache
+
+import (
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
+)
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// CacheAvailableStatusReported is the status string returned by caches set to "reported" in Traffic Ops.
+// TODO put somewhere more generic
+const AvailableStatusReported = "REPORTED"
+
+// CacheAvailableStatus is the available status of the given cache. It includes a boolean available/unavailable flag, and a descriptive string.
+type AvailableStatus struct {
+	Available bool
+	Status    string
+}
+
+// CacheAvailableStatuses is the available status of each cache.
+type AvailableStatuses map[enum.CacheName]AvailableStatus
+
+// Copy copies this CacheAvailableStatuses. It does not modify, and thus is safe for multiple reader goroutines.
+func (a AvailableStatuses) Copy() AvailableStatuses {
+	b := AvailableStatuses(map[enum.CacheName]AvailableStatus{})
+	for k, v := range a {
+		b[k] = v
+	}
+	return b
+}
+
+// Event represents an event change in aggregated data. For example, a cache being marked as unavailable.
+type Event struct {
+	Index       uint64         `json:"index"`
+	Time        int64          `json:"time"`
+	Description string         `json:"description"`
+	Name        enum.CacheName `json:"name"`
+	Hostname    enum.CacheName `json:"hostname"`
+	Type        string         `json:"type"`
+	Available   bool           `json:"isAvailable"`
+}
+
+// ResultHistory is a map of cache names, to an array of result history from each cache.
+type ResultHistory map[enum.CacheName][]Result
+
+func copyResult(a []Result) []Result {
+	b := make([]Result, len(a), len(a))
+	copy(b, a)
+	return b
+}
+
+// Copy copies returns a deep copy of this ResultHistory
+func (a ResultHistory) Copy() ResultHistory {
+	b := ResultHistory{}
+	for k, v := range a {
+		b[k] = copyResult(v)
+	}
+	return b
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/9153d3e9/traffic_monitor/experimental/traffic_monitor/config/config.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/config/config.go b/traffic_monitor/experimental/traffic_monitor/config/config.go
index ce24883..b7bb0e2 100644
--- a/traffic_monitor/experimental/traffic_monitor/config/config.go
+++ b/traffic_monitor/experimental/traffic_monitor/config/config.go
@@ -8,9 +8,9 @@ package config
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package config
  * under the License.
  */
 
-
 import (
 	"encoding/json"
 	"io/ioutil"
@@ -57,6 +56,7 @@ type Config struct {
 	ServeReadTimeout             time.Duration `json:"-"`
 	ServeWriteTimeout            time.Duration `json:"-"`
 	HealthToStatRatio            uint64        `json:"health_to_stat_ratio"`
+	HTTPPollNoSleep              bool          `json:"http_poll_no_sleep"`
 }
 
 // DefaultConfig is the default configuration for the application, if no configuration file is given, or if a given config setting doesn't exist in the config file.
@@ -78,6 +78,7 @@ var DefaultConfig = Config{
 	ServeReadTimeout:             10 * time.Second,
 	ServeWriteTimeout:            10 * time.Second,
 	HealthToStatRatio:            4,
+	HTTPPollNoSleep:              false,
 }
 
 // MarshalJSON marshals custom millisecond durations. Aliasing inspired by http://choly.ca/post/go-json-marshalling/

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/9153d3e9/traffic_monitor/experimental/traffic_monitor/manager/manager.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
index dcb9bee..ae22ad2 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go
@@ -74,12 +74,12 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData)
 	toData := todata.NewThreadsafe()
 
 	cacheHealthHandler := cache.NewHandler()
-	cacheHealthPoller := poller.NewHTTP(cfg.CacheHealthPollingInterval, true, sharedClient, counters, cacheHealthHandler)
+	cacheHealthPoller := poller.NewHTTP(cfg.CacheHealthPollingInterval, true, sharedClient, counters, cacheHealthHandler, cfg.HTTPPollNoSleep)
 	cacheStatHandler := cache.NewPrecomputeHandler(toData, peerStates) // TODO figure out if this is necessary, with the CacheHealthPoller
-	cacheStatPoller := poller.NewHTTP(cfg.CacheStatPollingInterval, false, sharedClient, counters, cacheStatHandler)
+	cacheStatPoller := poller.NewHTTP(cfg.CacheStatPollingInterval, false, sharedClient, counters, cacheStatHandler, cfg.HTTPPollNoSleep)
 	monitorConfigPoller := poller.NewMonitorConfig(cfg.MonitorConfigPollingInterval)
 	peerHandler := peer.NewHandler()
-	peerPoller := poller.NewHTTP(cfg.PeerPollingInterval, false, sharedClient, counters, peerHandler)
+	peerPoller := poller.NewHTTP(cfg.PeerPollingInterval, false, sharedClient, counters, peerHandler, cfg.HTTPPollNoSleep)
 
 	go monitorConfigPoller.Poll()
 	go cacheHealthPoller.Poll()

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/9153d3e9/traffic_monitor/experimental/traffic_monitor/traffic_monitor-example-config.json
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/traffic_monitor-example-config.json b/traffic_monitor/experimental/traffic_monitor/traffic_monitor-example-config.json
index d1b4613..a463682 100644
--- a/traffic_monitor/experimental/traffic_monitor/traffic_monitor-example-config.json
+++ b/traffic_monitor/experimental/traffic_monitor/traffic_monitor-example-config.json
@@ -14,5 +14,6 @@
 	"log_location_info": "null",
 	"log_location_debug": "null",
 	"serve_read_timeout_ms": 10000,
-	"serve_write_timeout_ms": 10000
+	"serve_write_timeout_ms": 10000,
+	"http_poll_no_sleep": false
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/9153d3e9/traffic_monitor/experimental/traffic_monitor/version.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/version.go b/traffic_monitor/experimental/traffic_monitor/version.go
index 3f22073..c1ecc0c 100644
--- a/traffic_monitor/experimental/traffic_monitor/version.go
+++ b/traffic_monitor/experimental/traffic_monitor/version.go
@@ -8,9 +8,9 @@ package main
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,5 @@ package main
  * under the License.
  */
 
-
 // Version is the current version of the app, in string form.
-var Version = "2.2"
+var Version = "2.0.1"


[08/20] incubator-trafficcontrol git commit: Fix TM2 health processing to compute DS data once

Posted by ne...@apache.org.
Fix TM2 health processing to compute DS data once


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/2f23f29c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/2f23f29c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/2f23f29c

Branch: refs/heads/master
Commit: 2f23f29c4e26012baae168aa223fb559244817a0
Parents: c1db6ad
Author: Robert Butts <ro...@gmail.com>
Authored: Fri Nov 25 15:03:05 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/traffic_monitor/manager/healthresult.go         | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/2f23f29c/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index e27dbbf..cba6809 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -236,10 +236,8 @@ func processHealthResult(
 
 		localCacheStatus[healthResult.ID] = CacheAvailableStatus{Available: isAvailable, Status: monitorConfigCopy.TrafficServer[string(healthResult.ID)].Status} // TODO move within localStates?
 		localStates.SetCache(healthResult.ID, peer.IsAvailable{IsAvailable: isAvailable})
-		log.Debugf("poll %v %v calculateDeliveryServiceState start\n", healthResult.PollID, time.Now())
-		calculateDeliveryServiceState(toDataCopy.DeliveryServiceServers, localStates)
-		log.Debugf("poll %v %v calculateDeliveryServiceState end\n", healthResult.PollID, time.Now())
 	}
+	calculateDeliveryServiceState(toDataCopy.DeliveryServiceServers, localStates)
 	healthHistory.Set(healthHistoryCopy)
 	localCacheStatusThreadsafe.Set(localCacheStatus)
 	// TODO determine if we should combineCrStates() here


[04/20] incubator-trafficcontrol git commit: Fix TM2 unnecessary mutexed copies

Posted by ne...@apache.org.
Fix TM2 unnecessary mutexed copies


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/c80be36f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/c80be36f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/c80be36f

Branch: refs/heads/master
Commit: c80be36ff0d6563f5b447c76283beebffb9e2119
Parents: e41a745
Author: Robert Butts <ro...@gmail.com>
Authored: Mon Nov 28 14:30:13 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../traffic_monitor/manager/healthresult.go          |  4 ++--
 .../traffic_monitor/manager/monitorconfig.go         | 11 +++++------
 .../experimental/traffic_monitor/peer/crstates.go    | 15 ++++++++-------
 3 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c80be36f/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
index 67f844a..b8c46f7 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go
@@ -229,7 +229,7 @@ func processHealthResult(
 		healthHistoryCopy[healthResult.ID] = pruneHistory(append([]cache.Result{healthResult}, healthHistoryCopy[healthResult.ID]...), maxHistory)
 
 		isAvailable, whyAvailable := health.EvalCache(healthResult, &monitorConfigCopy)
-		if localStates.Get().Caches[healthResult.ID].IsAvailable != isAvailable {
+		if available, ok := localStates.GetCache(healthResult.ID); !ok || available.IsAvailable != isAvailable {
 			log.Infof("Changing state for %s was: %t now: %t because %s error: %v", healthResult.ID, prevResult.Available, isAvailable, whyAvailable, healthResult.Error)
 			events.Add(Event{Time: time.Now().Unix(), Description: whyAvailable, Name: healthResult.ID, Hostname: healthResult.ID, Type: toDataCopy.ServerTypes[healthResult.ID].String(), Available: isAvailable})
 		}
@@ -267,7 +267,7 @@ func calculateDeliveryServiceState(deliveryServiceServers map[enum.DeliveryServi
 		deliveryServiceState.IsAvailable = false
 		deliveryServiceState.DisabledLocations = []enum.CacheName{} // it's important this isn't nil, so it serialises to the JSON `[]` instead of `null`
 		for _, server := range deliveryServiceServers[deliveryServiceName] {
-			if states.GetCache(server).IsAvailable {
+			if available, _ := states.GetCache(server); available.IsAvailable {
 				deliveryServiceState.IsAvailable = true
 			} else {
 				deliveryServiceState.DisabledLocations = append(deliveryServiceState.DisabledLocations, server)

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c80be36f/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go b/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
index 5c343ae..5307382 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go
@@ -8,9 +8,9 @@ package manager
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package manager
  * under the License.
  */
 
-
 import (
 	"fmt"
 	"strings"
@@ -219,7 +218,7 @@ func monitorConfigListen(
 				continue
 			}
 			// seed states with available = false until our polling cycle picks up a result
-			if _, exists := localStates.Get().Caches[cacheName]; !exists {
+			if _, exists := localStates.GetCache(cacheName); !exists {
 				localStates.SetCache(cacheName, peer.IsAvailable{IsAvailable: false})
 			}
 
@@ -267,11 +266,11 @@ func monitorConfigListen(
 		// TODO because there are multiple writers to localStates.DeliveryService, there is a race condition, where MonitorConfig (this func) and HealthResultManager could write at the same time, and the HealthResultManager could overwrite a delivery service addition or deletion here. Probably the simplest and most performant fix would be a lock-free algorithm using atomic compare-and-swaps.
 		for _, ds := range monitorConfig.DeliveryService {
 			// since caches default to unavailable, also default DS false
-			if _, exists := localStates.Get().Deliveryservice[enum.DeliveryServiceName(ds.XMLID)]; !exists {
+			if _, exists := localStates.GetDeliveryService(enum.DeliveryServiceName(ds.XMLID)); !exists {
 				localStates.SetDeliveryService(enum.DeliveryServiceName(ds.XMLID), peer.Deliveryservice{IsAvailable: false, DisabledLocations: []enum.CacheName{}}) // important to initialize DisabledLocations, so JSON is `[]` not `null`
 			}
 		}
-		for ds := range localStates.Get().Deliveryservice {
+		for ds := range localStates.GetDeliveryServices() {
 			if _, exists := monitorConfig.DeliveryService[string(ds)]; !exists {
 				localStates.DeleteDeliveryService(ds)
 			}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c80be36f/traffic_monitor/experimental/traffic_monitor/peer/crstates.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/peer/crstates.go b/traffic_monitor/experimental/traffic_monitor/peer/crstates.go
index 65b7eb3..ef5176a 100644
--- a/traffic_monitor/experimental/traffic_monitor/peer/crstates.go
+++ b/traffic_monitor/experimental/traffic_monitor/peer/crstates.go
@@ -109,7 +109,6 @@ func NewCRStatesThreadsafe() CRStatesThreadsafe {
 }
 
 // Get returns the internal Crstates object for reading.
-// TODO add GetCaches, GetDeliveryservices?
 func (t *CRStatesThreadsafe) Get() Crstates {
 	t.m.RLock()
 	defer t.m.RUnlock()
@@ -125,10 +124,11 @@ func (t *CRStatesThreadsafe) GetDeliveryServices() map[enum.DeliveryServiceName]
 }
 
 // GetCache returns the availability data of the given cache. This does not mutate, and is thus safe for multiple goroutines to call.
-func (t *CRStatesThreadsafe) GetCache(name enum.CacheName) IsAvailable {
+func (t *CRStatesThreadsafe) GetCache(name enum.CacheName) (available IsAvailable, ok bool) {
 	t.m.RLock()
-	defer t.m.RUnlock()
-	return t.crStates.Caches[name]
+	available, ok = t.crStates.Caches[name]
+	t.m.RUnlock()
+	return
 }
 
 // GetCaches returns the availability data of all caches. This does not mutate, and is thus safe for multiple goroutines to call.
@@ -139,10 +139,11 @@ func (t *CRStatesThreadsafe) GetCaches() map[enum.CacheName]IsAvailable {
 }
 
 // GetDeliveryService returns the availability data of the given delivery service. This does not mutate, and is thus safe for multiple goroutines to call.
-func (t *CRStatesThreadsafe) GetDeliveryService(name enum.DeliveryServiceName) Deliveryservice {
+func (t *CRStatesThreadsafe) GetDeliveryService(name enum.DeliveryServiceName) (ds Deliveryservice, ok bool) {
 	t.m.RLock()
-	defer t.m.RUnlock()
-	return t.crStates.Deliveryservice[name]
+	ds, ok = t.crStates.Deliveryservice[name]
+	t.m.RUnlock()
+	return
 }
 
 // SetCache sets the internal availability data for a particular cache.


[18/20] incubator-trafficcontrol git commit: Add TM2 cache.TestResultHistoryCopy

Posted by ne...@apache.org.
Add TM2 cache.TestResultHistoryCopy


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/afa1a4cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/afa1a4cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/afa1a4cd

Branch: refs/heads/master
Commit: afa1a4cd39fceee983e40e4b1f54ad9958c4d739
Parents: d08c9ba
Author: Robert Butts <ro...@gmail.com>
Authored: Wed Dec 7 16:12:44 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../traffic_monitor/cache/data_test.go          | 191 ++++++++++++++++++-
 1 file changed, 189 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/afa1a4cd/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/data_test.go b/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
index 3788e7c..27d8171 100644
--- a/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
+++ b/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
@@ -1,11 +1,14 @@
 package cache
 
 import (
+	"errors"
+	"fmt"
+	dsdata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservicedata"
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
 	"math/rand"
 	"reflect"
 	"testing"
-
-	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
+	"time"
 )
 
 func randBool() bool {
@@ -48,3 +51,187 @@ func TestAvailableStatusesCopy(t *testing.T) {
 		}
 	}
 }
+
+func randStrIfaceMap() map[string]interface{} {
+	m := map[string]interface{}{}
+	num := 5
+	for i := 0; i < num; i++ {
+		m[randStr()] = randStr()
+	}
+	return m
+}
+
+func randAstats() Astats {
+	return Astats{
+		Ats:    randStrIfaceMap(),
+		System: randAstatsSystem(),
+	}
+}
+
+func randAstatsSystem() AstatsSystem {
+	return AstatsSystem{
+		InfName:           randStr(),
+		InfSpeed:          rand.Int(),
+		ProcNetDev:        randStr(),
+		ProcLoadavg:       randStr(),
+		ConfigLoadRequest: rand.Int(),
+		LastReloadRequest: rand.Int(),
+		ConfigReloads:     rand.Int(),
+		LastReload:        rand.Int(),
+		AstatsLoad:        rand.Int(),
+	}
+}
+
+func randVitals() Vitals {
+	return Vitals{
+		LoadAvg:    rand.Float64(),
+		BytesOut:   rand.Int63(),
+		BytesIn:    rand.Int63(),
+		KbpsOut:    rand.Int63(),
+		MaxKbpsOut: rand.Int63(),
+	}
+}
+
+func randStatMeta() dsdata.StatMeta {
+	return dsdata.StatMeta{Time: rand.Int63()}
+}
+
+func randStatCacheStats() dsdata.StatCacheStats {
+	return dsdata.StatCacheStats{
+		OutBytes:    dsdata.StatInt{Value: rand.Int63(), StatMeta: randStatMeta()},
+		IsAvailable: dsdata.StatBool{Value: randBool(), StatMeta: randStatMeta()},
+		Status5xx:   dsdata.StatInt{Value: rand.Int63(), StatMeta: randStatMeta()},
+		Status4xx:   dsdata.StatInt{Value: rand.Int63(), StatMeta: randStatMeta()},
+		Status3xx:   dsdata.StatInt{Value: rand.Int63(), StatMeta: randStatMeta()},
+		Status2xx:   dsdata.StatInt{Value: rand.Int63(), StatMeta: randStatMeta()},
+		InBytes:     dsdata.StatFloat{Value: rand.Float64(), StatMeta: randStatMeta()},
+		Kbps:        dsdata.StatFloat{Value: rand.Float64(), StatMeta: randStatMeta()},
+		Tps5xx:      dsdata.StatFloat{Value: rand.Float64(), StatMeta: randStatMeta()},
+		Tps4xx:      dsdata.StatFloat{Value: rand.Float64(), StatMeta: randStatMeta()},
+		Tps3xx:      dsdata.StatFloat{Value: rand.Float64(), StatMeta: randStatMeta()},
+		Tps2xx:      dsdata.StatFloat{Value: rand.Float64(), StatMeta: randStatMeta()},
+		ErrorString: dsdata.StatString{Value: randStr(), StatMeta: randStatMeta()},
+		TpsTotal:    dsdata.StatInt{Value: rand.Int63(), StatMeta: randStatMeta()},
+	}
+}
+
+func randStatCommon() dsdata.StatCommon {
+	cachesReporting := map[enum.CacheName]bool{}
+	num := 5
+	for i := 0; i < num; i++ {
+		cachesReporting[enum.CacheName(randStr())] = randBool()
+	}
+	return dsdata.StatCommon{
+		CachesConfiguredNum: dsdata.StatInt{Value: rand.Int63(), StatMeta: randStatMeta()},
+		CachesReporting:     cachesReporting,
+		ErrorStr:            dsdata.StatString{Value: randStr(), StatMeta: randStatMeta()},
+		StatusStr:           dsdata.StatString{Value: randStr(), StatMeta: randStatMeta()},
+		IsHealthy:           dsdata.StatBool{Value: randBool(), StatMeta: randStatMeta()},
+		IsAvailable:         dsdata.StatBool{Value: randBool(), StatMeta: randStatMeta()},
+		CachesAvailableNum:  dsdata.StatInt{Value: rand.Int63(), StatMeta: randStatMeta()},
+	}
+}
+
+func randDsStat() dsdata.Stat {
+	num := 5
+	cacheGroups := map[enum.CacheGroupName]dsdata.StatCacheStats{}
+	types := map[enum.CacheType]dsdata.StatCacheStats{}
+	caches := map[enum.CacheName]dsdata.StatCacheStats{}
+	cachesTime := map[enum.CacheName]time.Time{}
+	for i := 0; i < num; i++ {
+		cacheGroups[enum.CacheGroupName(randStr())] = randStatCacheStats()
+		types[enum.CacheType(randStr())] = randStatCacheStats()
+		cachesTime[enum.CacheName(randStr())] = time.Now()
+	}
+
+	return dsdata.Stat{
+		CommonStats:        randStatCommon(),
+		CacheGroups:        cacheGroups,
+		Types:              types,
+		Caches:             caches,
+		CachesTimeReceived: cachesTime,
+		TotalStats:         randStatCacheStats(),
+	}
+}
+
+func randDsStats() map[enum.DeliveryServiceName]dsdata.Stat {
+	num := 5
+	a := map[enum.DeliveryServiceName]dsdata.Stat{}
+	for i := 0; i < num; i++ {
+		a[enum.DeliveryServiceName(randStr())] = randDsStat()
+	}
+	return a
+}
+func randErrs() []error {
+	if randBool() {
+		return []error{}
+	}
+	num := 5
+	errs := []error{}
+	for i := 0; i < num; i++ {
+		errs = append(errs, errors.New(randStr()))
+	}
+	return errs
+}
+
+func randPrecomputedData() PrecomputedData {
+	return PrecomputedData{
+		DeliveryServiceStats: randDsStats(),
+		OutBytes:             rand.Int63(),
+		MaxKbps:              rand.Int63(),
+		Errors:               randErrs(),
+		Reporting:            randBool(),
+	}
+}
+
+func randResult() Result {
+	return Result{
+		ID:              enum.CacheName(randStr()),
+		Error:           fmt.Errorf(randStr()),
+		Astats:          randAstats(),
+		Time:            time.Now(),
+		RequestTime:     time.Millisecond * time.Duration(rand.Int()),
+		Vitals:          randVitals(),
+		PollID:          uint64(rand.Int63()),
+		PollFinished:    make(chan uint64),
+		PrecomputedData: randPrecomputedData(),
+		Available:       randBool(),
+	}
+
+}
+
+func randResultSlice() []Result {
+	a := []Result{}
+	num := 5
+	for i := 0; i < num; i++ {
+		a = append(a, randResult())
+	}
+	return a
+}
+
+func randResultHistory() ResultHistory {
+	a := ResultHistory{}
+	num := 5
+	for i := 0; i < num; i++ {
+		a[enum.CacheName(randStr())] = randResultSlice()
+	}
+	return a
+}
+
+func TestResultHistoryCopy(t *testing.T) {
+	num := 5
+	for i := 0; i < num; i++ {
+		a := randResultHistory()
+		b := a.Copy()
+
+		if !reflect.DeepEqual(a, b) {
+			t.Errorf("expected a and b DeepEqual, actual copied map not equal", a, b)
+		}
+
+		// verify a and b don't point to the same map
+		a[enum.CacheName(randStr())] = randResultSlice()
+		if reflect.DeepEqual(a, b) {
+			t.Errorf("expected a != b, actual a and b point to the same map", a)
+		}
+	}
+}


[09/20] incubator-trafficcontrol git commit: Remove TM2 commented code to exclude mid kbps sum

Posted by ne...@apache.org.
Remove TM2 commented code to exclude mid kbps sum


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/d661da21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/d661da21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/d661da21

Branch: refs/heads/master
Commit: d661da21e55e815f822af4152da81755b58ac87f
Parents: c80be36
Author: Robert Butts <ro...@gmail.com>
Authored: Mon Nov 28 14:31:12 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/traffic_monitor/manager/datarequest.go          | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/d661da21/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
index a329057..346814e 100644
--- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
+++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go
@@ -619,13 +619,9 @@ func srvAPICacheStates(toData todata.TODataThreadsafe, statHistory ResultHistory
 }
 
 func srvAPIBandwidthKbps(toData todata.TODataThreadsafe, lastStats LastStatsThreadsafe) []byte {
-	// serverTypes := toData.Get().ServerTypes
 	kbpsStats := lastStats.Get()
 	sum := float64(0.0)
 	for _, data := range kbpsStats.Caches {
-		// if serverTypes[cache] != enum.CacheTypeEdge {
-		// 	continue
-		// }
 		sum += data.Bytes.PerSec / ds.BytesPerKilobit
 	}
 	return []byte(fmt.Sprintf("%f", sum))


[13/20] incubator-trafficcontrol git commit: Fix TM2 poller to use nanosleep, not busywait

Posted by ne...@apache.org.
Fix TM2 poller to use nanosleep, not busywait

Fixes Traffic Monitor 2.0 to use nanosleep instead of busywaiting
on Linux builds for polling (recalling we can't Sleep() because it
gets progressively slower, in what appears a Go runtime bug).

Changes GOMAXPROCS to 3x the number of cores, to steal from other
processes, plus 3 for the 3 thread-locked pollers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/c3ab47dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/c3ab47dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/c3ab47dc

Branch: refs/heads/master
Commit: c3ab47dccf41fd04b7638dd0eacad3c7ed8cdd99
Parents: 9153d3e
Author: Robert Butts <ro...@gmail.com>
Authored: Wed Dec 7 13:51:03 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/common/poller/poller.go        | 37 ++++++++++----------
 .../experimental/common/poller/threadsleep.go   | 17 +++++++++
 .../common/poller/threadsleep_linux.go          | 23 ++++++++++++
 .../traffic_monitor/traffic_monitor.go          | 10 +++---
 4 files changed, 65 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c3ab47dc/traffic_monitor/experimental/common/poller/poller.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/poller.go b/traffic_monitor/experimental/common/poller/poller.go
index 7c86b57..68d15ce 100644
--- a/traffic_monitor/experimental/common/poller/poller.go
+++ b/traffic_monitor/experimental/common/poller/poller.go
@@ -25,6 +25,7 @@ import (
 	"math/rand"
 	"net/http"
 	"os"
+	"runtime"
 	"sync/atomic"
 	"time"
 
@@ -192,6 +193,15 @@ func (p HttpPoller) SleepPoll() {
 	}
 }
 
+func mustDie(die <-chan struct{}) bool {
+	select {
+	case <-die:
+		return true
+	default:
+	}
+	return false
+}
+
 // TODO iterationCount and/or p.TickChan?
 func sleepPoller(interval time.Duration, id string, url string, fetcher fetcher.Fetcher, die <-chan struct{}) {
 	pollSpread := time.Duration(rand.Float64()*float64(interval/time.Nanosecond)) * time.Nanosecond
@@ -200,10 +210,8 @@ func sleepPoller(interval time.Duration, id string, url string, fetcher fetcher.
 	lastTime := time.Now()
 	for {
 		select {
-		case now := <-tick.C:
-			tick.Stop()                     // old ticker MUST call Stop() to release resources. Else, memory leak.
-			tick = time.NewTicker(interval) // recreate timer, to avoid Go's "smoothing" nonsense
-			realInterval := now.Sub(lastTime)
+		case <-tick.C:
+			realInterval := time.Now().Sub(lastTime)
 			if realInterval > interval+(time.Millisecond*100) {
 				instr.TimerFail.Inc()
 				log.Infof("Intended Duration: %v Actual Duration: %v\n", interval, realInterval)
@@ -259,6 +267,7 @@ func (p HttpPoller) InsomniacPoll() {
 }
 
 func insomniacPoller(pollerId int64, polls []HTTPPollInfo, fetcherTemplate fetcher.HttpFetcher, die <-chan struct{}) {
+	runtime.LockOSThread()
 	fmt.Printf("httpPoll %v called\n", pollerId)
 	heap := Heap{PollerID: pollerId}
 	start := time.Now()
@@ -277,14 +286,6 @@ func insomniacPoller(pollerId int64, polls []HTTPPollInfo, fetcherTemplate fetch
 		fetchers[p.ID] = fetcher
 	}
 	fmt.Printf("httpPoll %v added to heap\n", pollerId)
-	mustDie := func() bool {
-		select {
-		case <-die:
-			return true
-		default:
-		}
-		return false
-	}
 
 	timeMax := func(a time.Time, b time.Time) time.Time {
 		if a.After(b) {
@@ -311,21 +312,21 @@ func insomniacPoller(pollerId int64, polls []HTTPPollInfo, fetcherTemplate fetch
 
 	fmt.Printf("httpPoll %v starting main loop\n", pollerId)
 	for {
-		if mustDie() {
+		if mustDie(die) {
 			fmt.Printf("httpPoll %v dying\n", pollerId)
 			return
 		}
 		p, ok := heap.Pop()
 		if !ok {
-			fmt.Printf("httpPoll %v empty heap, busylooping\n", pollerId)
-			continue //busywait because we fear to sleep. TODO sleep?
+			ThreadSleep(0)
+			continue
 		}
 		if p.Info.ID == "odol-atsec-jac-04" {
 			fmt.Printf("httpPoll %v popped id %v p.Next %v now %v\n", pollerId, p.Info.ID, p.Next, time.Now())
 		}
-		for p.Next.After(time.Now()) {
-			// busywait, because sleeping gets progressively slower for unknown reasons.
-		}
+
+		ThreadSleep(p.Next.Sub(time.Now()))
+
 		if p.Info.ID == "odol-atsec-jac-04" {
 			fmt.Printf("httpPoll %v polling %v next %v now %v\n", pollerId, p.Info.ID, p.Next, time.Now())
 		}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c3ab47dc/traffic_monitor/experimental/common/poller/threadsleep.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/threadsleep.go b/traffic_monitor/experimental/common/poller/threadsleep.go
new file mode 100644
index 0000000..b8b1233
--- /dev/null
+++ b/traffic_monitor/experimental/common/poller/threadsleep.go
@@ -0,0 +1,17 @@
+// +build !linux
+
+package poller
+
+import (
+	"runtime"
+	"time"
+)
+
+// ThreadSleep actually busywaits for the given duration. This is becuase Go doesn't have Mac and Windows nanosleep syscalls, and `Sleep` sleeps for progressively longer than requested.
+func ThreadSleep(d time.Duration) {
+	// TODO fix to not busywait on Mac, Windows. We can't simply Sleep, because Sleep gets progressively slower as the app runs, due to a Go runtime issue. If this is changed, you MUST verify the poll doesn't get slower after the app runs for several days.
+	end := time.Now().Add(d)
+	for end.After(time.Now()) {
+		runtime.Gosched()
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c3ab47dc/traffic_monitor/experimental/common/poller/threadsleep_linux.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/threadsleep_linux.go b/traffic_monitor/experimental/common/poller/threadsleep_linux.go
new file mode 100644
index 0000000..9037027
--- /dev/null
+++ b/traffic_monitor/experimental/common/poller/threadsleep_linux.go
@@ -0,0 +1,23 @@
+// +build linux
+
+package poller
+
+import (
+	"errors"
+	"golang.org/x/sys/unix"
+	"time"
+)
+
+// ThreadSleep sleeps using the POSIX syscall `nanosleep`. Note this does not sleep the goroutine, but the operating system thread itself. This should only be called by a goroutine which has previously called `LockOSThread`. This exists due to a bug with `time.Sleep` getting progressively slower as the app runs, and should be removed if the bug in Go is fixed.
+func ThreadSleep(d time.Duration) {
+	if d < 0 {
+		d = 0
+	}
+	t := unix.Timespec{}
+	leftover := unix.NsecToTimespec(d.Nanoseconds())
+	err := errors.New("")
+	for err != nil && (leftover.Sec != 0 || leftover.Nsec != 0) {
+		t = leftover
+		err = unix.Nanosleep(&t, &leftover)
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/c3ab47dc/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go b/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go
index 1b726d2..f2b5444 100644
--- a/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go
+++ b/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go
@@ -8,9 +8,9 @@ package main
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@ package main
  * under the License.
  */
 
-
 import (
 	"bytes"
 	"flag"
@@ -116,8 +115,11 @@ func getLogWriters(errLoc, warnLoc, infoLoc, debugLoc string) (io.Writer, io.Wri
 	return errW, warnW, infoW, debugW, nil
 }
 
+// NumLockedThreads is the number of threads which will be locked to a single goroutine. Ideally this would be computed, but there's no easy way to do that, and it isn't critical. This only protects against the case where a user has a single core, or manually changes GOMAXPROCS to a small number.
+const NumLockedThreads = 3
+
 func main() {
-	runtime.GOMAXPROCS(runtime.NumCPU())
+	runtime.GOMAXPROCS(runtime.NumCPU()*3 + NumLockedThreads)
 
 	staticData, err := getStaticAppData()
 	if err != nil {


[19/20] incubator-trafficcontrol git commit: Add TM2 Apache license headers

Posted by ne...@apache.org.
Add TM2 Apache license headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/e279d16c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/e279d16c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/e279d16c

Branch: refs/heads/master
Commit: e279d16ce8335fd5f9df740601e0ed36c47c332e
Parents: 0c6d88e
Author: Robert Butts <ro...@gmail.com>
Authored: Thu Dec 8 09:53:50 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../traffic_monitor/cache/data_test.go           | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/e279d16c/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/data_test.go b/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
index 27d8171..72ee851 100644
--- a/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
+++ b/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
@@ -1,5 +1,24 @@
 package cache
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import (
 	"errors"
 	"fmt"


[15/20] incubator-trafficcontrol git commit: Add TM2 cache.TestAvailableStatusesCopy

Posted by ne...@apache.org.
Add TM2 cache.TestAvailableStatusesCopy


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/d08c9bad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/d08c9bad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/d08c9bad

Branch: refs/heads/master
Commit: d08c9bad652b6fc654271b9d8ea4ce1d984b5864
Parents: 34c0b4f
Author: Robert Butts <ro...@gmail.com>
Authored: Wed Dec 7 15:19:44 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../traffic_monitor/cache/data_test.go          | 50 ++++++++++++++++++++
 1 file changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/d08c9bad/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/traffic_monitor/cache/data_test.go b/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
new file mode 100644
index 0000000..3788e7c
--- /dev/null
+++ b/traffic_monitor/experimental/traffic_monitor/cache/data_test.go
@@ -0,0 +1,50 @@
+package cache
+
+import (
+	"math/rand"
+	"reflect"
+	"testing"
+
+	"github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum"
+)
+
+func randBool() bool {
+	return rand.Int()%2 == 0
+}
+
+func randStr() string {
+	chars := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-_"
+	num := 100
+	s := ""
+	for i := 0; i < num; i++ {
+		s += string(chars[rand.Intn(len(chars))])
+	}
+	return s
+}
+
+func randAvailableStatuses() AvailableStatuses {
+	a := AvailableStatuses{}
+	num := 100
+	for i := 0; i < num; i++ {
+		a[enum.CacheName(randStr())] = AvailableStatus{Available: randBool(), Status: randStr()}
+	}
+	return a
+}
+
+func TestAvailableStatusesCopy(t *testing.T) {
+	num := 100
+	for i := 0; i < num; i++ {
+		a := randAvailableStatuses()
+		b := a.Copy()
+
+		if !reflect.DeepEqual(a, b) {
+			t.Errorf("expected a and b DeepEqual, actual copied map not equal", a, b)
+		}
+
+		// verify a and b don't point to the same map
+		a[enum.CacheName(randStr())] = AvailableStatus{Available: randBool(), Status: randStr()}
+		if reflect.DeepEqual(a, b) {
+			t.Errorf("expected a != b, actual a and b point to the same map", a)
+		}
+	}
+}


[20/20] incubator-trafficcontrol git commit: This closes #99

Posted by ne...@apache.org.
This closes #99


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/0a17ffe0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/0a17ffe0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/0a17ffe0

Branch: refs/heads/master
Commit: 0a17ffe06a56b6d16fc584f311c212ee49f09067
Parents: 11d6ffb
Author: Dave Neuman <ne...@apache.org>
Authored: Thu Dec 8 11:45:49 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:45:49 2016 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[02/20] incubator-trafficcontrol git commit: Add TM2 Apache license headers

Posted by ne...@apache.org.
Add TM2 Apache license headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/6eeb7328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/6eeb7328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/6eeb7328

Branch: refs/heads/master
Commit: 6eeb73289feefca102b6382a9ae24885e475deba
Parents: 0511308
Author: Robert Butts <ro...@gmail.com>
Authored: Thu Dec 8 10:00:45 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/common/poller/heap_test.go      | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/6eeb7328/traffic_monitor/experimental/common/poller/heap_test.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/heap_test.go b/traffic_monitor/experimental/common/poller/heap_test.go
index ada593d..f943c96 100644
--- a/traffic_monitor/experimental/common/poller/heap_test.go
+++ b/traffic_monitor/experimental/common/poller/heap_test.go
@@ -1,5 +1,24 @@
 package poller
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import (
 	"fmt"
 	"math/rand"


[17/20] incubator-trafficcontrol git commit: Remove TM2 debug prints

Posted by ne...@apache.org.
Remove TM2 debug prints


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/05113083
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/05113083
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/05113083

Branch: refs/heads/master
Commit: 0511308333898a6f254aab885ae26a7551d4a994
Parents: e279d16
Author: Robert Butts <ro...@gmail.com>
Authored: Thu Dec 8 09:59:22 2016 -0700
Committer: Dave Neuman <ne...@apache.org>
Committed: Thu Dec 8 11:44:33 2016 -0700

----------------------------------------------------------------------
 .../experimental/common/poller/heap.go           |  6 ------
 .../experimental/common/poller/poller.go         | 19 -------------------
 2 files changed, 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/05113083/traffic_monitor/experimental/common/poller/heap.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/heap.go b/traffic_monitor/experimental/common/poller/heap.go
index b0152b3..c78a64e 100644
--- a/traffic_monitor/experimental/common/poller/heap.go
+++ b/traffic_monitor/experimental/common/poller/heap.go
@@ -95,9 +95,6 @@ func (h *Heap) Pop() (HeapPollInfo, bool) {
 	h.info[0] = h.info[len(h.info)-1]
 	h.info = h.info[:len(h.info)-1]
 	h.heapify(0)
-	if max.Info.ID == "odol-atsec-jac-04" {
-		fmt.Printf("httpPoll %v Heap.Pop id %v next %v\n", h.PollerID, max.Info.ID, max.Next)
-	}
 	return max, true
 }
 
@@ -105,9 +102,6 @@ func (h *Heap) Pop() (HeapPollInfo, bool) {
 func (h *Heap) Push(key HeapPollInfo) {
 	h.m.Lock()
 	defer h.m.Unlock()
-	if key.Info.ID == "odol-atsec-jac-04" {
-		fmt.Printf("httpPoll %v Heap.Push id %v next %v\n", h.PollerID, key.Info.ID, key.Next)
-	}
 	h.info = append(h.info, HeapPollInfo{Next: time.Unix(1<<63-1, 0)})
 	h.increaseKey(len(h.info)-1, key)
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/05113083/traffic_monitor/experimental/common/poller/poller.go
----------------------------------------------------------------------
diff --git a/traffic_monitor/experimental/common/poller/poller.go b/traffic_monitor/experimental/common/poller/poller.go
index 68d15ce..ea70eb0 100644
--- a/traffic_monitor/experimental/common/poller/poller.go
+++ b/traffic_monitor/experimental/common/poller/poller.go
@@ -244,14 +244,12 @@ func (p HttpPoller) InsomniacPoll() {
 			continue
 		}
 
-		fmt.Printf("HttpPoller.InsomniacPoll got newCfg\n")
 		if pollRunning {
 			killChan <- struct{}{}
 		}
 		pollRunning = true
 
 		polls := []HTTPPollInfo{}
-		fmt.Printf("HttpPoller.InsomniacPoll creating polls\n")
 		for id, pollCfg := range newCfg.Urls {
 			polls = append(polls, HTTPPollInfo{
 				Interval: newCfg.Interval,
@@ -260,7 +258,6 @@ func (p HttpPoller) InsomniacPoll() {
 				Timeout:  pollCfg.Timeout,
 			})
 		}
-		fmt.Printf("HttpPoller.InsomniacPoll created polls, going httpPoll\n")
 		go insomniacPoller(pollerId, polls, p.FetcherTemplate, killChan)
 		p.Config = newCfg
 	}
@@ -268,11 +265,9 @@ func (p HttpPoller) InsomniacPoll() {
 
 func insomniacPoller(pollerId int64, polls []HTTPPollInfo, fetcherTemplate fetcher.HttpFetcher, die <-chan struct{}) {
 	runtime.LockOSThread()
-	fmt.Printf("httpPoll %v called\n", pollerId)
 	heap := Heap{PollerID: pollerId}
 	start := time.Now()
 	fetchers := map[string]fetcher.Fetcher{}
-	fmt.Printf("httpPoll %v adding to heap\n", pollerId)
 	for _, p := range polls {
 		spread := time.Duration(rand.Float64()*float64(p.Interval/time.Nanosecond)) * time.Nanosecond
 		heap.Push(HeapPollInfo{Info: p, Next: start.Add(spread)})
@@ -285,7 +280,6 @@ func insomniacPoller(pollerId int64, polls []HTTPPollInfo, fetcherTemplate fetch
 		}
 		fetchers[p.ID] = fetcher
 	}
-	fmt.Printf("httpPoll %v added to heap\n", pollerId)
 
 	timeMax := func(a time.Time, b time.Time) time.Time {
 		if a.After(b) {
@@ -304,16 +298,11 @@ func insomniacPoller(pollerId int64, polls []HTTPPollInfo, fetcherTemplate fetch
 		<-pollFinishedChan
 		now := time.Now()
 		p.Next = timeMax(start.Add(p.Info.Interval), now)
-		if p.Info.ID == "odol-atsec-jac-04" {
-			fmt.Printf("httpPoll %v heaping id %v next %v start %v interval %v now %v add %v\n", pollerId, p.Info.ID, p.Next, start, p.Info.Interval, now, start.Add(p.Info.Interval))
-		}
 		heap.Push(p)
 	}
 
-	fmt.Printf("httpPoll %v starting main loop\n", pollerId)
 	for {
 		if mustDie(die) {
-			fmt.Printf("httpPoll %v dying\n", pollerId)
 			return
 		}
 		p, ok := heap.Pop()
@@ -321,15 +310,7 @@ func insomniacPoller(pollerId int64, polls []HTTPPollInfo, fetcherTemplate fetch
 			ThreadSleep(0)
 			continue
 		}
-		if p.Info.ID == "odol-atsec-jac-04" {
-			fmt.Printf("httpPoll %v popped id %v p.Next %v now %v\n", pollerId, p.Info.ID, p.Next, time.Now())
-		}
-
 		ThreadSleep(p.Next.Sub(time.Now()))
-
-		if p.Info.ID == "odol-atsec-jac-04" {
-			fmt.Printf("httpPoll %v polling %v next %v now %v\n", pollerId, p.Info.ID, p.Next, time.Now())
-		}
 		go poll(p)
 	}
 }