You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by fa...@apache.org on 2020/02/22 06:42:21 UTC

[dubbo-go] branch 1.3 updated: Merge pull request #358 from pantianying/addRlockForDubboInvoker

This is an automated email from the ASF dual-hosted git repository.

fangyc pushed a commit to branch 1.3
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.3 by this push:
     new 95065e5  Merge pull request #358 from pantianying/addRlockForDubboInvoker
     new 082cc62  Merge pull request #364 from fangyincheng/1.3
95065e5 is described below

commit 95065e54419b9e83f54291209ec72294857925b3
Author: Ming Deng <mi...@gmail.com>
AuthorDate: Thu Feb 20 10:06:07 2020 +0800

    Merge pull request #358 from pantianying/addRlockForDubboInvoker
    
    Fix:deal the panic when invoker destroy
---
 protocol/dubbo/dubbo_invoker.go | 40 ++++++++++++++++++++++++++++++++++------
 registry/directory/directory.go | 39 ++++++++++++++++++++++++++++-----------
 2 files changed, 62 insertions(+), 17 deletions(-)

diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 6dcf256..2d1ef1f 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -20,6 +20,8 @@ package dubbo
 import (
 	"strconv"
 	"sync"
+	"sync/atomic"
+	"time"
 )
 
 import (
@@ -34,7 +36,11 @@ import (
 	invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
 )
 
-var Err_No_Reply = perrors.New("request need @response")
+var (
+	// ErrNoReply is the error set on the result when the invocation carries no reply object.
+	ErrNoReply          = perrors.New("request need @response")
+	ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
+)
 
 var (
 	attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
@@ -44,12 +50,15 @@ type DubboInvoker struct {
 	protocol.BaseInvoker
 	client   *Client
 	quitOnce sync.Once
+	// Used to record the number of in-flight requests. -1 means this DubboInvoker has been destroyed.
+	reqNum int64
 }
 
 func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
 	return &DubboInvoker{
 		BaseInvoker: *protocol.NewBaseInvoker(url),
 		client:      client,
+		reqNum:      0,
 	}
 }
 
@@ -59,6 +68,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
 		err    error
 		result protocol.RPCResult
 	)
+	if di.reqNum < 0 {
+		// Generally, this case will not happen, because the invoker is removed from the
+		// invoker list before it is destroyed, so no new request will enter a destroyed invoker.
+		logger.Warnf("this dubboInvoker is destroyed")
+		result.Err = ErrDestroyedInvoker
+		return &result
+	}
+	atomic.AddInt64(&(di.reqNum), 1)
+	defer atomic.AddInt64(&(di.reqNum), -1)
 
 	inv := invocation.(*invocation_impl.RPCInvocation)
 	for _, k := range attachmentKey {
@@ -82,7 +100,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
 		}
 	} else {
 		if inv.Reply() == nil {
-			result.Err = Err_No_Reply
+			result.Err = ErrNoReply
 		} else {
 			result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
 		}
@@ -98,10 +116,20 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
 
 func (di *DubboInvoker) Destroy() {
 	di.quitOnce.Do(func() {
-		di.BaseInvoker.Destroy()
-
-		if di.client != nil {
-			di.client.Close()
+		for {
+			if di.reqNum == 0 {
+				di.reqNum = -1
+				logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
+				di.BaseInvoker.Destroy()
+				if di.client != nil {
+					di.client.Close()
+					di.client = nil
+				}
+				break
+			}
+			logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
+			time.Sleep(1 * time.Second)
 		}
+
 	})
 }
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index e88c611..d60a340 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -106,7 +106,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
 }
 
 func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
-	var url *common.URL
+	var (
+		url        *common.URL
+		oldInvoker protocol.Invoker = nil
+	)
 	//judge is override or others
 	if res != nil {
 		url = &res.Service
@@ -123,10 +126,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
 		switch res.Action {
 		case remoting.EventTypeAdd, remoting.EventTypeUpdate:
 			//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
-			dir.cacheInvoker(url)
+			oldInvoker = dir.cacheInvoker(url)
 		case remoting.EventTypeDel:
 			//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
-			dir.uncacheInvoker(url)
+			oldInvoker = dir.uncacheInvoker(url)
 			logger.Infof("selector delete service url{%s}", res.Service)
 		default:
 			return
@@ -135,8 +138,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
 
 	newInvokers := dir.toGroupInvokers()
 	dir.listenerLock.Lock()
-	defer dir.listenerLock.Unlock()
 	dir.cacheInvokers = newInvokers
+	dir.listenerLock.Unlock()
+	// Destroy the oldInvoker only after dir.cacheInvokers has been updated,
+	// to ensure that no new request will enter the oldInvoker.
+	if oldInvoker != nil {
+		oldInvoker.Destroy()
+	}
+
 }
 
 func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
@@ -174,12 +183,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
 	return groupInvokersList
 }
 
-func (dir *registryDirectory) uncacheInvoker(url *common.URL) {
+// uncacheInvoker returns the abandoned Invoker; if there is no Invoker to abandon, it returns nil.
+func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
 	logger.Debugf("service will be deleted in cache invokers: invokers key is  %s!", url.Key())
-	dir.cacheInvokersMap.Delete(url.Key())
+	if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
+		dir.cacheInvokersMap.Delete(url.Key())
+		return cacheInvoker.(protocol.Invoker)
+	}
+	return nil
 }
 
-func (dir *registryDirectory) cacheInvoker(url *common.URL) {
+// cacheInvoker returns the abandoned Invoker; if there is no Invoker to abandon, it returns nil.
+func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
 	dir.overrideUrl(dir.GetDirectoryUrl())
 	referenceUrl := dir.GetDirectoryUrl().SubURL
 
@@ -190,7 +205,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
 	}
 	if url == nil {
 		logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
-		return
+		return nil
 	}
 	//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
 	if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
@@ -207,10 +222,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
 			newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
 			if newInvoker != nil {
 				dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
-				cacheInvoker.(protocol.Invoker).Destroy()
+				return cacheInvoker.(protocol.Invoker)
 			}
 		}
 	}
+	return nil
 }
 
 //select the protocol invokers from the directory
@@ -235,10 +251,11 @@ func (dir *registryDirectory) IsAvailable() bool {
 func (dir *registryDirectory) Destroy() {
 	//TODO:unregister & unsubscribe
 	dir.BaseDirectory.Destroy(func() {
-		for _, ivk := range dir.cacheInvokers {
+		invokers := dir.cacheInvokers
+		dir.cacheInvokers = []protocol.Invoker{}
+		for _, ivk := range invokers {
 			ivk.Destroy()
 		}
-		dir.cacheInvokers = []protocol.Invoker{}
 	})
 }