Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2022/05/17 15:45:46 UTC

[GitHub] [trafficcontrol] rawlinp opened a new pull request, #6838: Add TO option to cache server update status in memory

rawlinp opened a new pull request, #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838

   To reduce load on the Traffic Ops database, add an option to
   periodically read all server update status data from the database and
   serve all requests to /servers/{hostname}/update_status from memory.
   
   With this option enabled, a `t3c` run on a cache server that has no
   updates pending causes no TODB queries at all, which leaves TODB with
   more capacity for actually propagating changes.
   
   The tradeoff is that it can take up to
   `server_update_status_cache_refresh_interval_sec` seconds for cache
   servers to see pending updates/revalidations. In practice this is still a
   net win: the reduced database load makes it possible to run `t3c` more
   frequently, so cache servers tend to pick up updates sooner than they
   would otherwise.
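   
   For illustration (hostname and values are hypothetical, the field names
   are abridged, and the API envelope is omitted), a status check served
   from the in-memory cache looks roughly like:
   
       GET /servers/edge-01.example.test/update_status
   
       [
         {
           "host_name": "edge-01.example.test",
           "upd_pending": false,
           "reval_pending": false
         }
       ]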
   
   
   ## Which Traffic Control components are affected by this PR?
   - Documentation
   - Traffic Ops
   
   ## What is the best way to verify this PR?
   Ensure the automated tests pass. To verify manually:
   - Set `server_update_status_cache_refresh_interval_sec` in cdn.conf to a number > 0 (see the sketch below).
   - Queue updates on various cache servers (edges and mids).
   - Verify that `GET /servers/{hostname}/update_status` returns the expected data.
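   
   A minimal cdn.conf sketch for enabling the cache (assuming the option
   lives in the `traffic_ops_golang` section alongside the other interval
   settings; other keys are elided and the value is only an example):
   
       {
           "traffic_ops_golang": {
               "server_update_status_cache_refresh_interval_sec": 60
           }
       }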
   
   
   ## PR submission checklist
   - [x] This PR has tests
   - [x] This PR has documentation
   - [x] This PR has a CHANGELOG.md entry
   - [x] This PR **DOES NOT FIX A SERIOUS SECURITY VULNERABILITY** (see [the Apache Software Foundation's security guidelines](https://apache.org/security) for details)
   
   




[GitHub] [trafficcontrol] srijeet0406 commented on a diff in pull request #6838: Add TO option to cache server update status in memory

Posted by GitBox <gi...@apache.org>.
srijeet0406 commented on code in PR #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838#discussion_r881890722


##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {

Review Comment:
   Should this function be called `serverUpdateStatusCacheIsInitialized`?



##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
 	return CacheTypeInvalid
 }
 
+func IsCacheType(s string) bool {

Review Comment:
   Could we rename this method as `IsValidCacheType`?
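   
   For reference, a sketch of what the renamed and documented helper could
   look like, assuming it simply delegates to the existing
   `CacheTypeFromString` (illustrative only, not the code in this diff):
   
       // IsValidCacheType reports whether s names a valid cache server type,
       // i.e. whether CacheTypeFromString maps it to something other than
       // CacheTypeInvalid. (Illustrative sketch, not the PR's implementation.)
       func IsValidCacheType(s string) bool {
       	return CacheTypeFromString(s) != CacheTypeInvalid
       }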



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string

Review Comment:
   nit: Could we rename this to `hostName`, just to be consistent?



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string
+	typeName         string
+	cdnId            int
+	status           string
+	cachegroup       int
+	configUpdateTime *time.Time
+	configApplyTime  *time.Time
+	revalUpdateTime  *time.Time
+	revalApplyTime   *time.Time
+}
+
+const getUseRevalPendingQuery = `
+	SELECT value::BOOLEAN
+	FROM parameter
+	WHERE name = 'use_reval_pending' AND config_file = 'global'
+	UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+	SELECT
+		s.id,
+		s.host_name,
+		t.name,
+		s.cdn_id,
+		st.name,
+		s.cachegroup,
+		s.config_update_time,
+		s.config_apply_time,
+		s.revalidate_update_time,
+		s.revalidate_apply_time
+	FROM server s
+	JOIN type t ON t.id = s.type
+	JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+	SELECT
+		c.id,
+		c.parent_cachegroup_id,
+		c.secondary_parent_cachegroup_id
+	FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+	SELECT
+		cg_child.id,
+		ARRAY_AGG(DISTINCT cg_parent.id)
+	FROM topology_cachegroup_parents tcp
+	JOIN topology_cachegroup tc_child on tc_child.id = tcp.child
+	JOIN cachegroup cg_child ON cg_child.name = tc_child.cachegroup
+	JOIN topology_cachegroup tc_parent ON tc_parent.id = tcp.parent
+	JOIN cachegroup cg_parent ON cg_parent.name = tc_parent.cachegroup
+	GROUP BY cg_child.id
+`
+
+func getServerUpdateStatuses(db *sql.DB, timeout time.Duration) (map[string][]tc.ServerUpdateStatusV40, error) {
+	dbCtx, dbClose := context.WithTimeout(context.Background(), timeout)
+	defer dbClose()
+	serversByID := make(map[int]serverInfo)
+	updatePendingByCDNCachegroup := make(map[int]map[int]bool)
+	revalPendingByCDNCachegroup := make(map[int]map[int]bool)
+	tx, err := db.BeginTx(dbCtx, nil)
+	if err != nil {
+		return nil, fmt.Errorf("beginning server update status transaction: %w", err)
+	}
+	defer func() {
+		if err := tx.Commit(); err != nil && err != sql.ErrTxDone {
+			log.Errorln("committing server update status transaction: " + err.Error())
+		}
+	}()
+
+	useRevalPending := false
+	if err := tx.QueryRowContext(dbCtx, getUseRevalPendingQuery).Scan(&useRevalPending); err != nil {
+		return nil, fmt.Errorf("querying use_reval_pending param: %w", err)
+	}
+
+	// get all servers and build map of update/revalPending by cachegroup+CDN
+	serverRows, err := tx.QueryContext(dbCtx, getServerInfoQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying servers: %w", err)
+	}
+	defer log.Close(serverRows, "closing server rows")
+	for serverRows.Next() {
+		s := serverInfo{}
+		if err := serverRows.Scan(&s.id, &s.hostname, &s.typeName, &s.cdnId, &s.status, &s.cachegroup, &s.configUpdateTime, &s.configApplyTime, &s.revalUpdateTime, &s.revalApplyTime); err != nil {
+			return nil, fmt.Errorf("scanning servers: %w", err)
+		}
+		serversByID[s.id] = s
+		if _, ok := updatePendingByCDNCachegroup[s.cdnId]; !ok {
+			updatePendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+		}
+		if _, ok := revalPendingByCDNCachegroup[s.cdnId]; !ok {
+			revalPendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+		}
+		status := tc.CacheStatusFromString(s.status)
+		if tc.IsCacheType(s.typeName) && (status == tc.CacheStatusOnline || status == tc.CacheStatusReported || status == tc.CacheStatusAdminDown) {
+			if s.configUpdateTime.After(*s.configApplyTime) {
+				updatePendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+			}
+			if s.revalUpdateTime.After(*s.revalApplyTime) {
+				revalPendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+			}
+		}
+	}
+	if err := serverRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over server rows: %w", err)
+	}
+
+	// get all legacy cachegroup parents
+	cacheGroupParents := make(map[int]map[int]struct{})
+	cacheGroupRows, err := tx.QueryContext(dbCtx, getCacheGroupsQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying cachegroups: %w", err)
+	}
+	defer log.Close(cacheGroupRows, "closing cachegroup rows")
+	for cacheGroupRows.Next() {
+		id := 0
+		parentID := new(int)
+		secondaryParentID := new(int)
+		if err := cacheGroupRows.Scan(&id, &parentID, &secondaryParentID); err != nil {
+			return nil, fmt.Errorf("scanning cachegroups: %w", err)
+		}
+		cacheGroupParents[id] = make(map[int]struct{})
+		if parentID != nil {
+			cacheGroupParents[id][*parentID] = struct{}{}
+		}
+		if secondaryParentID != nil {
+			cacheGroupParents[id][*secondaryParentID] = struct{}{}
+		}
+	}
+	if err := cacheGroupRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over cachegroup rows: %w", err)
+	}
+
+	// get all topology-based cachegroup parents
+	topologyCachegroupRows, err := tx.QueryContext(dbCtx, getTopologyCacheGroupParentsQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying topology cachegroups: %w", err)
+	}
+	defer log.Close(topologyCachegroupRows, "closing topology cachegroup rows")
+	for topologyCachegroupRows.Next() {
+		id := 0
+		parents := []int32{}
+		if err := topologyCachegroupRows.Scan(&id, pq.Array(&parents)); err != nil {
+			return nil, fmt.Errorf("scanning topology cachegroup rows: %w", err)
+		}
+		for _, p := range parents {
+			cacheGroupParents[id][int(p)] = struct{}{}
+		}
+	}
+	if err = topologyCachegroupRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over topology cachegroup rows: %w", err)
+	}
+
+	serverUpdateStatuses := make(map[string][]tc.ServerUpdateStatusV40, len(serversByID))
+	for serverID, server := range serversByID {
+		updateStatus := tc.ServerUpdateStatusV40{
+			HostName:             server.hostname,
+			UpdatePending:        server.configUpdateTime.After(*server.configApplyTime),
+			RevalPending:         server.revalUpdateTime.After(*server.revalApplyTime),

Review Comment:
   Can the `revalApplyTime` or `configApplyTime` be `nil`? In that case, this will panic.



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string
+	typeName         string
+	cdnId            int
+	status           string
+	cachegroup       int
+	configUpdateTime *time.Time
+	configApplyTime  *time.Time
+	revalUpdateTime  *time.Time
+	revalApplyTime   *time.Time
+}
+
+const getUseRevalPendingQuery = `
+	SELECT value::BOOLEAN
+	FROM parameter
+	WHERE name = 'use_reval_pending' AND config_file = 'global'
+	UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+	SELECT
+		s.id,
+		s.host_name,
+		t.name,
+		s.cdn_id,
+		st.name,
+		s.cachegroup,
+		s.config_update_time,
+		s.config_apply_time,
+		s.revalidate_update_time,
+		s.revalidate_apply_time
+	FROM server s
+	JOIN type t ON t.id = s.type
+	JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+	SELECT
+		c.id,
+		c.parent_cachegroup_id,
+		c.secondary_parent_cachegroup_id
+	FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+	SELECT
+		cg_child.id,
+		ARRAY_AGG(DISTINCT cg_parent.id)
+	FROM topology_cachegroup_parents tcp
+	JOIN topology_cachegroup tc_child on tc_child.id = tcp.child

Review Comment:
   nit: should be `ON`



##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
 	return CacheTypeInvalid
 }
 
+func IsCacheType(s string) bool {

Review Comment:
   Also, could you pls add a GoDoc to this method?





[GitHub] [trafficcontrol] rawlinp commented on a diff in pull request #6838: Add TO option to cache server update status in memory

Posted by GitBox <gi...@apache.org>.
rawlinp commented on code in PR #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838#discussion_r881952158


##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
 	return CacheTypeInvalid
 }
 
+func IsCacheType(s string) bool {

Review Comment:
   Done



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string
+	typeName         string
+	cdnId            int
+	status           string
+	cachegroup       int
+	configUpdateTime *time.Time
+	configApplyTime  *time.Time
+	revalUpdateTime  *time.Time
+	revalApplyTime   *time.Time
+}
+
+const getUseRevalPendingQuery = `
+	SELECT value::BOOLEAN
+	FROM parameter
+	WHERE name = 'use_reval_pending' AND config_file = 'global'
+	UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+	SELECT
+		s.id,
+		s.host_name,
+		t.name,
+		s.cdn_id,
+		st.name,
+		s.cachegroup,
+		s.config_update_time,
+		s.config_apply_time,
+		s.revalidate_update_time,
+		s.revalidate_apply_time
+	FROM server s
+	JOIN type t ON t.id = s.type
+	JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+	SELECT
+		c.id,
+		c.parent_cachegroup_id,
+		c.secondary_parent_cachegroup_id
+	FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+	SELECT
+		cg_child.id,
+		ARRAY_AGG(DISTINCT cg_parent.id)
+	FROM topology_cachegroup_parents tcp
+	JOIN topology_cachegroup tc_child on tc_child.id = tcp.child
+	JOIN cachegroup cg_child ON cg_child.name = tc_child.cachegroup
+	JOIN topology_cachegroup tc_parent ON tc_parent.id = tcp.parent
+	JOIN cachegroup cg_parent ON cg_parent.name = tc_parent.cachegroup
+	GROUP BY cg_child.id
+`
+
+func getServerUpdateStatuses(db *sql.DB, timeout time.Duration) (map[string][]tc.ServerUpdateStatusV40, error) {
+	dbCtx, dbClose := context.WithTimeout(context.Background(), timeout)
+	defer dbClose()
+	serversByID := make(map[int]serverInfo)
+	updatePendingByCDNCachegroup := make(map[int]map[int]bool)
+	revalPendingByCDNCachegroup := make(map[int]map[int]bool)
+	tx, err := db.BeginTx(dbCtx, nil)
+	if err != nil {
+		return nil, fmt.Errorf("beginning server update status transaction: %w", err)
+	}
+	defer func() {
+		if err := tx.Commit(); err != nil && err != sql.ErrTxDone {
+			log.Errorln("committing server update status transaction: " + err.Error())
+		}
+	}()
+
+	useRevalPending := false
+	if err := tx.QueryRowContext(dbCtx, getUseRevalPendingQuery).Scan(&useRevalPending); err != nil {
+		return nil, fmt.Errorf("querying use_reval_pending param: %w", err)
+	}
+
+	// get all servers and build map of update/revalPending by cachegroup+CDN
+	serverRows, err := tx.QueryContext(dbCtx, getServerInfoQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying servers: %w", err)
+	}
+	defer log.Close(serverRows, "closing server rows")
+	for serverRows.Next() {
+		s := serverInfo{}
+		if err := serverRows.Scan(&s.id, &s.hostname, &s.typeName, &s.cdnId, &s.status, &s.cachegroup, &s.configUpdateTime, &s.configApplyTime, &s.revalUpdateTime, &s.revalApplyTime); err != nil {
+			return nil, fmt.Errorf("scanning servers: %w", err)
+		}
+		serversByID[s.id] = s
+		if _, ok := updatePendingByCDNCachegroup[s.cdnId]; !ok {
+			updatePendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+		}
+		if _, ok := revalPendingByCDNCachegroup[s.cdnId]; !ok {
+			revalPendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+		}
+		status := tc.CacheStatusFromString(s.status)
+		if tc.IsCacheType(s.typeName) && (status == tc.CacheStatusOnline || status == tc.CacheStatusReported || status == tc.CacheStatusAdminDown) {
+			if s.configUpdateTime.After(*s.configApplyTime) {
+				updatePendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+			}
+			if s.revalUpdateTime.After(*s.revalApplyTime) {
+				revalPendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+			}
+		}
+	}
+	if err := serverRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over server rows: %w", err)
+	}
+
+	// get all legacy cachegroup parents
+	cacheGroupParents := make(map[int]map[int]struct{})
+	cacheGroupRows, err := tx.QueryContext(dbCtx, getCacheGroupsQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying cachegroups: %w", err)
+	}
+	defer log.Close(cacheGroupRows, "closing cachegroup rows")
+	for cacheGroupRows.Next() {
+		id := 0
+		parentID := new(int)
+		secondaryParentID := new(int)
+		if err := cacheGroupRows.Scan(&id, &parentID, &secondaryParentID); err != nil {
+			return nil, fmt.Errorf("scanning cachegroups: %w", err)
+		}
+		cacheGroupParents[id] = make(map[int]struct{})
+		if parentID != nil {
+			cacheGroupParents[id][*parentID] = struct{}{}
+		}
+		if secondaryParentID != nil {
+			cacheGroupParents[id][*secondaryParentID] = struct{}{}
+		}
+	}
+	if err := cacheGroupRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over cachegroup rows: %w", err)
+	}
+
+	// get all topology-based cachegroup parents
+	topologyCachegroupRows, err := tx.QueryContext(dbCtx, getTopologyCacheGroupParentsQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying topology cachegroups: %w", err)
+	}
+	defer log.Close(topologyCachegroupRows, "closing topology cachegroup rows")
+	for topologyCachegroupRows.Next() {
+		id := 0
+		parents := []int32{}
+		if err := topologyCachegroupRows.Scan(&id, pq.Array(&parents)); err != nil {
+			return nil, fmt.Errorf("scanning topology cachegroup rows: %w", err)
+		}
+		for _, p := range parents {
+			cacheGroupParents[id][int(p)] = struct{}{}
+		}
+	}
+	if err = topologyCachegroupRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over topology cachegroup rows: %w", err)
+	}
+
+	serverUpdateStatuses := make(map[string][]tc.ServerUpdateStatusV40, len(serversByID))
+	for serverID, server := range serversByID {
+		updateStatus := tc.ServerUpdateStatusV40{
+			HostName:             server.hostname,
+			UpdatePending:        server.configUpdateTime.After(*server.configApplyTime),
+			RevalPending:         server.revalUpdateTime.After(*server.revalApplyTime),

Review Comment:
   No, not unless the DB schema is changed to make the fields nullable. Originally, I had these fields as non-pointers, because generally that's what we want when DB columns are non-null, but it was causing weird pointer issues in the unit tests that I couldn't really explain.
   
   That said, if the DB columns ever _do_ become nullable and this code isn't changed, it might actually be better to crash than log an error. Fail fast, fail loud. Logged errors would be ignored, and we'd be sitting around wondering why none of our servers are able to queue updates.
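   
   To illustrate the trade-off being discussed (hypothetical helper, not part
   of this PR): a nil-tolerant comparison would avoid the panic but would
   also silently mask an unexpected NULL, whereas the direct dereference
   fails loudly:
   
       // updatePending is a hypothetical nil-tolerant alternative to the
       // direct dereference used in the PR; the PR intentionally keeps the
       // dereference so that an unexpected NULL crashes immediately.
       func updatePending(updateTime, applyTime *time.Time) bool {
       	if updateTime == nil {
       		return false // nothing has ever been queued
       	}
       	if applyTime == nil {
       		return true // never applied; note this hides the unexpected NULL
       	}
       	return updateTime.After(*applyTime)
       }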



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string
+	typeName         string
+	cdnId            int
+	status           string
+	cachegroup       int
+	configUpdateTime *time.Time
+	configApplyTime  *time.Time
+	revalUpdateTime  *time.Time
+	revalApplyTime   *time.Time
+}
+
+const getUseRevalPendingQuery = `
+	SELECT value::BOOLEAN
+	FROM parameter
+	WHERE name = 'use_reval_pending' AND config_file = 'global'
+	UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+	SELECT
+		s.id,
+		s.host_name,
+		t.name,
+		s.cdn_id,
+		st.name,
+		s.cachegroup,
+		s.config_update_time,
+		s.config_apply_time,
+		s.revalidate_update_time,
+		s.revalidate_apply_time
+	FROM server s
+	JOIN type t ON t.id = s.type
+	JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+	SELECT
+		c.id,
+		c.parent_cachegroup_id,
+		c.secondary_parent_cachegroup_id
+	FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+	SELECT
+		cg_child.id,
+		ARRAY_AGG(DISTINCT cg_parent.id)
+	FROM topology_cachegroup_parents tcp
+	JOIN topology_cachegroup tc_child on tc_child.id = tcp.child

Review Comment:
   Done



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string

Review Comment:
   Sure, done



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {

Review Comment:
   Sure, done





[GitHub] [trafficcontrol] srijeet0406 merged pull request #6838: Add TO option to cache server update status in memory

Posted by GitBox <gi...@apache.org>.
srijeet0406 merged PR #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838

