You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by fa...@apache.org on 2020/02/22 06:42:21 UTC
[dubbo-go] branch 1.3 updated: Merge pull request #358 from pantianying/addRlockForDubboInvoker
This is an automated email from the ASF dual-hosted git repository.
fangyc pushed a commit to branch 1.3
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/1.3 by this push:
new 95065e5 Merge pull request #358 from pantianying/addRlockForDubboInvoker
new 082cc62 Merge pull request #364 from fangyincheng/1.3
95065e5 is described below
commit 95065e54419b9e83f54291209ec72294857925b3
Author: Ming Deng <mi...@gmail.com>
AuthorDate: Thu Feb 20 10:06:07 2020 +0800
Merge pull request #358 from pantianying/addRlockForDubboInvoker
Fix:deal the panic when invoker destroy
---
protocol/dubbo/dubbo_invoker.go | 40 ++++++++++++++++++++++++++++++++++------
registry/directory/directory.go | 39 ++++++++++++++++++++++++++++-----------
2 files changed, 62 insertions(+), 17 deletions(-)
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 6dcf256..2d1ef1f 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -20,6 +20,8 @@ package dubbo
import (
"strconv"
"sync"
+ "sync/atomic"
+ "time"
)
import (
@@ -34,7 +36,11 @@ import (
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
)
-var Err_No_Reply = perrors.New("request need @response")
+var (
+ // ErrNoReply ...
+ ErrNoReply = perrors.New("request need @response")
+ ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
+)
var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
@@ -44,12 +50,15 @@ type DubboInvoker struct {
protocol.BaseInvoker
client *Client
quitOnce sync.Once
+ // Used to record the number of requests. -1 represent this DubboInvoker is destroyed
+ reqNum int64
}
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
+ reqNum: 0,
}
}
@@ -59,6 +68,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
err error
result protocol.RPCResult
)
+ if di.reqNum < 0 {
+ // Generally, the case will not happen, because the invoker has been removed
+ // from the invoker list before destroy,so no new request will enter the destroyed invoker
+ logger.Warnf("this dubboInvoker is destroyed")
+ result.Err = ErrDestroyedInvoker
+ return &result
+ }
+ atomic.AddInt64(&(di.reqNum), 1)
+ defer atomic.AddInt64(&(di.reqNum), -1)
inv := invocation.(*invocation_impl.RPCInvocation)
for _, k := range attachmentKey {
@@ -82,7 +100,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
} else {
if inv.Reply() == nil {
- result.Err = Err_No_Reply
+ result.Err = ErrNoReply
} else {
result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
}
@@ -98,10 +116,20 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
- di.BaseInvoker.Destroy()
-
- if di.client != nil {
- di.client.Close()
+ for {
+ if di.reqNum == 0 {
+ di.reqNum = -1
+ logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
+ di.BaseInvoker.Destroy()
+ if di.client != nil {
+ di.client.Close()
+ di.client = nil
+ }
+ break
+ }
+ logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
+ time.Sleep(1 * time.Second)
}
+
})
}
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index e88c611..d60a340 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -106,7 +106,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
}
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
- var url *common.URL
+ var (
+ url *common.URL
+ oldInvoker protocol.Invoker = nil
+ )
//judge is override or others
if res != nil {
url = &res.Service
@@ -123,10 +126,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
- dir.cacheInvoker(url)
+ oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
- dir.uncacheInvoker(url)
+ oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
@@ -135,8 +138,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
- defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
+ dir.listenerLock.Unlock()
+ // After dir.cacheInvokers is updated,destroy the oldInvoker
+ // Ensure that no request will enter the oldInvoker
+ if oldInvoker != nil {
+ oldInvoker.Destroy()
+ }
+
}
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
@@ -174,12 +183,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}
-func (dir *registryDirectory) uncacheInvoker(url *common.URL) {
+// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
+func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
- dir.cacheInvokersMap.Delete(url.Key())
+ if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
+ dir.cacheInvokersMap.Delete(url.Key())
+ return cacheInvoker.(protocol.Invoker)
+ }
+ return nil
}
-func (dir *registryDirectory) cacheInvoker(url *common.URL) {
+// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
+func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL
@@ -190,7 +205,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
}
if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
- return
+ return nil
}
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
@@ -207,10 +222,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
- cacheInvoker.(protocol.Invoker).Destroy()
+ return cacheInvoker.(protocol.Invoker)
}
}
}
+ return nil
}
//select the protocol invokers from the directory
@@ -235,10 +251,11 @@ func (dir *registryDirectory) IsAvailable() bool {
func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
- for _, ivk := range dir.cacheInvokers {
+ invokers := dir.cacheInvokers
+ dir.cacheInvokers = []protocol.Invoker{}
+ for _, ivk := range invokers {
ivk.Destroy()
}
- dir.cacheInvokers = []protocol.Invoker{}
})
}