You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2022/05/25 17:35:28 UTC

[GitHub] [trafficcontrol] srijeet0406 commented on a diff in pull request #6838: Add TO option to cache server update status in memory

srijeet0406 commented on code in PR #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838#discussion_r881890722


##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {

Review Comment:
   Should this function be called `serverUpdateStatusCacheIsInitialized`?



##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
 	return CacheTypeInvalid
 }
 
+func IsCacheType(s string) bool {

Review Comment:
   Could we rename this function to `IsValidCacheType`?



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string

Review Comment:
   nit: Could we rename this to `hostName`, just to be consistent?



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string
+	typeName         string
+	cdnId            int
+	status           string
+	cachegroup       int
+	configUpdateTime *time.Time
+	configApplyTime  *time.Time
+	revalUpdateTime  *time.Time
+	revalApplyTime   *time.Time
+}
+
+const getUseRevalPendingQuery = `
+	SELECT value::BOOLEAN
+	FROM parameter
+	WHERE name = 'use_reval_pending' AND config_file = 'global'
+	UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+	SELECT
+		s.id,
+		s.host_name,
+		t.name,
+		s.cdn_id,
+		st.name,
+		s.cachegroup,
+		s.config_update_time,
+		s.config_apply_time,
+		s.revalidate_update_time,
+		s.revalidate_apply_time
+	FROM server s
+	JOIN type t ON t.id = s.type
+	JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+	SELECT
+		c.id,
+		c.parent_cachegroup_id,
+		c.secondary_parent_cachegroup_id
+	FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+	SELECT
+		cg_child.id,
+		ARRAY_AGG(DISTINCT cg_parent.id)
+	FROM topology_cachegroup_parents tcp
+	JOIN topology_cachegroup tc_child on tc_child.id = tcp.child
+	JOIN cachegroup cg_child ON cg_child.name = tc_child.cachegroup
+	JOIN topology_cachegroup tc_parent ON tc_parent.id = tcp.parent
+	JOIN cachegroup cg_parent ON cg_parent.name = tc_parent.cachegroup
+	GROUP BY cg_child.id
+`
+
+func getServerUpdateStatuses(db *sql.DB, timeout time.Duration) (map[string][]tc.ServerUpdateStatusV40, error) {
+	dbCtx, dbClose := context.WithTimeout(context.Background(), timeout)
+	defer dbClose()
+	serversByID := make(map[int]serverInfo)
+	updatePendingByCDNCachegroup := make(map[int]map[int]bool)
+	revalPendingByCDNCachegroup := make(map[int]map[int]bool)
+	tx, err := db.BeginTx(dbCtx, nil)
+	if err != nil {
+		return nil, fmt.Errorf("beginning server update status transaction: %w", err)
+	}
+	defer func() {
+		if err := tx.Commit(); err != nil && err != sql.ErrTxDone {
+			log.Errorln("committing server update status transaction: " + err.Error())
+		}
+	}()
+
+	useRevalPending := false
+	if err := tx.QueryRowContext(dbCtx, getUseRevalPendingQuery).Scan(&useRevalPending); err != nil {
+		return nil, fmt.Errorf("querying use_reval_pending param: %w", err)
+	}
+
+	// get all servers and build map of update/revalPending by cachegroup+CDN
+	serverRows, err := tx.QueryContext(dbCtx, getServerInfoQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying servers: %w", err)
+	}
+	defer log.Close(serverRows, "closing server rows")
+	for serverRows.Next() {
+		s := serverInfo{}
+		if err := serverRows.Scan(&s.id, &s.hostname, &s.typeName, &s.cdnId, &s.status, &s.cachegroup, &s.configUpdateTime, &s.configApplyTime, &s.revalUpdateTime, &s.revalApplyTime); err != nil {
+			return nil, fmt.Errorf("scanning servers: %w", err)
+		}
+		serversByID[s.id] = s
+		if _, ok := updatePendingByCDNCachegroup[s.cdnId]; !ok {
+			updatePendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+		}
+		if _, ok := revalPendingByCDNCachegroup[s.cdnId]; !ok {
+			revalPendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+		}
+		status := tc.CacheStatusFromString(s.status)
+		if tc.IsCacheType(s.typeName) && (status == tc.CacheStatusOnline || status == tc.CacheStatusReported || status == tc.CacheStatusAdminDown) {
+			if s.configUpdateTime.After(*s.configApplyTime) {
+				updatePendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+			}
+			if s.revalUpdateTime.After(*s.revalApplyTime) {
+				revalPendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+			}
+		}
+	}
+	if err := serverRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over server rows: %w", err)
+	}
+
+	// get all legacy cachegroup parents
+	cacheGroupParents := make(map[int]map[int]struct{})
+	cacheGroupRows, err := tx.QueryContext(dbCtx, getCacheGroupsQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying cachegroups: %w", err)
+	}
+	defer log.Close(cacheGroupRows, "closing cachegroup rows")
+	for cacheGroupRows.Next() {
+		id := 0
+		parentID := new(int)
+		secondaryParentID := new(int)
+		if err := cacheGroupRows.Scan(&id, &parentID, &secondaryParentID); err != nil {
+			return nil, fmt.Errorf("scanning cachegroups: %w", err)
+		}
+		cacheGroupParents[id] = make(map[int]struct{})
+		if parentID != nil {
+			cacheGroupParents[id][*parentID] = struct{}{}
+		}
+		if secondaryParentID != nil {
+			cacheGroupParents[id][*secondaryParentID] = struct{}{}
+		}
+	}
+	if err := cacheGroupRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over cachegroup rows: %w", err)
+	}
+
+	// get all topology-based cachegroup parents
+	topologyCachegroupRows, err := tx.QueryContext(dbCtx, getTopologyCacheGroupParentsQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying topology cachegroups: %w", err)
+	}
+	defer log.Close(topologyCachegroupRows, "closing topology cachegroup rows")
+	for topologyCachegroupRows.Next() {
+		id := 0
+		parents := []int32{}
+		if err := topologyCachegroupRows.Scan(&id, pq.Array(&parents)); err != nil {
+			return nil, fmt.Errorf("scanning topology cachegroup rows: %w", err)
+		}
+		for _, p := range parents {
+			cacheGroupParents[id][int(p)] = struct{}{}
+		}
+	}
+	if err = topologyCachegroupRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over topology cachegroup rows: %w", err)
+	}
+
+	serverUpdateStatuses := make(map[string][]tc.ServerUpdateStatusV40, len(serversByID))
+	for serverID, server := range serversByID {
+		updateStatus := tc.ServerUpdateStatusV40{
+			HostName:             server.hostname,
+			UpdatePending:        server.configUpdateTime.After(*server.configApplyTime),
+			RevalPending:         server.revalUpdateTime.After(*server.revalApplyTime),

Review Comment:
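   Can `revalApplyTime` or `configApplyTime` be `nil`? If so, this will panic when they are dereferenced. The same applies to `configUpdateTime` and `revalUpdateTime`, which are also `*time.Time` and are used as the receiver of `After`, and to the identical checks in the server-scanning loop above.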



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string
+	typeName         string
+	cdnId            int
+	status           string
+	cachegroup       int
+	configUpdateTime *time.Time
+	configApplyTime  *time.Time
+	revalUpdateTime  *time.Time
+	revalApplyTime   *time.Time
+}
+
+const getUseRevalPendingQuery = `
+	SELECT value::BOOLEAN
+	FROM parameter
+	WHERE name = 'use_reval_pending' AND config_file = 'global'
+	UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+	SELECT
+		s.id,
+		s.host_name,
+		t.name,
+		s.cdn_id,
+		st.name,
+		s.cachegroup,
+		s.config_update_time,
+		s.config_apply_time,
+		s.revalidate_update_time,
+		s.revalidate_apply_time
+	FROM server s
+	JOIN type t ON t.id = s.type
+	JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+	SELECT
+		c.id,
+		c.parent_cachegroup_id,
+		c.secondary_parent_cachegroup_id
+	FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+	SELECT
+		cg_child.id,
+		ARRAY_AGG(DISTINCT cg_parent.id)
+	FROM topology_cachegroup_parents tcp
+	JOIN topology_cachegroup tc_child on tc_child.id = tcp.child

Review Comment:
   nit: the lowercase `on` should be `ON`, to match the other `JOIN ... ON` clauses in this query.



##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
 	return CacheTypeInvalid
 }
 
+func IsCacheType(s string) bool {

Review Comment:
   Also, could you please add a GoDoc comment to this function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@trafficcontrol.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org