You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2021/02/05 04:46:50 UTC

[GitHub] [dubbo-go] watermelo commented on a change in pull request #976: Fix: event driven chain cache

watermelo commented on a change in pull request #976:
URL: https://github.com/apache/dubbo-go/pull/976#discussion_r570695348



##########
File path: protocol/rpc_status.go
##########
@@ -181,4 +194,84 @@ func CleanAllStatus() {
 		return true
 	}
 	serviceStatistic.Range(delete2)
+	delete3 := func(key, _ interface{}) bool {
+		invokerBlackList.Delete(key)
+		return true
+	}
+	invokerBlackList.Range(delete3)
+}
+
+// GetInvokerHealthyStatus get invoker's conn healthy status
+func GetInvokerHealthyStatus(invoker Invoker) bool {
+	_, found := invokerBlackList.Load(invoker.GetUrl().Key())
+	return !found
+}
+
+// SetInvokerUnhealthyStatus add target invoker to black list
+func SetInvokerUnhealthyStatus(invoker Invoker) {
+	invokerBlackList.Store(invoker.GetUrl().Key(), invoker)
+	logger.Info("Add invoker ip = ", invoker.GetUrl().Location, " to black list")
+	blackListCacheDirty.Store(true)
+}
+
+// RemoveInvokerUnhealthyStatus remove unhealthy status of target invoker from blacklist
+func RemoveInvokerUnhealthyStatus(invoker Invoker) {
+	invokerBlackList.Delete(invoker.GetUrl().Key())
+	logger.Info("Remove invoker ip = ", invoker.GetUrl().Location, " from black list")
+	blackListCacheDirty.Store(true)
+}
+
+// GetBlackListInvokers get at most size of blockSize invokers from black list
+func GetBlackListInvokers(blockSize int) []Invoker {
+	resultIvks := make([]Invoker, 0, 16)
+	invokerBlackList.Range(func(k, v interface{}) bool {
+		resultIvks = append(resultIvks, v.(Invoker))
+		return true
+	})
+	if blockSize > len(resultIvks) {
+		return resultIvks
+	}
+	return resultIvks[:blockSize]
+}
+
+// RemoveUrlKeyUnhealthyStatus called when event of provider unregister, delete from black list
+func RemoveUrlKeyUnhealthyStatus(key string) {
+	invokerBlackList.Delete(key)
+	logger.Info("Remove invoker key = ", key, " from black list")
+	blackListCacheDirty.Store(true)
+}
+
+func GetAndRefreshState() bool {
+	state := blackListCacheDirty.Load()
+	blackListCacheDirty.Store(false)
+	return state
+}
+
+// TryRefreshBlackList start 3 gr to check at most block=16 invokers in black list
+// if target invoker is available, then remove it from black list
+func TryRefreshBlackList() {
+	if atomic.CompareAndSwapInt32(&blackListRefreshing, 0, 1) {
+		wg := sync.WaitGroup{}
+		defer func() {
+			atomic.CompareAndSwapInt32(&blackListRefreshing, 1, 0)
+		}()
+
+		ivks := GetBlackListInvokers(constant.DEFAULT_BLACK_LIST_RECOVER_BLOCK)
+		logger.Debug("blackList len = ", len(ivks))
+
+		for i := 0; i < 3; i++ {
+			wg.Add(1)
+			go func(ivks []Invoker, i int) {
+				defer wg.Done()
+				for j, _ := range ivks {
+					if j%3-i == 0 {

Review comment:
       merge the `if`.

##########
File path: cluster/cluster_impl/base_cluster_invoker.go
##########
@@ -121,38 +121,45 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
 
 func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
 	if len(invokers) == 0 {
-		logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey())
 		return nil
 	}
+	go protocol.TryRefreshBlackList()
 	if len(invokers) == 1 {
-		return invokers[0]
+		if invokers[0].IsAvailable() {
+			return invokers[0]
+		}
+		protocol.SetInvokerUnhealthyStatus(invokers[0])
+		logger.Errorf("the invokers of %s is nil. ", invokers[0].GetUrl().ServiceKey())

Review comment:
       这个错误日志看起来跟上下文无关,或者是否应该放在 `SetInvokerUnhealthyStatus` 里更合适?

##########
File path: cluster/cluster_impl/base_cluster_invoker.go
##########
@@ -121,38 +121,45 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
 
 func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
 	if len(invokers) == 0 {
-		logger.Errorf("the invokers of %s is nil. ", invocation.Invoker().GetUrl().ServiceKey())
 		return nil
 	}
+	go protocol.TryRefreshBlackList()
 	if len(invokers) == 1 {
-		return invokers[0]
+		if invokers[0].IsAvailable() {
+			return invokers[0]
+		}
+		protocol.SetInvokerUnhealthyStatus(invokers[0])
+		logger.Errorf("the invokers of %s is nil. ", invokers[0].GetUrl().ServiceKey())
+		return nil
 	}
 
 	selectedInvoker := lb.Select(invokers, invocation)
 
-	//judge to if the selectedInvoker is invoked
-
+	//judge if the selected Invoker is invoked and available
 	if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
+		protocol.SetInvokerUnhealthyStatus(selectedInvoker)
+		otherInvokers := getOtherInvokers(invokers, selectedInvoker)
 		// do reselect
-		var reslectInvokers []protocol.Invoker
-
-		for _, invoker := range invokers {
-			if !invoker.IsAvailable() {
+		for i := 0; i < 3; i++ {

Review comment:
       当整个循环次数超过 3 次,最后还是会返回最开始不可用的 `selectedInvoker`,是否符合预期?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org