You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2021/07/02 03:03:04 UTC

[GitHub] [dubbo-go] cvictory commented on a change in pull request #1287: Fix: fix the black list bug and make ConnCheckRouter work well

cvictory commented on a change in pull request #1287:
URL: https://github.com/apache/dubbo-go/pull/1287#discussion_r662705254



##########
File path: protocol/rpc_status.go
##########
@@ -194,84 +187,190 @@ func CleanAllStatus() {
 		return true
 	}
 	serviceStatistic.Range(delete2)
-	delete3 := func(key, _ interface{}) bool {
-		invokerBlackList.Delete(key)
+	delete3 := func(_, value interface{}) bool {
+		if v, ok := value.(*ServiceHealthState); ok {
+			v.blackList.Range(func(key, value interface{}) bool {
+				v.blackList.Delete(key)
+				return true
+			})
+		}
 		return true
 	}
-	invokerBlackList.Range(delete3)
+	serviceStateMap.Range(delete3)
+}
+
+// if the ip is changed in kubernetes, then the ip will not exist. So we should recycle the invoker.
+// there are two ways :
+// 1. we should know when the invoker is dropped and clear the data from blacklist
+// 2. we add a counter to collect the retry times. After 512 times(just now) retry, we should clear it.
+type invokerState struct {
+	invoker    Invoker
+	retryTimes int32
+}
+
+func newInvokeState(invoker Invoker) *invokerState {
+	return &invokerState{
+		invoker: invoker,
+	}
+}
+
+func (s *invokerState) increateRetryTimes() {
+	s.retryTimes++
 }
 
 // GetInvokerHealthyStatus get invoker's conn healthy status
 func GetInvokerHealthyStatus(invoker Invoker) bool {
-	_, found := invokerBlackList.Load(invoker.GetURL().Key())
-	return !found
+	if v, ok := serviceStateMap.Load(invoker.GetURL().ServiceKey()); ok {
+		if state, ok := v.(*ServiceHealthState); ok {
+			_, found := state.blackList.Load(invoker.GetURL().Key())
+			return !found
+		}
+	}
+	return true
+}
+
+// nolint
+func GetAndRefreshState(url *common.URL) bool {
+	if v, ok := serviceStateMap.Load(url.ServiceKey()); ok {
+		if state, ok := v.(*ServiceHealthState); ok {
+			return atomic.CompareAndSwapInt32(state.rebuildRoute, 1, 0)
+		}
+	}
+	return false
+}
+
+type ServiceHealthState struct {
+	serviceKey string
+	//if some process in refresh
+	refreshState *int32
+	refresh      atomic.Value
+	rebuildRoute *int32
+	blackList    sync.Map // store unhealthy url blackList
+}
+
+func NewServiceState(serviceKey string) *ServiceHealthState {
+	if v, ok := serviceStateMap.Load(serviceKey); ok {
+		return v.(*ServiceHealthState)
+	}
+	serviceState := &ServiceHealthState{
+		refreshState: new(int32),
+		rebuildRoute: new(int32),
+		serviceKey:   serviceKey,
+	}
+	serviceStateMap.Store(serviceKey, serviceState)
+	return serviceState
+}
+
+func (s *ServiceHealthState) reset() {
+	s.refresh.Store(false)
+	atomic.StoreInt32(s.rebuildRoute, 0)
+	s.blackList.Range(func(key, value interface{}) bool {
+		s.blackList.Delete(key)
+		return true
+	})
+}
+
+func (s *ServiceHealthState) configNeedRefresh(needRefresh bool) {
+	s.refresh.Store(needRefresh)
+}
+
+func (s *ServiceHealthState) needRefresh() bool {
+	v := s.refresh.Load()
+	if v == nil {
+		return false
+	}
+	return v.(bool)
 }
 
 // SetInvokerUnhealthyStatus add target invoker to black list
