You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/01/15 10:36:36 UTC
[dubbo-go] branch 3.0 updated: refactor: invocation interface (#1702)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new f271fc1 refactor: invocation interface (#1702)
f271fc1 is described below
commit f271fc154386371721961d08d431b204008e5630
Author: Xuewei Niu <ju...@apache.org>
AuthorDate: Sat Jan 15 18:36:32 2022 +0800
refactor: invocation interface (#1702)
* refactor: invocation interface
* fix: set attachments use interface{}
* fix: reset result set attachments
---
cluster/cluster/adaptivesvc/cluster_invoker.go | 2 +
cluster/cluster/zoneaware/cluster_interceptor.go | 4 +-
cluster/cluster/zoneaware/cluster_invoker.go | 4 +-
cluster/cluster/zoneaware/cluster_invoker_test.go | 6 +-
common/proxy/proxy.go | 6 +-
common/proxy/proxy_factory/pass_through.go | 2 +-
filter/active/filter.go | 4 +-
filter/active/filter_test.go | 2 +-
filter/adaptivesvc/filter.go | 2 +-
filter/auth/default_authenticator.go | 16 +--
filter/auth/default_authenticator_test.go | 8 +-
filter/generic/filter.go | 2 +-
filter/generic/filter_test.go | 4 +-
filter/generic/service_filter.go | 4 +-
filter/seata/filter.go | 2 +-
protocol/dubbo/dubbo_codec.go | 18 ++--
protocol/dubbo/dubbo_invoker.go | 10 +-
protocol/dubbo/dubbo_invoker_test.go | 4 +-
protocol/dubbo3/dubbo3_invoker.go | 4 +-
protocol/invocation.go | 25 +++--
protocol/invocation/rpcinvocation.go | 122 +++++++++++++---------
protocol/result.go | 2 +-
remoting/getty/dubbo_codec_for_test.go | 12 +--
remoting/getty/getty_client_test.go | 2 +-
24 files changed, 148 insertions(+), 119 deletions(-)
diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go
index 30f0772..d21c8d7 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -36,6 +36,8 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
+var _ protocol.Invoker = (*adaptiveServiceClusterInvoker)(nil)
+
type adaptiveServiceClusterInvoker struct {
base.BaseClusterInvoker
}
diff --git a/cluster/cluster/zoneaware/cluster_interceptor.go b/cluster/cluster/zoneaware/cluster_interceptor.go
index 67844d2..c92edc7 100644
--- a/cluster/cluster/zoneaware/cluster_interceptor.go
+++ b/cluster/cluster/zoneaware/cluster_interceptor.go
@@ -38,11 +38,11 @@ func (z *interceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invo
switch value := force.(type) {
case bool:
if value {
- invocation.SetAttachments(key, "true")
+ invocation.SetAttachment(key, "true")
}
case string:
if "true" == value {
- invocation.SetAttachments(key, "true")
+ invocation.SetAttachment(key, "true")
}
default:
// ignore
diff --git a/cluster/cluster/zoneaware/cluster_invoker.go b/cluster/cluster/zoneaware/cluster_invoker.go
index e8cd3fd..f6cbacf 100644
--- a/cluster/cluster/zoneaware/cluster_invoker.go
+++ b/cluster/cluster/zoneaware/cluster_invoker.go
@@ -65,7 +65,7 @@ func (invoker *zoneawareClusterInvoker) Invoke(ctx context.Context, invocation p
// providers in the registry with the same zone
key := constant.RegistryKey + "." + constant.RegistryZoneKey
- zone := invocation.AttachmentsByKey(key, "")
+ zone := invocation.GetAttachmentWithDefaultValue(key, "")
if "" != zone {
for _, invoker := range invokers {
if invoker.IsAvailable() && matchParam(zone, key, "", invoker) {
@@ -73,7 +73,7 @@ func (invoker *zoneawareClusterInvoker) Invoke(ctx context.Context, invocation p
}
}
- force := invocation.AttachmentsByKey(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "")
+ force := invocation.GetAttachmentWithDefaultValue(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "")
if "true" == force {
return &protocol.RPCResult{
Err: fmt.Errorf("no registry instance in zone or "+
diff --git a/cluster/cluster/zoneaware/cluster_invoker_test.go b/cluster/cluster/zoneaware/cluster_invoker_test.go
index d32a3e7..705218e 100644
--- a/cluster/cluster/zoneaware/cluster_invoker_test.go
+++ b/cluster/cluster/zoneaware/cluster_invoker_test.go
@@ -177,7 +177,7 @@ func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
inv := &invocation.RPCInvocation{}
// zone hangzhou
hz := zoneArray[0]
- inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneKey, hz)
+ inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneKey, hz)
result := clusterInvoker.Invoke(context.Background(), inv)
@@ -206,9 +206,9 @@ func TestZoneWareInvokerWithZoneForceFail(t *testing.T) {
inv := &invocation.RPCInvocation{}
// zone hangzhou
- inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneKey, "hangzhou")
+ inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneKey, "hangzhou")
// zone force
- inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "true")
+ inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "true")
result := clusterInvoker.Invoke(context.Background(), inv)
diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go
index 8ff36d8..016d293 100644
--- a/common/proxy/proxy.go
+++ b/common/proxy/proxy.go
@@ -181,19 +181,19 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
}
for k, value := range p.attachments {
- inv.SetAttachments(k, value)
+ inv.SetAttachment(k, value)
}
// add user setAttachment. It is compatibility with previous versions.
atm := invCtx.Value(constant.AttachmentKey)
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
- inv.SetAttachments(k, value)
+ inv.SetAttachment(k, value)
}
} else if m2, ok2 := atm.(map[string]interface{}); ok2 {
// it is support to transfer map[string]interface{}. It refers to dubbo-java 2.7.
for k, value := range m2 {
- inv.SetAttachments(k, value)
+ inv.SetAttachment(k, value)
}
}
diff --git a/common/proxy/proxy_factory/pass_through.go b/common/proxy/proxy_factory/pass_through.go
index 6ad28dc..41a2b57 100644
--- a/common/proxy/proxy_factory/pass_through.go
+++ b/common/proxy/proxy_factory/pass_through.go
@@ -102,7 +102,7 @@ func (pi *PassThroughProxyInvoker) Invoke(ctx context.Context, invocation protoc
in := make([]reflect.Value, 5)
in = append(in, srv.Rcvr())
in = append(in, reflect.ValueOf(invocation.MethodName()))
- in = append(in, reflect.ValueOf(invocation.Attachment(constant.ParamsTypeKey)))
+ in = append(in, reflect.ValueOf(invocation.GetAttachmentInterface(constant.ParamsTypeKey)))
in = append(in, reflect.ValueOf(args))
in = append(in, reflect.ValueOf(invocation.Attachments()))
diff --git a/filter/active/filter.go b/filter/active/filter.go
index 4228c19..8ee781b 100644
--- a/filter/active/filter.go
+++ b/filter/active/filter.go
@@ -59,14 +59,14 @@ func newActiveFilter() filter.Filter {
// Invoke starts to record the requests status
func (f *activeFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
+ invocation.(*invocation2.RPCInvocation).SetAttachment(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
protocol.BeginCount(invoker.GetURL(), invocation.MethodName())
return invoker.Invoke(ctx, invocation)
}
// OnResponse update the active count base on the request result.
func (f *activeFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64)
+ startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).GetAttachmentWithDefaultValue(dubboInvokeStartTime, "0"), 10, 64)
if err != nil {
result.SetError(err)
logger.Errorf("parse dubbo_invoke_start_time to int64 failed")
diff --git a/filter/active/filter_test.go b/filter/active/filter_test.go
index 1d751aa..a4b0f5c 100644
--- a/filter/active/filter_test.go
+++ b/filter/active/filter_test.go
@@ -47,7 +47,7 @@ func TestFilterInvoke(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(nil)
invoker.EXPECT().GetUrl().Return(url).Times(1)
filter.Invoke(context.Background(), invoker, invoc)
- assert.True(t, invoc.AttachmentsByKey(dubboInvokeStartTime, "") != "")
+ assert.True(t, invoc.GetAttachmentWithDefaultValue(dubboInvokeStartTime, "") != "")
}
func TestFilterOnResponse(t *testing.T) {
diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go
index 04dde94..17472ba 100644
--- a/filter/adaptivesvc/filter.go
+++ b/filter/adaptivesvc/filter.go
@@ -91,7 +91,7 @@ func (f *adaptiveServiceProviderFilter) Invoke(ctx context.Context, invoker prot
func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result protocol.Result, invoker protocol.Invoker,
invocation protocol.Invocation) protocol.Result {
// get updater from the attributes
- updaterIface := invocation.AttributeByKey(constant.AdaptiveServiceUpdaterKey, nil)
+ updaterIface, _ := invocation.GetAttribute(constant.AdaptiveServiceUpdaterKey)
if updaterIface == nil {
return &protocol.RPCResult{Err: ErrUpdaterNotFound}
}
diff --git a/filter/auth/default_authenticator.go b/filter/auth/default_authenticator.go
index 71da50b..db178c7 100644
--- a/filter/auth/default_authenticator.go
+++ b/filter/auth/default_authenticator.go
@@ -69,10 +69,10 @@ func (authenticator *defaultAuthenticator) Sign(invocation protocol.Invocation,
if err != nil {
return err
}
- inv.SetAttachments(constant.RequestSignatureKey, signature)
- inv.SetAttachments(constant.RequestTimestampKey, currentTimeMillis)
- inv.SetAttachments(constant.AKKey, accessKeyPair.AccessKey)
- inv.SetAttachments(constant.Consumer, consumer)
+ inv.SetAttachment(constant.RequestSignatureKey, signature)
+ inv.SetAttachment(constant.RequestTimestampKey, currentTimeMillis)
+ inv.SetAttachment(constant.AKKey, accessKeyPair.AccessKey)
+ inv.SetAttachment(constant.Consumer, consumer)
return nil
}
@@ -97,11 +97,11 @@ func getSignature(url *common.URL, invocation protocol.Invocation, secrectKey st
// Authenticate verifies whether the signature sent by the requester is correct
func (authenticator *defaultAuthenticator) Authenticate(invocation protocol.Invocation, url *common.URL) error {
- accessKeyId := invocation.AttachmentsByKey(constant.AKKey, "")
+ accessKeyId := invocation.GetAttachmentWithDefaultValue(constant.AKKey, "")
- requestTimestamp := invocation.AttachmentsByKey(constant.RequestTimestampKey, "")
- originSignature := invocation.AttachmentsByKey(constant.RequestSignatureKey, "")
- consumer := invocation.AttachmentsByKey(constant.Consumer, "")
+ requestTimestamp := invocation.GetAttachmentWithDefaultValue(constant.RequestTimestampKey, "")
+ originSignature := invocation.GetAttachmentWithDefaultValue(constant.RequestSignatureKey, "")
+ consumer := invocation.GetAttachmentWithDefaultValue(constant.Consumer, "")
if IsEmpty(accessKeyId, false) || IsEmpty(consumer, false) ||
IsEmpty(requestTimestamp, false) || IsEmpty(originSignature, false) {
return errors.New("failed to authenticate your ak/sk, maybe the consumer has not enabled the auth")
diff --git a/filter/auth/default_authenticator_test.go b/filter/auth/default_authenticator_test.go
index 57197f1..3ff17b9 100644
--- a/filter/auth/default_authenticator_test.go
+++ b/filter/auth/default_authenticator_test.go
@@ -79,10 +79,10 @@ func TestDefaultAuthenticator_Sign(t *testing.T) {
testurl.SetParam(constant.ParameterSignatureEnableKey, "false")
inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, nil)
_ = authenticator.Sign(inv, testurl)
- assert.NotEqual(t, inv.AttachmentsByKey(constant.RequestSignatureKey, ""), "")
- assert.NotEqual(t, inv.AttachmentsByKey(constant.Consumer, ""), "")
- assert.NotEqual(t, inv.AttachmentsByKey(constant.RequestTimestampKey, ""), "")
- assert.Equal(t, inv.AttachmentsByKey(constant.AKKey, ""), "akey")
+ assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.RequestSignatureKey, ""), "")
+ assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.Consumer, ""), "")
+ assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.RequestTimestampKey, ""), "")
+ assert.Equal(t, inv.GetAttachmentWithDefaultValue(constant.AKKey, ""), "akey")
}
func Test_getAccessKeyPairSuccess(t *testing.T) {
diff --git a/filter/generic/filter.go b/filter/generic/filter.go
index c63b30c..ee93b23 100644
--- a/filter/generic/filter.go
+++ b/filter/generic/filter.go
@@ -67,7 +67,7 @@ func (f *genericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in
args := make([]hessian.Object, 0, len(oldargs))
// get generic info from attachments of invocation, the default value is "true"
- generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault)
+ generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault)
// get generalizer according to value in the `generic`
g := getGeneralizer(generic)
diff --git a/filter/generic/filter_test.go b/filter/generic/filter_test.go
index cccd21a..4b99f95 100644
--- a/filter/generic/filter_test.go
+++ b/filter/generic/filter_test.go
@@ -60,7 +60,7 @@ func TestFilter_Invoke(t *testing.T) {
assert.Equal(t, "Hello", args[0])
assert.Equal(t, "java.lang.String", args[1].([]string)[0])
assert.Equal(t, "arg1", args[2].([]hessian.Object)[0].(string))
- assert.Equal(t, constant.GenericSerializationDefault, invocation.AttachmentsByKey(constant.GenericKey, ""))
+ assert.Equal(t, constant.GenericSerializationDefault, invocation.GetAttachmentWithDefaultValue(constant.GenericKey, ""))
return &protocol.RPCResult{}
})
@@ -93,7 +93,7 @@ func TestFilter_InvokeWithGenericCall(t *testing.T) {
assert.Equal(t, "hello", args[0])
assert.Equal(t, "java.lang.String", args[1].([]string)[0])
assert.Equal(t, "arg1", args[2].([]string)[0])
- assert.Equal(t, constant.GenericSerializationDefault, invocation.AttachmentsByKey(constant.GenericKey, ""))
+ assert.Equal(t, constant.GenericSerializationDefault, invocation.GetAttachmentWithDefaultValue(constant.GenericKey, ""))
return &protocol.RPCResult{}
})
diff --git a/filter/generic/service_filter.go b/filter/generic/service_filter.go
index d9f1bba..b6293e7 100644
--- a/filter/generic/service_filter.go
+++ b/filter/generic/service_filter.go
@@ -87,7 +87,7 @@ func (f *genericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invo
argsType := method.ArgsType()
// get generic info from attachments of invocation, the default value is "true"
- generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault)
+ generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault)
// get generalizer according to value in the `generic`
g := getGeneralizer(generic)
@@ -126,7 +126,7 @@ func (f *genericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invo
func (f *genericServiceFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, invocation protocol.Invocation) protocol.Result {
if invocation.IsGenericInvocation() && result.Result() != nil {
// get generic info from attachments of invocation, the default value is "true"
- generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault)
+ generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault)
// get generalizer according to value in the `generic`
g := getGeneralizer(generic)
diff --git a/filter/seata/filter.go b/filter/seata/filter.go
index b1a75e4..f05b738 100644
--- a/filter/seata/filter.go
+++ b/filter/seata/filter.go
@@ -58,7 +58,7 @@ func newSeataFilter() filter.Filter {
// Invoke Get Xid by attachment key `SEATA_XID`. When use Seata, transfer xid by attachments
func (f *seataFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- xid := invocation.AttachmentsByKey(string(SEATA_XID), "")
+ xid := invocation.GetAttachmentWithDefaultValue(string(SEATA_XID), "")
if len(strings.TrimSpace(xid)) > 0 {
logger.Debugf("Method: %v,Xid: %v", invocation.MethodName(), xid)
return invoker.Invoke(context.WithValue(ctx, SEATA_XID, xid), invocation)
diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 477e6c7..b35d909 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -65,12 +65,12 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
invocation := *invoc
svc := impl.Service{}
- svc.Path = invocation.AttachmentsByKey(constant.PathKey, "")
- svc.Interface = invocation.AttachmentsByKey(constant.InterfaceKey, "")
- svc.Version = invocation.AttachmentsByKey(constant.VersionKey, "")
- svc.Group = invocation.AttachmentsByKey(constant.GroupKey, "")
+ svc.Path = invocation.GetAttachmentWithDefaultValue(constant.PathKey, "")
+ svc.Interface = invocation.GetAttachmentWithDefaultValue(constant.InterfaceKey, "")
+ svc.Version = invocation.GetAttachmentWithDefaultValue(constant.VersionKey, "")
+ svc.Group = invocation.GetAttachmentWithDefaultValue(constant.GroupKey, "")
svc.Method = invocation.MethodName()
- timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout)))
+ timeout, err := strconv.Atoi(invocation.GetAttachmentWithDefaultValue(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout)))
if err != nil {
// it will be wrapped in readwrite.Write .
return nil, perrors.WithStack(err)
@@ -78,7 +78,7 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
svc.Timeout = time.Duration(timeout)
header := impl.DubboHeader{}
- serialization := invocation.AttachmentsByKey(constant.SerializationKey, constant.Hessian2Serialization)
+ serialization := invocation.GetAttachmentWithDefaultValue(constant.SerializationKey, constant.Hessian2Serialization)
if serialization == constant.ProtobufSerialization {
header.SerialID = constant.SProto
} else {
@@ -254,20 +254,20 @@ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error
Status: pkg.Header.ResponseStatus,
Event: (pkg.Header.Type & impl.PackageHeartbeat) != 0,
}
- var error error
+ var pkgerr error
if pkg.Header.Type&impl.PackageHeartbeat != 0x00 {
if pkg.Header.Type&impl.PackageResponse != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
if pkg.Err != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", pkg.Err)
- error = pkg.Err
+ pkgerr = pkg.Err
}
} else {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", pkg.Header, pkg.Service, pkg.Body)
response.Status = hessian.Response_OK
// reply(session, p, hessian.PackageHeartbeat)
}
- return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error
+ return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, pkgerr
}
logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
rpcResult := &protocol.RPCResult{}
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 904bfb0..6d74f71 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -117,10 +117,10 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
inv := invocation.(*invocation_impl.RPCInvocation)
// init param
- inv.SetAttachments(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, ""))
+ inv.SetAttachment(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, ""))
for _, k := range attachmentKey {
if v := di.GetURL().GetParam(k, ""); len(v) > 0 {
- inv.SetAttachments(k, v)
+ inv.SetAttachment(k, v)
}
}
@@ -133,7 +133,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
url.SetParam(constant.SerializationKey, constant.Hessian2Serialization)
}
// async
- async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.AsyncKey, "false"))
+ async, err := strconv.ParseBool(inv.GetAttachmentWithDefaultValue(constant.AsyncKey, "false"))
if err != nil {
logger.Errorf("ParseBool - error: %v", err)
async = false
@@ -172,12 +172,12 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
// config timeout into attachment
- invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
+ invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
return t
}
}
// set timeout into invocation at method level
- invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
+ invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}
diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go
index 5f5dd86..8d12bba 100644
--- a/protocol/dubbo/dubbo_invoker_test.go
+++ b/protocol/dubbo/dubbo_invoker_test.go
@@ -63,7 +63,7 @@ package dubbo
// assert.Equal(t, User{ID: "1", Name: "username"}, *res.Result().(*User))
//
// // CallOneway
-// inv.SetAttachments(constant.ASYNC_KEY, "true")
+// inv.SetAttachment(constant.ASYNC_KEY, "true")
// res = invoker.Invoke(context.Background(), inv)
// assert.NoError(t, res.Error())
//
@@ -81,7 +81,7 @@ package dubbo
// assert.NoError(t, res.Error())
//
// // Err_No_Reply
-// inv.SetAttachments(constant.ASYNC_KEY, "false")
+// inv.SetAttachment(constant.ASYNC_KEY, "false")
// inv.SetReply(nil)
// res = invoker.Invoke(context.Background(), inv)
// assert.EqualError(t, res.Error(), "request need @response")
diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go
index 332f78e..bc07957 100644
--- a/protocol/dubbo3/dubbo3_invoker.go
+++ b/protocol/dubbo3/dubbo3_invoker.go
@@ -217,12 +217,12 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
// config timeout into attachment
- invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
+ invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
return t
}
}
// set timeout into invocation at method level
- invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
+ invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}
diff --git a/protocol/invocation.go b/protocol/invocation.go
index a85b6b6..4580250 100644
--- a/protocol/invocation.go
+++ b/protocol/invocation.go
@@ -38,18 +38,23 @@ type Invocation interface {
// Reply gets response of request
Reply() interface{}
// Attachments gets all attachments
- Attachments() map[string]interface{}
- // AttachmentsByKey gets attachment by key , if nil then return default value. (It will be deprecated in the future)
- AttachmentsByKey(string, string) string
- Attachment(string) interface{}
- // Attributes refers to dubbo 2.7.6. It is different from attachment. It is used in internal process.
- Attributes() map[string]interface{}
- // AttributeByKey gets attribute by key , if nil then return default value
- AttributeByKey(string, interface{}) interface{}
- // SetAttachments sets attribute by @key and @value.
- SetAttachments(key string, value interface{})
+
// Invoker gets the invoker in current context.
Invoker() Invoker
// IsGenericInvocation gets if this is a generic invocation
IsGenericInvocation() bool
+
+ Attachments() map[string]interface{}
+ SetAttachment(key string, value interface{})
+ GetAttachment(key string) (string, bool)
+ GetAttachmentInterface(string) interface{}
+ GetAttachmentWithDefaultValue(key string, defaultValue string) string
+
+ // Attributes were first introduced in dubbo-java 2.7.6. They are
+ // used only for internal invocation, that is, they are not passed
+ // between server and client.
+ Attributes() map[string]interface{}
+ SetAttribute(key string, value interface{})
+ GetAttribute(key string) (interface{}, bool)
+ GetAttributeWithDefaultValue(key string, defaultValue interface{}) interface{}
}
diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go
index aa1f0d9..399e3a1 100644
--- a/protocol/invocation/rpcinvocation.go
+++ b/protocol/invocation/rpcinvocation.go
@@ -129,32 +129,14 @@ func (r *RPCInvocation) Attachments() map[string]interface{} {
return r.attachments
}
-// AttachmentsByKey gets RPC attachment by key, if nil then return default value.
-func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string {
- r.lock.RLock()
- defer r.lock.RUnlock()
- if r.attachments == nil {
- return defaultValue
- }
- value, ok := r.attachments[key]
- if ok {
- return value.(string)
- }
- return defaultValue
-}
-
// Attachment returns the corresponding value from dubbo's attachment with the given key.
-func (r *RPCInvocation) Attachment(key string) interface{} {
+func (r *RPCInvocation) GetAttachmentInterface(key string) interface{} {
r.lock.RLock()
defer r.lock.RUnlock()
if r.attachments == nil {
return nil
}
- value, ok := r.attachments[key]
- if ok {
- return value
- }
- return nil
+ return r.attachments[key]
}
// Attributes gets all attributes of RPC.
@@ -162,34 +144,6 @@ func (r *RPCInvocation) Attributes() map[string]interface{} {
return r.attributes
}
-// AttributeByKey gets attribute by @key. If it is not exist, it will return default value.
-func (r *RPCInvocation) AttributeByKey(key string, defaultValue interface{}) interface{} {
- r.lock.RLock()
- defer r.lock.RUnlock()
- value, ok := r.attributes[key]
- if ok {
- return value
- }
- return defaultValue
-}
-
-// SetAttachments sets attribute by @key and @value.
-func (r *RPCInvocation) SetAttachments(key string, value interface{}) {
- r.lock.Lock()
- defer r.lock.Unlock()
- if r.attachments == nil {
- r.attachments = make(map[string]interface{})
- }
- r.attachments[key] = value
-}
-
-// SetAttribute sets attribute by @key and @value.
-func (r *RPCInvocation) SetAttribute(key string, value interface{}) {
- r.lock.Lock()
- defer r.lock.Unlock()
- r.attributes[key] = value
-}
-
// Invoker gets the invoker in current context.
func (r *RPCInvocation) Invoker() protocol.Invoker {
return r.invoker
@@ -213,8 +167,76 @@ func (r *RPCInvocation) SetCallBack(c interface{}) {
}
func (r *RPCInvocation) ServiceKey() string {
- return common.ServiceKey(strings.TrimPrefix(r.AttachmentsByKey(constant.PathKey, r.AttachmentsByKey(constant.InterfaceKey, "")), "/"),
- r.AttachmentsByKey(constant.GroupKey, ""), r.AttachmentsByKey(constant.VersionKey, ""))
+ return common.ServiceKey(strings.TrimPrefix(r.GetAttachmentWithDefaultValue(constant.PathKey, r.GetAttachmentWithDefaultValue(constant.InterfaceKey, "")), "/"),
+ r.GetAttachmentWithDefaultValue(constant.GroupKey, ""), r.GetAttachmentWithDefaultValue(constant.VersionKey, ""))
+}
+
+func (r *RPCInvocation) SetAttachment(key string, value interface{}) {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+ if r.attachments == nil {
+ r.attachments = make(map[string]interface{})
+ }
+ r.attachments[key] = value
+}
+
+func (r *RPCInvocation) GetAttachment(key string) (string, bool) {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+ if r.attachments == nil {
+ return "", false
+ }
+ if value, ok := r.attachments[key]; ok {
+ if str, ok := value.(string); ok {
+ return str, true
+ }
+ }
+ return "", false
+}
+
+func (r *RPCInvocation) GetAttachmentWithDefaultValue(key string, defaultValue string) string {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+ if r.attachments == nil {
+ return defaultValue
+ }
+ if value, ok := r.attachments[key]; ok {
+ if str, ok := value.(string); ok {
+ return str
+ }
+ }
+ return defaultValue
+}
+
+func (r *RPCInvocation) SetAttribute(key string, value interface{}) {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+ if r.attributes == nil {
+ r.attributes = make(map[string]interface{})
+ }
+ r.attributes[key] = value
+}
+
+func (r *RPCInvocation) GetAttribute(key string) (interface{}, bool) {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+ if r.attributes == nil {
+ return nil, false
+ }
+ value, ok := r.attributes[key]
+ return value, ok
+}
+
+func (r *RPCInvocation) GetAttributeWithDefaultValue(key string, defaultValue interface{}) interface{} { // returns the attribute stored under key, or defaultValue when it is absent
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+ if r.attributes == nil {
+ return defaultValue
+ }
+ if value, ok := r.attributes[key]; ok { // look up attributes (not attachments): this getter must mirror SetAttribute/GetAttribute
+ return value
+ }
+ return defaultValue
}
// /////////////////////////
diff --git a/protocol/result.go b/protocol/result.go
index a2169a1..709c7ec 100644
--- a/protocol/result.go
+++ b/protocol/result.go
@@ -71,7 +71,7 @@ func (r *RPCResult) Result() interface{} {
return r.Rest
}
-// SetAttachments replaces the existing attachments with the specified param.
+// SetAttachments replaces the existing attachments with the specified param.
func (r *RPCResult) SetAttachments(attr map[string]interface{}) {
r.Attrs = attr
}
diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go
index 6a90a0b..de3783e 100644
--- a/remoting/getty/dubbo_codec_for_test.go
+++ b/remoting/getty/dubbo_codec_for_test.go
@@ -59,12 +59,12 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer
tmpInvocation := invoc
svc := impl.Service{}
- svc.Path = tmpInvocation.AttachmentsByKey(constant.PathKey, "")
- svc.Interface = tmpInvocation.AttachmentsByKey(constant.InterfaceKey, "")
- svc.Version = tmpInvocation.AttachmentsByKey(constant.VersionKey, "")
- svc.Group = tmpInvocation.AttachmentsByKey(constant.GroupKey, "")
+ svc.Path = tmpInvocation.GetAttachmentWithDefaultValue(constant.PathKey, "")
+ svc.Interface = tmpInvocation.GetAttachmentWithDefaultValue(constant.InterfaceKey, "")
+ svc.Version = tmpInvocation.GetAttachmentWithDefaultValue(constant.VersionKey, "")
+ svc.Group = tmpInvocation.GetAttachmentWithDefaultValue(constant.GroupKey, "")
svc.Method = tmpInvocation.MethodName()
- timeout, err := strconv.Atoi(tmpInvocation.AttachmentsByKey(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout)))
+ timeout, err := strconv.Atoi(tmpInvocation.GetAttachmentWithDefaultValue(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout)))
if err != nil {
// it will be wrapped in readwrite.Write .
return nil, perrors.WithStack(err)
@@ -72,7 +72,7 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer
svc.Timeout = time.Duration(timeout)
header := impl.DubboHeader{}
- serialization := tmpInvocation.AttachmentsByKey(constant.SerializationKey, constant.Hessian2Serialization)
+ serialization := tmpInvocation.GetAttachmentWithDefaultValue(constant.SerializationKey, constant.Hessian2Serialization)
if serialization == constant.ProtobufSerialization {
header.SerialID = constant.SProto
} else {
diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go
index 346bd7c..3d4216b 100644
--- a/remoting/getty/getty_client_test.go
+++ b/remoting/getty/getty_client_test.go
@@ -76,7 +76,7 @@ func createInvocation(methodName string, callback interface{}, reply interface{}
func setAttachment(invocation *invocation.RPCInvocation, attachments map[string]string) {
for key, value := range attachments {
- invocation.SetAttachments(key, value)
+ invocation.SetAttachment(key, value)
}
}