You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2022/05/25 18:02:16 UTC

[GitHub] [trafficcontrol] rawlinp commented on a diff in pull request #6838: Add TO option to cache server update status in memory

rawlinp commented on code in PR #6838:
URL: https://github.com/apache/trafficcontrol/pull/6838#discussion_r881952158


##########
lib/go-tc/enum.go:
##########
@@ -107,6 +107,10 @@ func CacheTypeFromString(s string) CacheType {
 	return CacheTypeInvalid
 }
 
+func IsCacheType(s string) bool {

Review Comment:
   Done



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string
+	typeName         string
+	cdnId            int
+	status           string
+	cachegroup       int
+	configUpdateTime *time.Time
+	configApplyTime  *time.Time
+	revalUpdateTime  *time.Time
+	revalApplyTime   *time.Time
+}
+
+const getUseRevalPendingQuery = `
+	SELECT value::BOOLEAN
+	FROM parameter
+	WHERE name = 'use_reval_pending' AND config_file = 'global'
+	UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+	SELECT
+		s.id,
+		s.host_name,
+		t.name,
+		s.cdn_id,
+		st.name,
+		s.cachegroup,
+		s.config_update_time,
+		s.config_apply_time,
+		s.revalidate_update_time,
+		s.revalidate_apply_time
+	FROM server s
+	JOIN type t ON t.id = s.type
+	JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+	SELECT
+		c.id,
+		c.parent_cachegroup_id,
+		c.secondary_parent_cachegroup_id
+	FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+	SELECT
+		cg_child.id,
+		ARRAY_AGG(DISTINCT cg_parent.id)
+	FROM topology_cachegroup_parents tcp
+	JOIN topology_cachegroup tc_child on tc_child.id = tcp.child
+	JOIN cachegroup cg_child ON cg_child.name = tc_child.cachegroup
+	JOIN topology_cachegroup tc_parent ON tc_parent.id = tcp.parent
+	JOIN cachegroup cg_parent ON cg_parent.name = tc_parent.cachegroup
+	GROUP BY cg_child.id
+`
+
+func getServerUpdateStatuses(db *sql.DB, timeout time.Duration) (map[string][]tc.ServerUpdateStatusV40, error) {
+	dbCtx, dbClose := context.WithTimeout(context.Background(), timeout)
+	defer dbClose()
+	serversByID := make(map[int]serverInfo)
+	updatePendingByCDNCachegroup := make(map[int]map[int]bool)
+	revalPendingByCDNCachegroup := make(map[int]map[int]bool)
+	tx, err := db.BeginTx(dbCtx, nil)
+	if err != nil {
+		return nil, fmt.Errorf("beginning server update status transaction: %w", err)
+	}
+	defer func() {
+		if err := tx.Commit(); err != nil && err != sql.ErrTxDone {
+			log.Errorln("committing server update status transaction: " + err.Error())
+		}
+	}()
+
+	useRevalPending := false
+	if err := tx.QueryRowContext(dbCtx, getUseRevalPendingQuery).Scan(&useRevalPending); err != nil {
+		return nil, fmt.Errorf("querying use_reval_pending param: %w", err)
+	}
+
+	// get all servers and build map of update/revalPending by cachegroup+CDN
+	serverRows, err := tx.QueryContext(dbCtx, getServerInfoQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying servers: %w", err)
+	}
+	defer log.Close(serverRows, "closing server rows")
+	for serverRows.Next() {
+		s := serverInfo{}
+		if err := serverRows.Scan(&s.id, &s.hostname, &s.typeName, &s.cdnId, &s.status, &s.cachegroup, &s.configUpdateTime, &s.configApplyTime, &s.revalUpdateTime, &s.revalApplyTime); err != nil {
+			return nil, fmt.Errorf("scanning servers: %w", err)
+		}
+		serversByID[s.id] = s
+		if _, ok := updatePendingByCDNCachegroup[s.cdnId]; !ok {
+			updatePendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+		}
+		if _, ok := revalPendingByCDNCachegroup[s.cdnId]; !ok {
+			revalPendingByCDNCachegroup[s.cdnId] = make(map[int]bool)
+		}
+		status := tc.CacheStatusFromString(s.status)
+		if tc.IsCacheType(s.typeName) && (status == tc.CacheStatusOnline || status == tc.CacheStatusReported || status == tc.CacheStatusAdminDown) {
+			if s.configUpdateTime.After(*s.configApplyTime) {
+				updatePendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+			}
+			if s.revalUpdateTime.After(*s.revalApplyTime) {
+				revalPendingByCDNCachegroup[s.cdnId][s.cachegroup] = true
+			}
+		}
+	}
+	if err := serverRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over server rows: %w", err)
+	}
+
+	// get all legacy cachegroup parents
+	cacheGroupParents := make(map[int]map[int]struct{})
+	cacheGroupRows, err := tx.QueryContext(dbCtx, getCacheGroupsQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying cachegroups: %w", err)
+	}
+	defer log.Close(cacheGroupRows, "closing cachegroup rows")
+	for cacheGroupRows.Next() {
+		id := 0
+		parentID := new(int)
+		secondaryParentID := new(int)
+		if err := cacheGroupRows.Scan(&id, &parentID, &secondaryParentID); err != nil {
+			return nil, fmt.Errorf("scanning cachegroups: %w", err)
+		}
+		cacheGroupParents[id] = make(map[int]struct{})
+		if parentID != nil {
+			cacheGroupParents[id][*parentID] = struct{}{}
+		}
+		if secondaryParentID != nil {
+			cacheGroupParents[id][*secondaryParentID] = struct{}{}
+		}
+	}
+	if err := cacheGroupRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over cachegroup rows: %w", err)
+	}
+
+	// get all topology-based cachegroup parents
+	topologyCachegroupRows, err := tx.QueryContext(dbCtx, getTopologyCacheGroupParentsQuery)
+	if err != nil {
+		return nil, fmt.Errorf("querying topology cachegroups: %w", err)
+	}
+	defer log.Close(topologyCachegroupRows, "closing topology cachegroup rows")
+	for topologyCachegroupRows.Next() {
+		id := 0
+		parents := []int32{}
+		if err := topologyCachegroupRows.Scan(&id, pq.Array(&parents)); err != nil {
+			return nil, fmt.Errorf("scanning topology cachegroup rows: %w", err)
+		}
+		for _, p := range parents {
+			cacheGroupParents[id][int(p)] = struct{}{}
+		}
+	}
+	if err = topologyCachegroupRows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating over topology cachegroup rows: %w", err)
+	}
+
+	serverUpdateStatuses := make(map[string][]tc.ServerUpdateStatusV40, len(serversByID))
+	for serverID, server := range serversByID {
+		updateStatus := tc.ServerUpdateStatusV40{
+			HostName:             server.hostname,
+			UpdatePending:        server.configUpdateTime.After(*server.configApplyTime),
+			RevalPending:         server.revalUpdateTime.After(*server.revalApplyTime),

Review Comment:
   No, not unless the DB schema is changed to make these columns nullable. Originally, I had these fields as non-pointers, because that's generally what we want when DB columns are non-null, but that caused strange pointer issues in the unit tests that I couldn't fully explain.
   
   That said, if the DB columns ever _do_ become nullable and this code isn't updated, it might actually be better to crash than to log an error. Fail fast, fail loud: logged errors would likely be ignored, and we'd be left wondering why none of our servers are able to queue updates.



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string
+	typeName         string
+	cdnId            int
+	status           string
+	cachegroup       int
+	configUpdateTime *time.Time
+	configApplyTime  *time.Time
+	revalUpdateTime  *time.Time
+	revalApplyTime   *time.Time
+}
+
+const getUseRevalPendingQuery = `
+	SELECT value::BOOLEAN
+	FROM parameter
+	WHERE name = 'use_reval_pending' AND config_file = 'global'
+	UNION ALL SELECT FALSE FETCH FIRST 1 ROW ONLY
+`
+
+const getServerInfoQuery = `
+	SELECT
+		s.id,
+		s.host_name,
+		t.name,
+		s.cdn_id,
+		st.name,
+		s.cachegroup,
+		s.config_update_time,
+		s.config_apply_time,
+		s.revalidate_update_time,
+		s.revalidate_apply_time
+	FROM server s
+	JOIN type t ON t.id = s.type
+	JOIN status st ON st.id = s.status
+`
+
+const getCacheGroupsQuery = `
+	SELECT
+		c.id,
+		c.parent_cachegroup_id,
+		c.secondary_parent_cachegroup_id
+	FROM cachegroup c
+`
+
+const getTopologyCacheGroupParentsQuery = `
+	SELECT
+		cg_child.id,
+		ARRAY_AGG(DISTINCT cg_parent.id)
+	FROM topology_cachegroup_parents tcp
+	JOIN topology_cachegroup tc_child on tc_child.id = tcp.child

Review Comment:
   Done



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {
+	if serverUpdateStatusCache.enabled {
+		serverUpdateStatusCache.RLock()
+		defer serverUpdateStatusCache.RUnlock()
+		return serverUpdateStatusCache.initialized
+	}
+	return false
+}
+
+func getServerUpdateStatusFromCache(hostname string) []tc.ServerUpdateStatusV40 {
+	serverUpdateStatusCache.RLock()
+	defer serverUpdateStatusCache.RUnlock()
+	return serverUpdateStatusCache.serverMap[hostname]
+}
+
+var once = sync.Once{}
+
+func InitServerUpdateStatusCache(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	once.Do(func() {
+		if interval <= 0 {
+			return
+		}
+		serverUpdateStatusCache.enabled = true
+		refreshServerUpdateStatusCache(db, timeout)
+		startServerUpdateStatusCacheRefresher(interval, db, timeout)
+	})
+}
+
+func startServerUpdateStatusCacheRefresher(interval time.Duration, db *sql.DB, timeout time.Duration) {
+	go func() {
+		for {
+			time.Sleep(interval)
+			refreshServerUpdateStatusCache(db, timeout)
+		}
+	}()
+}
+
+func refreshServerUpdateStatusCache(db *sql.DB, timeout time.Duration) {
+	newServerUpdateStatuses, err := getServerUpdateStatuses(db, timeout)
+	if err != nil {
+		log.Errorf("refreshing server update status cache: %s", err.Error())
+		return
+	}
+	serverUpdateStatusCache.Lock()
+	defer serverUpdateStatusCache.Unlock()
+	serverUpdateStatusCache.serverMap = newServerUpdateStatuses
+	serverUpdateStatusCache.initialized = true
+	log.Infof("refreshed server update status cache (len = %d)", len(serverUpdateStatusCache.serverMap))
+}
+
+type serverInfo struct {
+	id               int
+	hostname         string

Review Comment:
   Sure, done



##########
traffic_ops/traffic_ops_golang/server/servers_update_status.go:
##########
@@ -175,3 +180,248 @@ ORDER BY s.id
 	}
 	return updateStatuses, nil, nil
 }
+
+type serverUpdateStatuses struct {
+	serverMap map[string][]tc.ServerUpdateStatusV40
+	*sync.RWMutex
+	initialized bool
+	enabled     bool // note: enabled is only written to once at startup, before serving requests, so it doesn't need synchronized access
+}
+
+var serverUpdateStatusCache = serverUpdateStatuses{RWMutex: &sync.RWMutex{}}
+
+func serverUpdateStatusCacheIsEnabled() bool {

Review Comment:
   Sure, done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@trafficcontrol.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org