-func SetInvokerUnhealthyStatus(invoker Invoker) {
-	invokerBlackList.Store(invoker.GetURL().Key(), invoker)
-	logger.Info("Add invoker ip = ", invoker.GetURL().Location, " to black list")
-	blackListCacheDirty.Store(true)
+func (s *ServiceHealthState) SetInvokerUnhealthyStatus(invoker Invoker) {
+	s.configNeedRefresh(true)
+	s.blackList.Store(invoker.GetURL().Key(), newInvokeState(invoker))
+	logger.Infof("Add invoker ip=%s to black list for service(%s)", invoker.GetURL().Location, invoker.GetURL().ServiceKey())
+	s.activeBlackListCacheDirty()
 }
 
 // RemoveInvokerUnhealthyStatus remove unhealthy status of target invoker from blacklist
-func RemoveInvokerUnhealthyStatus(invoker Invoker) {
-	invokerBlackList.Delete(invoker.GetURL().Key())
-	logger.Info("Remove invoker ip = ", invoker.GetURL().Location, " from black list")
-	blackListCacheDirty.Store(true)
+func (s *ServiceHealthState) RemoveInvokerUnhealthyStatus(invoker Invoker) {
+	s.blackList.Delete(invoker.GetURL().Key())
+	logger.Infof("Remove invoker ip(%s) from black list for service(%s)", invoker.GetURL().Location, invoker.GetURL().ServiceKey())
+	s.activeBlackListCacheDirty()
 }
 
 // GetBlackListInvokers get at most size of blockSize invokers from black list
-func GetBlackListInvokers(blockSize int) []Invoker {
-	resultIvks := make([]Invoker, 0, 16)
-	invokerBlackList.Range(func(k, v interface{}) bool {
-		resultIvks = append(resultIvks, v.(Invoker))
+func (s *ServiceHealthState) GetBlackListInvokers(blockSize int) []*invokerState {
+	resultIvks := make([]*invokerState, 0, blockSize)
+	s.blackList.Range(func(k, v interface{}) bool {
+		if v == nil {
+			return true
+		}
+		resultIvks = append(resultIvks, v.(*invokerState))
+		if len(resultIvks) == blockSize {
+			return false
+		}
 		return true
 	})
-	if blockSize > len(resultIvks) {
-		return resultIvks
-	}
-	return resultIvks[:blockSize]
+	return resultIvks
 }
 
 // RemoveUrlKeyUnhealthyStatus called when event of provider unregister, delete from black list
-func RemoveUrlKeyUnhealthyStatus(key string) {
-	invokerBlackList.Delete(key)
+func (s *ServiceHealthState) RemoveUrlKeyUnhealthyStatus(key string) {
+	if _, ok := s.blackList.Load(key); ok {
+		s.blackList.Delete(key)
+		s.activeBlackListCacheDirty()
+	}
 	logger.Info("Remove invoker key = ", key, " from black list")
-	blackListCacheDirty.Store(true)
 }
 
-func GetAndRefreshState() bool {
-	state := blackListCacheDirty.Load()
-	blackListCacheDirty.Store(false)
-	return state
+func (s *ServiceHealthState) activeBlackListCacheDirty() {
+	atomic.StoreInt32(s.rebuildRoute, 1)
 }
 
 // TryRefreshBlackList start 3 gr to check at most block=16 invokers in black list
 // if target invoker is available, then remove it from black list
-func TryRefreshBlackList() {
-	if atomic.CompareAndSwapInt32(&blackListRefreshing, 0, 1) {
-		go func() {
-			wg := sync.WaitGroup{}
-			defer func() {
-				atomic.CompareAndSwapInt32(&blackListRefreshing, 1, 0)
-			}()
-
-			ivks := GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK)
-			logger.Debug("blackList len = ", len(ivks))
-
-			for i := 0; i < 3; i++ {
-				wg.Add(1)
-				go func(ivks []Invoker, i int) {
-					defer wg.Done()
-					for j, _ := range ivks {
-						if j%3-i == 0 && ivks[j].(Invoker).IsAvailable() {
-							RemoveInvokerUnhealthyStatus(ivks[i])
-						}
-					}
-				}(ivks, i)
-			}
-			wg.Wait()
+func (s *ServiceHealthState) TryRefreshBlackList() {
+	if s.needRefresh() {
+		go s.refreshBlackList()
+	}
+}
+
+func (s *ServiceHealthState) refreshBlackList() {
+	defer func() {
+		if r := recover(); r != nil {
+			logger.Errorf("try to refresh black list failed: %s, %+v", s.serviceKey, r)
+		}
+	}()
+
+	if atomic.CompareAndSwapInt32(s.refreshState, 0, 1) {
+		defer func() {
+			atomic.CompareAndSwapInt32(s.refreshState, 1, 0)
 		}()
+
+		ivkStates := s.GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK)
+		logger.Debug("blackList len = ", len(ivkStates))
+		if len(ivkStates) == 0 {
+			logger.Infof("there is no data in black list(%s), and will not refresh black list.", s.serviceKey)
+			s.configNeedRefresh(false)
+			return
+		}
+		wg := sync.WaitGroup{}
+		for i := 0; i < 3; i++ {
+			wg.Add(1)
+			go func(ivks []*invokerState, i int) {
+				defer wg.Done()
+				for j, _ := range ivks {

Review comment:
       DONE

##########
File path: protocol/rpc_status.go
##########
@@ -18,31 +18,24 @@
 package protocol
 
 import (
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org