You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/08/30 03:58:52 UTC
[dubbo-go] 01/01: hsf-go-dependecy branch init
This is an automated email from the ASF dual-hosted git repository.
laurence pushed a commit to branch hsf-go-dependency
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit 347fadf50abc6b57e12051b5774190428ab909b5
Author: LaurenceLiZhixin <38...@qq.com>
AuthorDate: Mon Aug 30 11:58:03 2021 +0800
hsf-go-dependecy branch init
---
common/constant/default.go | 2 +-
common/proxy/proxy.go | 30 ++++---
common/proxy/proxy_factory/default.go | 8 +-
common/rpc_service.go | 41 +++++++---
common/rpc_service_test.go | 33 ++++++++
config/config_loader.go | 7 +-
config/metadata_report_config.go | 2 +-
config/service.go | 6 +-
metadata/definition/definition.go | 147 ++++++++++++++++++++++++++++++----
metadata/service/inmemory/service.go | 1 +
metadata/service/remote/service.go | 16 ----
metadata/service/service.go | 4 +-
registry/protocol/protocol.go | 40 ++++-----
13 files changed, 249 insertions(+), 88 deletions(-)
diff --git a/common/constant/default.go b/common/constant/default.go
index da8cd20..1596f9f 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -51,7 +51,7 @@ const (
const (
DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default."
- DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,generic_service,execute,pshutdown"
+ DEFAULT_SERVICE_FILTERS = "token,accesslog,tps,generic_service,execute,pshutdown"
DEFAULT_REFERENCE_FILTERS = "cshutdown"
GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke"
diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go
index fd34810..f35e7ab 100644
--- a/common/proxy/proxy.go
+++ b/common/proxy/proxy.go
@@ -129,24 +129,25 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
var (
- err error
- inv *invocation_impl.RPCInvocation
- inIArr []interface{}
- inVArr []reflect.Value
- reply reflect.Value
+ err error
+ inv *invocation_impl.RPCInvocation
+ inIArr []interface{}
+ inVArr []reflect.Value
+ reply reflect.Value
+ replyEmptyFlag bool
)
if methodName == "Echo" {
methodName = "$echo"
}
- if len(outs) == 2 {
+ if len(outs) == 2 { // return (reply, error)
if outs[0].Kind() == reflect.Ptr {
reply = reflect.New(outs[0].Elem())
} else {
reply = reflect.New(outs[0])
}
- } else {
- reply = valueOf
+ } else { // only return error
+ replyEmptyFlag = true
}
start := 0
@@ -160,10 +161,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
}
start += 1
}
- if len(outs) == 1 && in[end-1].Type().Kind() == reflect.Ptr {
- end -= 1
- reply = in[len(in)-1]
- }
}
if end-start <= 0 {
@@ -184,8 +181,11 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
}
inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),
- invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()),
+ invocation_impl.WithArguments(inIArr),
invocation_impl.WithCallBack(p.callback), invocation_impl.WithParameterValues(inVArr))
+ if !replyEmptyFlag {
+ inv.SetReply(reply.Interface())
+ }
for k, value := range p.attachments {
inv.SetAttachments(k, value)
@@ -215,8 +215,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
} else {
logger.Warnf("result err: %v", err)
}
- } else {
- logger.Debugf("[makeDubboCallProxy] result: %v, err: %v", result.Result(), err)
}
if len(outs) == 1 {
return []reflect.Value{reflect.ValueOf(&err).Elem()}
@@ -251,7 +249,7 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
continue
}
- var funcOuts = make([]reflect.Type, outNum)
+ funcOuts := make([]reflect.Type, outNum)
for i := 0; i < outNum; i++ {
funcOuts[i] = t.Type.Out(i)
}
diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go
index dd8ce02..6a070bc 100644
--- a/common/proxy/proxy_factory/default.go
+++ b/common/proxy/proxy_factory/default.go
@@ -137,10 +137,10 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
// prepare replyv
var replyv reflect.Value
- if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
- replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
- in = append(in, replyv)
- }
+ //if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
+ // replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
+ // in = append(in, replyv)
+ //}
returnValues := method.Method().Func.Call(in)
diff --git a/common/rpc_service.go b/common/rpc_service.go
index f739e31..dc10e2a 100644
--- a/common/rpc_service.go
+++ b/common/rpc_service.go
@@ -34,14 +34,43 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
-// RPCService
+// RPCService the type alias of interface{}
+type RPCService = interface{}
+
+// ReferencedRPCService
// rpc service interface
-type RPCService interface {
+type ReferencedRPCService interface {
// Reference:
// rpc service id or reference id
Reference() string
}
+// GetReference return the reference id of the service.
+// If the service implemented the ReferencedRPCService interface,
+// it will call the Reference method. If not, it will
+// return the struct name as the reference id.
+func GetReference(service RPCService) string {
+ if s, ok := service.(ReferencedRPCService); ok {
+ return s.Reference()
+ }
+
+ ref := ""
+ sType := reflect.TypeOf(service)
+ kind := sType.Kind()
+ switch kind {
+ case reflect.Struct:
+ ref = sType.Name()
+ case reflect.Ptr:
+ sName := sType.Elem().Name()
+ if sName != "" {
+ ref = sName
+ } else {
+ ref = sType.Elem().Field(0).Name
+ }
+ }
+ return ref
+}
+
// AsyncCallbackService callback interface for async
type AsyncCallbackService interface {
// Callback: callback
@@ -358,12 +387,6 @@ func suiteMethod(method reflect.Method) *MethodType {
return nil
}
- if outNum != 1 && outNum != 2 {
- logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
- mname, mtype.String(), outNum)
- return nil
- }
-
// The latest return type of the method must be error.
if returnType := mtype.Out(outNum - 1); returnType != typeOfError {
if mname != METHOD_MAPPER {
@@ -372,7 +395,7 @@ func suiteMethod(method reflect.Method) *MethodType {
return nil
}
- // replyType
+ // todo, for multi reply condition, replyType is empty
if outNum == 2 {
replyType = mtype.Out(0)
if !isExportedOrBuiltinType(replyType) {
diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go
index e8bd393..5c04c9e 100644
--- a/common/rpc_service_test.go
+++ b/common/rpc_service_test.go
@@ -213,3 +213,36 @@ func TestSuiteMethod(t *testing.T) {
methodType = suiteMethod(method)
assert.Nil(t, methodType)
}
+
+type ServiceWithoutRef struct{}
+
+func TestGetReference(t *testing.T) {
+ s0 := &TestService{}
+ ref0 := GetReference(s0)
+ assert.Equal(t, referenceTestPath, ref0)
+
+ //s1 := TestService{}
+ //ref1 := GetReference(s1)
+ //assert.Equal(t, referenceTestPath, ref1)
+
+ s2 := &struct {
+ TestService
+ }{}
+ ref2 := GetReference(s2)
+ assert.Equal(t, referenceTestPath, ref2)
+
+ expectedReference := "ServiceWithoutRef"
+ s3 := &ServiceWithoutRef{}
+ ref3 := GetReference(s3)
+ assert.Equal(t, expectedReference, ref3)
+
+ s4 := ServiceWithoutRef{}
+ ref4 := GetReference(s4)
+ assert.Equal(t, expectedReference, ref4)
+
+ s5 := &struct {
+ ServiceWithoutRef
+ }{}
+ ref5 := GetReference(s5)
+ assert.Equal(t, expectedReference, ref5)
+}
diff --git a/config/config_loader.go b/config/config_loader.go
index 14caf2c..6f7d6fa 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -132,7 +132,7 @@ func loadConsumerConfig() {
}
// start the metadata report if config set
- if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
+ if err := StartMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
return
}
@@ -220,7 +220,7 @@ func loadProviderConfig() {
}
// start the metadata report if config set
- if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
+ if err := StartMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
return
}
@@ -375,7 +375,8 @@ func GetRPCService(name string) common.RPCService {
// RPCService create rpc service for consumer
func RPCService(service common.RPCService) {
- consumerConfig.References[service.Reference()].Implement(service)
+ ref := common.GetReference(service)
+ consumerConfig.References[ref].Implement(service)
}
// GetMetricConfig find the MetricConfig
diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go
index 6fb3fd2..602ba56 100644
--- a/config/metadata_report_config.go
+++ b/config/metadata_report_config.go
@@ -92,7 +92,7 @@ func (c *MetadataReportConfig) IsValid() bool {
}
// StartMetadataReport: The entry of metadata report start
-func startMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error {
+func StartMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error {
if metadataReportConfig == nil || !metadataReportConfig.IsValid() {
return nil
}
diff --git a/config/service.go b/config/service.go
index 6deff3b..3eac7d3 100644
--- a/config/service.go
+++ b/config/service.go
@@ -28,12 +28,14 @@ var (
// SetConsumerService is called by init() of implement of RPCService
func SetConsumerService(service common.RPCService) {
- conServices[service.Reference()] = service
+ ref := common.GetReference(service)
+ conServices[ref] = service
}
// SetProviderService is called by init() of implement of RPCService
func SetProviderService(service common.RPCService) {
- proServices[service.Reference()] = service
+ ref := common.GetReference(service)
+ proServices[ref] = service
}
// GetConsumerService gets ConsumerService by @name
diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go
index a032313..015bdc0 100644
--- a/metadata/definition/definition.go
+++ b/metadata/definition/definition.go
@@ -21,7 +21,13 @@ import (
"bytes"
"encoding/json"
"fmt"
+ "reflect"
"strings"
+ "time"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
)
import (
@@ -36,10 +42,10 @@ type ServiceDefiner interface {
// ServiceDefinition is the describer of service definition
type ServiceDefinition struct {
- CanonicalName string
- CodeSource string
- Methods []MethodDefinition
- Types []TypeDefinition
+ CanonicalName string `json:"canonicalName"`
+ CodeSource string `json:"codeSource"`
+ Methods []MethodDefinition `json:"methods"`
+ Types []TypeDefinition `json:"types"`
}
// ToBytes convert ServiceDefinition to json string
@@ -76,20 +82,20 @@ type FullServiceDefinition struct {
// MethodDefinition is the describer of method definition
type MethodDefinition struct {
- Name string
- ParameterTypes []string
- ReturnType string
- Parameters []TypeDefinition
+ Name string `json:"name"`
+ ParameterTypes []string `json:"parameterTypes"`
+ ReturnType string `json:"returnType"`
+ Parameters []TypeDefinition `json:"parameters"`
}
// TypeDefinition is the describer of type definition
type TypeDefinition struct {
- Id string
- Type string
- Items []TypeDefinition
- Enums []string
- Properties map[string]TypeDefinition
- TypeBuilderName string
+ Id string `json:"id"`
+ Type string `json:"type"`
+ Items []TypeDefinition `json:"items"`
+ Enums []string `json:"enums"`
+ Properties map[string]TypeDefinition `json:"properties"`
+ TypeBuilderName string `json:"typeBuilderName"`
}
// BuildServiceDefinition can build service definition which will be used to describe a service
@@ -99,15 +105,26 @@ func BuildServiceDefinition(service common.Service, url *common.URL) *ServiceDef
for k, m := range service.Method() {
var paramTypes []string
+ var param string
if len(m.ArgsType()) > 0 {
for _, t := range m.ArgsType() {
- paramTypes = append(paramTypes, t.Kind().String())
+ if t.Kind() == reflect.Ptr {
+ param = getArgType(reflect.New(t).Interface())
+ } else {
+ param = t.Kind().String()
+ }
+ paramTypes = append(paramTypes, param)
}
}
var returnType string
+
if m.ReplyType() != nil {
- returnType = m.ReplyType().Kind().String()
+ if m.ReplyType().Kind() == reflect.Ptr {
+ returnType = getArgType(reflect.New(m.ReplyType()).Interface())
+ } else {
+ returnType = m.ReplyType().Kind().String()
+ }
}
methodD := MethodDefinition{
@@ -135,3 +152,101 @@ func ServiceDescriperBuild(serviceName string, group string, version string) str
}
return buf.String()
}
+
+func getArgType(v interface{}) string {
+ if v == nil {
+ return "V"
+ }
+
+ v = reflect.ValueOf(v).Elem().Interface()
+
+ switch v.(type) {
+ // Serialized tags for base types
+ case nil:
+ return "V"
+ case bool:
+ return "Z"
+ case []bool:
+ return "[Z"
+ case byte:
+ return "B"
+ case []byte:
+ return "[B"
+ case int8:
+ return "B"
+ case []int8:
+ return "[B"
+ case int16:
+ return "S"
+ case []int16:
+ return "[S"
+ case uint16: // Equivalent to Char of Java
+ return "C"
+ case []uint16:
+ return "[C"
+ // case rune:
+ // return "C"
+ case int:
+ return "J"
+ case []int:
+ return "[J"
+ case int32:
+ return "I"
+ case []int32:
+ return "[I"
+ case int64:
+ return "J"
+ case []int64:
+ return "[J"
+ case time.Time:
+ return "java.util.Date"
+ case []time.Time:
+ return "[Ljava.util.Date"
+ case float32:
+ return "F"
+ case []float32:
+ return "[F"
+ case float64:
+ return "D"
+ case []float64:
+ return "[D"
+ case string:
+ return "java.lang.String"
+ case []string:
+ return "[Ljava.lang.String;"
+ case []hessian.Object:
+ return "[Ljava.lang.Object;"
+ case map[interface{}]interface{}:
+ // return "java.util.HashMap"
+ return "java.util.Map"
+ case hessian.POJO:
+ return v.(hessian.POJO).JavaClassName()
+ // Serialized tags for complex types
+ default:
+ t := reflect.TypeOf(v)
+ if reflect.Ptr == t.Kind() {
+ t = reflect.TypeOf(reflect.ValueOf(v).Elem())
+ }
+ switch t.Kind() {
+ case reflect.Struct:
+ v, ok := v.(hessian.POJO)
+ if ok {
+ return v.JavaClassName()
+ }
+ return "java.lang.Object"
+ case reflect.Slice, reflect.Array:
+ if t.Elem().Kind() == reflect.Struct {
+ return "[Ljava.lang.Object;"
+ }
+ // return "java.util.ArrayList"
+ return "java.util.List"
+ case reflect.Map: // Enter here, map may be map[string]int
+ return "java.util.Map"
+ default:
+ return ""
+ }
+ }
+
+ // unreachable
+ // return "java.lang.RuntimeException"
+}
diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go
index 8da78c3..36eefe0 100644
--- a/metadata/service/inmemory/service.go
+++ b/metadata/service/inmemory/service.go
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package inmemory
import (
diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go
index e2a7a64..302ae32 100644
--- a/metadata/service/remote/service.go
+++ b/metadata/service/remote/service.go
@@ -132,22 +132,6 @@ func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
return nil
}
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
- } else {
- params := make(map[string]string, len(url.GetParams()))
- url.RangeParams(func(key, value string) bool {
- params[key] = value
- return true
- })
- id := &identifier.MetadataIdentifier{
- BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
- ServiceInterface: interfaceName,
- Version: url.GetParam(constant.VERSION_KEY, ""),
- Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO),
- Side: url.GetParam(constant.SIDE_KEY, "consumer"),
- },
- }
- mts.delegateReport.StoreConsumerMetadata(id, params)
- return nil
}
return nil
diff --git a/metadata/service/service.go b/metadata/service/service.go
index 1d90f8a..f7a39bd 100644
--- a/metadata/service/service.go
+++ b/metadata/service/service.go
@@ -30,7 +30,7 @@ import (
// MetadataService is used to define meta data related behaviors
// usually the implementation should be singleton
type MetadataService interface {
- common.RPCService
+ common.ReferencedRPCService
// ServiceName will get the service's name in meta service , which is application name
ServiceName() (string, error)
// ExportURL will store the exported url in metadata
@@ -86,7 +86,7 @@ func (mts *BaseMetadataService) ServiceName() (string, error) {
return mts.serviceName, nil
}
-// Version will return the version of metadata service
+// Reference will return the reference id of metadata service
func (mts *BaseMetadataService) Reference() string {
return constant.SIMPLE_METADATA_SERVICE_NAME
}
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index e50e2b9..0f37ac7 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -186,20 +186,22 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
serviceConfigurationListener.OverrideUrl(providerUrl)
var reg registry.Registry
- if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
- reg = getRegistry(registryUrl)
- proto.registries.Store(registryUrl.Key(), reg)
- logger.Infof("Export proto:%p registries address:%p", proto, proto.registries)
- } else {
- reg = regI.(registry.Registry)
- }
- registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl)
- err := reg.Register(registeredProviderUrl)
- if err != nil {
- logger.Errorf("provider service %v register registry %v error, error message is %s",
- providerUrl.Key(), registryUrl.Key(), err.Error())
- return nil
+ if registryUrl.Protocol != "" {
+ if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
+ reg = getRegistry(registryUrl)
+ proto.registries.Store(registryUrl.Key(), reg)
+ logger.Infof("Export proto:%p registries address:%p", proto, proto.registries)
+ } else {
+ reg = regI.(registry.Registry)
+ }
+ registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl)
+ err := reg.Register(registeredProviderUrl)
+ if err != nil {
+ logger.Errorf("provider service %v register registry %v error, error message is %s",
+ providerUrl.Key(), registryUrl.Key(), err.Error())
+ return nil
+ }
}
key := getCacheKey(invoker)
@@ -214,11 +216,13 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
logger.Infof("The exporter has not been cached, and will return a new exporter!")
}
- go func() {
- if err = reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil {
- logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v", overriderUrl, err)
- }
- }()
+ if registryUrl.Protocol != "" {
+ go func() {
+ if err := reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil {
+ logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v", overriderUrl, err)
+ }
+ }()
+ }
return cachedExporter.(protocol.Exporter)
}