You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2022/05/17 15:45:46 UTC
[GitHub] [trafficcontrol] rawlinp opened a new pull request, #6838: Add TO option to cache server update status in memory
rawlinp opened a new pull request, #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838
To reduce load on the Traffic Ops database, add an option to
periodically read all server update status data from the database and
serve all requests to /servers/{hostname}/update_status from memory.
With this option enabled, `t3c` runs on cache servers that don't have
updates pending don't cause any TODB queries, which means TODB has more
capacity for actually propagating changes.
The tradeoff is that it can take up to
`server_update_status_cache_refresh_interval_sec` seconds for cache
servers to see updates/revalidations pending. However, this is better
because it allows a cache server to get updates sooner than it would
otherwise (due to being able to run `t3c` more frequently).
<!--
Thank you for contributing! Please be sure to read our contribution guidelines: https://github.com/apache/trafficcontrol/blob/master/CONTRIBUTING.md
If this closes or relates to an existing issue, please reference it using one of the following:
Closes: #ISSUE
Related: #ISSUE
If this PR fixes a security vulnerability, DO NOT submit! Instead, contact
the Apache Traffic Control Security Team at security@trafficcontrol.apache.org and follow the
guidelines at https://apache.org/security regarding vulnerability disclosure.
-->
<!-- **^ Add meaningful description above** --><hr/>
## Which Traffic Control components are affected by this PR?
<!-- Please delete all components from this list that are NOT affected by this PR.
Feel free to add the name of a tool or script that is affected but not on the list.
-->
- Documentation
- Traffic Ops
## What is the best way to verify this PR?
<!-- Please include here ALL the steps necessary to test your PR.
If your PR has tests (and most should), provide the steps needed to run the tests.
If not, please provide step-by-step instructions to test the PR manually and explain why your PR does not need tests. -->
Ensure the automated tests pass. Configure `server_update_status_cache_refresh_interval_sec` in cdn.conf to a number > 0, queue updates on various cache servers (edges and mids), and ensure that `GET /servers/{hostname}/update_status` returns the expected data.
## PR submission checklist
- [x] This PR has tests <!-- If not, please delete this text and explain why this PR does not need tests. -->
- [x] This PR has documentation <!-- If not, please delete this text and explain why this PR does not need documentation. -->
- [x] This PR has a CHANGELOG.md entry <!-- A fix for a bug from an ATC release, an improvement, or a new feature should have a changelog entry. -->
- [x] This PR **DOES NOT FIX A SERIOUS SECURITY VULNERABILITY** (see [the Apache Software Foundation's security guidelines](https://apache.org/security) for details)
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@trafficcontrol.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [trafficcontrol] srijeet0406 commented on a diff in pull request #6838: Add TO option to cache server update status in memory
Posted by GitBox <gi...@apache.org>.
srijeet0406 commented on code in PR #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838#discussion_r881890722
##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
}
return updateStatuses, nil, nil
}
+
+type serverUpdateStatuses struct {
+ serverMap map[string][]tc.ServerUpdateStatusV40
+ *sync.RWMutex
+ initialized bool
+ enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
Review Comment:
Should this function be called `serverUpdateStatusCacheIsInitialized` ?
##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
return CacheTypeInvalid
}
+func IsCacheType(s string) bool {
Review Comment:
Could we rename this method as `IsValidCacheType`?
##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
}
return updateStatuses, nil, nil
}
+
+type serverUpdateStatuses struct {
+ serverMap map[string][]tc.ServerUpdateStatusV40
+ *sync.RWMutex
+ initialized bool
+ enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+ if serverUpdateStatusCache.enabled {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.initialized
+ }
+ return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ once.Do(func() {
+ if interval <= 0 {
+ return
+ }
+ serverUpdateStatusCache.enabled = true
+ refreshServerUpdateStatusCache(db, timeout)
+ startServerUpdateStatusCacheRefresher(interval, db, timeout)
+ })
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ go func() {
+ for {
+ time.Sleep(interval)
+ refreshServerUpdateStatusCache(db, timeout)
+ }
+ }()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+ newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+ if err != nil {
+ log.Errorf("refreshing server update status cache: %s", err.Error())
+ return
+ }
+ serverUpdateStatusCache.Lock()
+ defer serverUpdateStatusCache.Unlock()
+ serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+ serverUpdateStatusCache.initialized = true
+ log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+ id int
+ hostname string
Review Comment:
nit: Could we rename this to `hostName`, just to be consistent?
##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
}
return updateStatuses, nil, nil
}
+
+type serverUpdateStatuses struct {
+ serverMap map[string][]tc.ServerUpdateStatusV40
+ *sync.RWMutex
+ initialized bool
+ enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+ if serverUpdateStatusCache.enabled {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.initialized
+ }
+ return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ once.Do(func() {
+ if interval <= 0 {
+ return
+ }
+ serverUpdateStatusCache.enabled = true
+ refreshServerUpdateStatusCache(db, timeout)
+ startServerUpdateStatusCacheRefresher(interval, db, timeout)
+ })
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ go func() {
+ for {
+ time.Sleep(interval)
+ refreshServerUpdateStatusCache(db, timeout)
+ }
+ }()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+ newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+ if err != nil {
+ log.Errorf("refreshing server update status cache: %s", err.Error())
+ return
+ }
+ serverUpdateStatusCache.Lock()
+ defer serverUpdateStatusCache.Unlock()
+ serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+ serverUpdateStatusCache.initialized = true
+ log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+ id int
+ hostname string
+ typeName string
+ cdnId int
+ status string
+ cachegroup int
+ configUpdateTime *time.Time
+ configApplyTime *time.Time
+ revalUpdateTime *time.Time
+ revalApplyTime *time.Time
+}
+
+const getUseRevalPendingQuery = `
+ SELECT value::BOOLEAN
+ FROM parameter
+ WHERE name = 'use_reval_pending' AND config_file = 'global'
+ UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+ SELECT
+ s.id,
+ s.host_name,
+ t.name,
+ s.cdn_id,
+ st.name,
+ s.cachegroup,
+ s.config_update_time,
+ s.config_apply_time,
+ s.revalidate_update_time,
+ s.revalidate_apply_time
+ FROM server s
+ JOIN type t ON t.id = s.type
+ JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+ SELECT
+ c.id,
+ c.parent_cachegroup_id,
+ c.secondary_parent_cachegroup_id
+ FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+ SELECT
+ cg_child.id,
+ ARRAY_AGG(DISTINCT cg_parent.id)
+ FROM topology_cachegroup_parents tcp
+ JOIN topology_cachegroup tc_child on tc_child.id = tcp.child
+ JOIN cachegroup cg_child ON cg_child.name = tc_child.cachegroup
+ JOIN topology_cachegroup tc_parent ON tc_parent.id = tcp.parent
+ JOIN cachegroup cg_parent ON cg_parent.name = tc_parent.cachegroup
+ GROUP BY cg_child.id
+`
+
+func getServerUpdateStatuses(db *sql.DB, timeout time.Duration) (map[string][]tc.ServerUpdateStatusV40, error) {
+ dbCtx, dbClose := context.WithTimeout(context.Background(), timeout)
+ defer dbClose()
+ serversByID := make(map[int]serverInfo)
+ updatePendingByCDNCachegroup := make(map[int]map[int]bool)
+ revalPendingByCDNCachegroup := make(map[int]map[int]bool)
+ tx, err := db.BeginTx(dbCtx, nil)
+ if err != nil {
+ return nil, fmt.Errorf("beginning server update status transaction: %w", err)
+ }
+ defer func() {
+ if err := tx.Commit(); err != nil && err != sql.ErrTxDone {
+ log.Errorln("committing server update status transaction: " + err.Error())
+ }
+ }()
+
+ useRevalPending := false
+ if err := tx.QueryRowContext(dbCtx, getUseRevalPendingQuery).Scan(&useRevalPending); err != nil {
+ return nil, fmt.Errorf("querying use_reval_pending param: %w", err)
+ }
+
+ // get all servers and build map of update/revalPending by cachegroup+CDN
+ serverRows, err := tx.QueryContext(dbCtx, getServerInfoQuery)
+ if err != nil {
+ return nil, fmt.Errorf("querying servers: %w", err)
+ }
+ defer log.Close(serverRows, "closing server rows")
+ for serverRows.Next() {
+ s := serverInfo{}
+ if err := serverRows.Scan(&s.id, &s.hostname, &s.typeName, &s.cdnId, &s.status, &s.cachegroup, &s.configUpdateTime, &s.configApplyTime, &s.revalUpdateTime, &s.revalApplyTime); err != nil {
+ return nil, fmt.Errorf("scanning servers: %w", err)
+ }
+ serversByID[s.id] = s
+ if _, ok := updatePendingByCDNCachegroup[s.cdnId]; !ok {
+ updatePendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+ }
+ if _, ok := revalPendingByCDNCachegroup[s.cdnId]; !ok {
+ revalPendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+ }
+ status := tc.CacheStatusFromString(s.status)
+ if tc.IsCacheType(s.typeName) && (status == tc.CacheStatusOnline || status == tc.CacheStatusReported || status == tc.CacheStatusAdminDown) {
+ if s.configUpdateTime.After(*s.configApplyTime) {
+ updatePendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+ }
+ if s.revalUpdateTime.After(*s.revalApplyTime) {
+ revalPendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+ }
+ }
+ }
+ if err := serverRows.Err(); err != nil {
+ return nil, fmt.Errorf("iterating over server rows: %w", err)
+ }
+
+ // get all legacy cachegroup parents
+ cacheGroupParents := make(map[int]map[int]struct{})
+ cacheGroupRows, err := tx.QueryContext(dbCtx, getCacheGroupsQuery)
+ if err != nil {
+ return nil, fmt.Errorf("querying cachegroups: %w", err)
+ }
+ defer log.Close(cacheGroupRows, "closing cachegroup rows")
+ for cacheGroupRows.Next() {
+ id := 0
+ parentID := new(int)
+ secondaryParentID := new(int)
+ if err := cacheGroupRows.Scan(&id, &parentID, &secondaryParentID); err != nil {
+ return nil, fmt.Errorf("scanning cachegroups: %w", err)
+ }
+ cacheGroupParents[id] = make(map[int]struct{})
+ if parentID != nil {
+ cacheGroupParents[id][*parentID] = struct{}{}
+ }
+ if secondaryParentID != nil {
+ cacheGroupParents[id][*secondaryParentID] = struct{}{}
+ }
+ }
+ if err := cacheGroupRows.Err(); err != nil {
+ return nil, fmt.Errorf("iterating over cachegroup rows: %w", err)
+ }
+
+ // get all topology-based cachegroup parents
+ topologyCachegroupRows, err := tx.QueryContext(dbCtx, getTopologyCacheGroupParentsQuery)
+ if err != nil {
+ return nil, fmt.Errorf("querying topology cachegroups: %w", err)
+ }
+ defer log.Close(topologyCachegroupRows, "closing topology cachegroup rows")
+ for topologyCachegroupRows.Next() {
+ id := 0
+ parents := []int32{}
+ if err := topologyCachegroupRows.Scan(&id, pq.Array(&parents)); err != nil {
+ return nil, fmt.Errorf("scanning topology cachegroup rows: %w", err)
+ }
+ for _, p := range parents {
+ cacheGroupParents[id][int(p)] = struct{}{}
+ }
+ }
+ if err = topologyCachegroupRows.Err(); err != nil {
+ return nil, fmt.Errorf("iterating over topology cachegroup rows: %w", err)
+ }
+
+ serverUpdateStatuses := make(map[string][]tc.ServerUpdateStatusV40, len(serversByID))
+ for serverID, server := range serversByID {
+ updateStatus := tc.ServerUpdateStatusV40{
+ HostName: server.hostname,
+ UpdatePending: server.configUpdateTime.After(*server.configApplyTime),
+ RevalPending: server.revalUpdateTime.After(*server.revalApplyTime),
Review Comment:
Can the `revalApplyTime` or `configApplyTime` be `nil`? In that case, this will panic.
##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
}
return updateStatuses, nil, nil
}
+
+type serverUpdateStatuses struct {
+ serverMap map[string][]tc.ServerUpdateStatusV40
+ *sync.RWMutex
+ initialized bool
+ enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+ if serverUpdateStatusCache.enabled {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.initialized
+ }
+ return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ once.Do(func() {
+ if interval <= 0 {
+ return
+ }
+ serverUpdateStatusCache.enabled = true
+ refreshServerUpdateStatusCache(db, timeout)
+ startServerUpdateStatusCacheRefresher(interval, db, timeout)
+ })
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ go func() {
+ for {
+ time.Sleep(interval)
+ refreshServerUpdateStatusCache(db, timeout)
+ }
+ }()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+ newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+ if err != nil {
+ log.Errorf("refreshing server update status cache: %s", err.Error())
+ return
+ }
+ serverUpdateStatusCache.Lock()
+ defer serverUpdateStatusCache.Unlock()
+ serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+ serverUpdateStatusCache.initialized = true
+ log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+ id int
+ hostname string
+ typeName string
+ cdnId int
+ status string
+ cachegroup int
+ configUpdateTime *time.Time
+ configApplyTime *time.Time
+ revalUpdateTime *time.Time
+ revalApplyTime *time.Time
+}
+
+const getUseRevalPendingQuery = `
+ SELECT value::BOOLEAN
+ FROM parameter
+ WHERE name = 'use_reval_pending' AND config_file = 'global'
+ UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+ SELECT
+ s.id,
+ s.host_name,
+ t.name,
+ s.cdn_id,
+ st.name,
+ s.cachegroup,
+ s.config_update_time,
+ s.config_apply_time,
+ s.revalidate_update_time,
+ s.revalidate_apply_time
+ FROM server s
+ JOIN type t ON t.id = s.type
+ JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+ SELECT
+ c.id,
+ c.parent_cachegroup_id,
+ c.secondary_parent_cachegroup_id
+ FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+ SELECT
+ cg_child.id,
+ ARRAY_AGG(DISTINCT cg_parent.id)
+ FROM topology_cachegroup_parents tcp
+ JOIN topology_cachegroup tc_child on tc_child.id = tcp.child
Review Comment:
nit: should be `ON`
##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
return CacheTypeInvalid
}
+func IsCacheType(s string) bool {
Review Comment:
Also, could you pls add a GoDoc to this method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@trafficcontrol.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [trafficcontrol] rawlinp commented on a diff in pull request #6838: Add TO option to cache server update status in memory
Posted by GitBox <gi...@apache.org>.
rawlinp commented on code in PR #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838#discussion_r881952158
##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
return CacheTypeInvalid
}
+func IsCacheType(s string) bool {
Review Comment:
Done
##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
}
return updateStatuses, nil, nil
}
+
+type serverUpdateStatuses struct {
+ serverMap map[string][]tc.ServerUpdateStatusV40
+ *sync.RWMutex
+ initialized bool
+ enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+ if serverUpdateStatusCache.enabled {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.initialized
+ }
+ return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ once.Do(func() {
+ if interval <= 0 {
+ return
+ }
+ serverUpdateStatusCache.enabled = true
+ refreshServerUpdateStatusCache(db, timeout)
+ startServerUpdateStatusCacheRefresher(interval, db, timeout)
+ })
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ go func() {
+ for {
+ time.Sleep(interval)
+ refreshServerUpdateStatusCache(db, timeout)
+ }
+ }()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+ newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+ if err != nil {
+ log.Errorf("refreshing server update status cache: %s", err.Error())
+ return
+ }
+ serverUpdateStatusCache.Lock()
+ defer serverUpdateStatusCache.Unlock()
+ serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+ serverUpdateStatusCache.initialized = true
+ log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+ id int
+ hostname string
+ typeName string
+ cdnId int
+ status string
+ cachegroup int
+ configUpdateTime *time.Time
+ configApplyTime *time.Time
+ revalUpdateTime *time.Time
+ revalApplyTime *time.Time
+}
+
+const getUseRevalPendingQuery = `
+ SELECT value::BOOLEAN
+ FROM parameter
+ WHERE name = 'use_reval_pending' AND config_file = 'global'
+ UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+ SELECT
+ s.id,
+ s.host_name,
+ t.name,
+ s.cdn_id,
+ st.name,
+ s.cachegroup,
+ s.config_update_time,
+ s.config_apply_time,
+ s.revalidate_update_time,
+ s.revalidate_apply_time
+ FROM server s
+ JOIN type t ON t.id = s.type
+ JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+ SELECT
+ c.id,
+ c.parent_cachegroup_id,
+ c.secondary_parent_cachegroup_id
+ FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+ SELECT
+ cg_child.id,
+ ARRAY_AGG(DISTINCT cg_parent.id)
+ FROM topology_cachegroup_parents tcp
+ JOIN topology_cachegroup tc_child on tc_child.id = tcp.child
+ JOIN cachegroup cg_child ON cg_child.name = tc_child.cachegroup
+ JOIN topology_cachegroup tc_parent ON tc_parent.id = tcp.parent
+ JOIN cachegroup cg_parent ON cg_parent.name = tc_parent.cachegroup
+ GROUP BY cg_child.id
+`
+
+func getServerUpdateStatuses(db *sql.DB, timeout time.Duration) (map[string][]tc.ServerUpdateStatusV40, error) {
+ dbCtx, dbClose := context.WithTimeout(context.Background(), timeout)
+ defer dbClose()
+ serversByID := make(map[int]serverInfo)
+ updatePendingByCDNCachegroup := make(map[int]map[int]bool)
+ revalPendingByCDNCachegroup := make(map[int]map[int]bool)
+ tx, err := db.BeginTx(dbCtx, nil)
+ if err != nil {
+ return nil, fmt.Errorf("beginning server update status transaction: %w", err)
+ }
+ defer func() {
+ if err := tx.Commit(); err != nil && err != sql.ErrTxDone {
+ log.Errorln("committing server update status transaction: " + err.Error())
+ }
+ }()
+
+ useRevalPending := false
+ if err := tx.QueryRowContext(dbCtx, getUseRevalPendingQuery).Scan(&useRevalPending); err != nil {
+ return nil, fmt.Errorf("querying use_reval_pending param: %w", err)
+ }
+
+ // get all servers and build map of update/revalPending by cachegroup+CDN
+ serverRows, err := tx.QueryContext(dbCtx, getServerInfoQuery)
+ if err != nil {
+ return nil, fmt.Errorf("querying servers: %w", err)
+ }
+ defer log.Close(serverRows, "closing server rows")
+ for serverRows.Next() {
+ s := serverInfo{}
+ if err := serverRows.Scan(&s.id, &s.hostname, &s.typeName, &s.cdnId, &s.status, &s.cachegroup, &s.configUpdateTime, &s.configApplyTime, &s.revalUpdateTime, &s.revalApplyTime); err != nil {
+ return nil, fmt.Errorf("scanning servers: %w", err)
+ }
+ serversByID[s.id] = s
+ if _, ok := updatePendingByCDNCachegroup[s.cdnId]; !ok {
+ updatePendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+ }
+ if _, ok := revalPendingByCDNCachegroup[s.cdnId]; !ok {
+ revalPendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+ }
+ status := tc.CacheStatusFromString(s.status)
+ if tc.IsCacheType(s.typeName) && (status == tc.CacheStatusOnline || status == tc.CacheStatusReported || status == tc.CacheStatusAdminDown) {
+ if s.configUpdateTime.After(*s.configApplyTime) {
+ updatePendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+ }
+ if s.revalUpdateTime.After(*s.revalApplyTime) {
+ revalPendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+ }
+ }
+ }
+ if err := serverRows.Err(); err != nil {
+ return nil, fmt.Errorf("iterating over server rows: %w", err)
+ }
+
+ // get all legacy cachegroup parents
+ cacheGroupParents := make(map[int]map[int]struct{})
+ cacheGroupRows, err := tx.QueryContext(dbCtx, getCacheGroupsQuery)
+ if err != nil {
+ return nil, fmt.Errorf("querying cachegroups: %w", err)
+ }
+ defer log.Close(cacheGroupRows, "closing cachegroup rows")
+ for cacheGroupRows.Next() {
+ id := 0
+ parentID := new(int)
+ secondaryParentID := new(int)
+ if err := cacheGroupRows.Scan(&id, &parentID, &secondaryParentID); err != nil {
+ return nil, fmt.Errorf("scanning cachegroups: %w", err)
+ }
+ cacheGroupParents[id] = make(map[int]struct{})
+ if parentID != nil {
+ cacheGroupParents[id][*parentID] = struct{}{}
+ }
+ if secondaryParentID != nil {
+ cacheGroupParents[id][*secondaryParentID] = struct{}{}
+ }
+ }
+ if err := cacheGroupRows.Err(); err != nil {
+ return nil, fmt.Errorf("iterating over cachegroup rows: %w", err)
+ }
+
+ // get all topology-based cachegroup parents
+ topologyCachegroupRows, err := tx.QueryContext(dbCtx, getTopologyCacheGroupParentsQuery)
+ if err != nil {
+ return nil, fmt.Errorf("querying topology cachegroups: %w", err)
+ }
+ defer log.Close(topologyCachegroupRows, "closing topology cachegroup rows")
+ for topologyCachegroupRows.Next() {
+ id := 0
+ parents := []int32{}
+ if err := topologyCachegroupRows.Scan(&id, pq.Array(&parents)); err != nil {
+ return nil, fmt.Errorf("scanning topology cachegroup rows: %w", err)
+ }
+ for _, p := range parents {
+ cacheGroupParents[id][int(p)] = struct{}{}
+ }
+ }
+ if err = topologyCachegroupRows.Err(); err != nil {
+ return nil, fmt.Errorf("iterating over topology cachegroup rows: %w", err)
+ }
+
+ serverUpdateStatuses := make(map[string][]tc.ServerUpdateStatusV40, len(serversByID))
+ for serverID, server := range serversByID {
+ updateStatus := tc.ServerUpdateStatusV40{
+ HostName: server.hostname,
+ UpdatePending: server.configUpdateTime.After(*server.configApplyTime),
+ RevalPending: server.revalUpdateTime.After(*server.revalApplyTime),
Review Comment:
No, not unless the DB schema is changed to make the fields nullable. Originally, I had these fields as non-pointers, because generally that's what we want when DB columns are non-null, but it was causing weird pointer issues in the unit tests that I couldn't really explain.
That said, if the DB columns ever _do_ become nullable and this code isn't changed, it might actually be better to crash than log an error. Fail fast, fail loud. Logged errors would be ignored, and we'd be sitting around wondering why none of our servers are able to queue updates.
##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
}
return updateStatuses, nil, nil
}
+
+type serverUpdateStatuses struct {
+ serverMap map[string][]tc.ServerUpdateStatusV40
+ *sync.RWMutex
+ initialized bool
+ enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+ if serverUpdateStatusCache.enabled {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.initialized
+ }
+ return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ once.Do(func() {
+ if interval <= 0 {
+ return
+ }
+ serverUpdateStatusCache.enabled = true
+ refreshServerUpdateStatusCache(db, timeout)
+ startServerUpdateStatusCacheRefresher(interval, db, timeout)
+ })
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ go func() {
+ for {
+ time.Sleep(interval)
+ refreshServerUpdateStatusCache(db, timeout)
+ }
+ }()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+ newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+ if err != nil {
+ log.Errorf("refreshing server update status cache: %s", err.Error())
+ return
+ }
+ serverUpdateStatusCache.Lock()
+ defer serverUpdateStatusCache.Unlock()
+ serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+ serverUpdateStatusCache.initialized = true
+ log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+ id int
+ hostname string
+ typeName string
+ cdnId int
+ status string
+ cachegroup int
+ configUpdateTime *time.Time
+ configApplyTime *time.Time
+ revalUpdateTime *time.Time
+ revalApplyTime *time.Time
+}
+
+const getUseRevalPendingQuery = `
+ SELECT value::BOOLEAN
+ FROM parameter
+ WHERE name = 'use_reval_pending' AND config_file = 'global'
+ UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+ SELECT
+ s.id,
+ s.host_name,
+ t.name,
+ s.cdn_id,
+ st.name,
+ s.cachegroup,
+ s.config_update_time,
+ s.config_apply_time,
+ s.revalidate_update_time,
+ s.revalidate_apply_time
+ FROM server s
+ JOIN type t ON t.id = s.type
+ JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+ SELECT
+ c.id,
+ c.parent_cachegroup_id,
+ c.secondary_parent_cachegroup_id
+ FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+ SELECT
+ cg_child.id,
+ ARRAY_AGG(DISTINCT cg_parent.id)
+ FROM topology_cachegroup_parents tcp
+ JOIN topology_cachegroup tc_child on tc_child.id = tcp.child
Review Comment:
Done
##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
}
return updateStatuses, nil, nil
}
+
+type serverUpdateStatuses struct {
+ serverMap map[string][]tc.ServerUpdateStatusV40
+ *sync.RWMutex
+ initialized bool
+ enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+ if serverUpdateStatusCache.enabled {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.initialized
+ }
+ return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+ serverUpdateStatusCache.RLock()
+ defer serverUpdateStatusCache.RUnlock()
+ return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ once.Do(func() {
+ if interval <= 0 {
+ return
+ }
+ serverUpdateStatusCache.enabled = true
+ refreshServerUpdateStatusCache(db, timeout)
+ startServerUpdateStatusCacheRefresher(interval, db, timeout)
+ })
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+ go func() {
+ for {
+ time.Sleep(interval)
+ refreshServerUpdateStatusCache(db, timeout)
+ }
+ }()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+ newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+ if err != nil {
+ log.Errorf("refreshing server update status cache: %s", err.Error())
+ return
+ }
+ serverUpdateStatusCache.Lock()
+ defer serverUpdateStatusCache.Unlock()
+ serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+ serverUpdateStatusCache.initialized = true
+ log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+ id int
+ hostname string
Review Comment:
Sure, done
##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
}
return updateStatuses, nil, nil
}
+
+type serverUpdateStatuses struct {
+ serverMap map[string][]tc.ServerUpdateStatusV40
+ *sync.RWMutex
+ initialized bool
+ enabled bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
Review Comment:
Sure, done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@trafficcontrol.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [trafficcontrol] srijeet0406 merged pull request #6838: Add TO option to cache server update status in memory
Posted by GitBox <gi...@apache.org>.
srijeet0406 merged PR #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@trafficcontrol.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org