You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/08/30 03:58:52 UTC

[dubbo-go] 01/01: hsf-go-dependency branch init

This is an automated email from the ASF dual-hosted git repository.

laurence pushed a commit to branch hsf-go-dependency
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git

commit 347fadf50abc6b57e12051b5774190428ab909b5
Author: LaurenceLiZhixin <38...@qq.com>
AuthorDate: Mon Aug 30 11:58:03 2021 +0800

    hsf-go-dependency branch init
---
 common/constant/default.go            |   2 +-
 common/proxy/proxy.go                 |  30 ++++---
 common/proxy/proxy_factory/default.go |   8 +-
 common/rpc_service.go                 |  41 +++++++---
 common/rpc_service_test.go            |  33 ++++++++
 config/config_loader.go               |   7 +-
 config/metadata_report_config.go      |   2 +-
 config/service.go                     |   6 +-
 metadata/definition/definition.go     | 147 ++++++++++++++++++++++++++++++----
 metadata/service/inmemory/service.go  |   1 +
 metadata/service/remote/service.go    |  16 ----
 metadata/service/service.go           |   4 +-
 registry/protocol/protocol.go         |  40 ++++-----
 13 files changed, 249 insertions(+), 88 deletions(-)

diff --git a/common/constant/default.go b/common/constant/default.go
index da8cd20..1596f9f 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -51,7 +51,7 @@ const (
 const (
 	DEFAULT_KEY               = "default"
 	PREFIX_DEFAULT_KEY        = "default."
-	DEFAULT_SERVICE_FILTERS   = "echo,token,accesslog,tps,generic_service,execute,pshutdown"
+	DEFAULT_SERVICE_FILTERS   = "token,accesslog,tps,generic_service,execute,pshutdown"
 	DEFAULT_REFERENCE_FILTERS = "cshutdown"
 	GENERIC_REFERENCE_FILTERS = "generic"
 	GENERIC                   = "$invoke"
diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go
index fd34810..f35e7ab 100644
--- a/common/proxy/proxy.go
+++ b/common/proxy/proxy.go
@@ -129,24 +129,25 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
 	makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
 		return func(in []reflect.Value) []reflect.Value {
 			var (
-				err    error
-				inv    *invocation_impl.RPCInvocation
-				inIArr []interface{}
-				inVArr []reflect.Value
-				reply  reflect.Value
+				err            error
+				inv            *invocation_impl.RPCInvocation
+				inIArr         []interface{}
+				inVArr         []reflect.Value
+				reply          reflect.Value
+				replyEmptyFlag bool
 			)
 			if methodName == "Echo" {
 				methodName = "$echo"
 			}
 
-			if len(outs) == 2 {
+			if len(outs) == 2 { // return (reply, error)
 				if outs[0].Kind() == reflect.Ptr {
 					reply = reflect.New(outs[0].Elem())
 				} else {
 					reply = reflect.New(outs[0])
 				}
-			} else {
-				reply = valueOf
+			} else { // only return error
+				replyEmptyFlag = true
 			}
 
 			start := 0
@@ -160,10 +161,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
 					}
 					start += 1
 				}
-				if len(outs) == 1 && in[end-1].Type().Kind() == reflect.Ptr {
-					end -= 1
-					reply = in[len(in)-1]
-				}
 			}
 
 			if end-start <= 0 {
@@ -184,8 +181,11 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
 			}
 
 			inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),
-				invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()),
+				invocation_impl.WithArguments(inIArr),
 				invocation_impl.WithCallBack(p.callback), invocation_impl.WithParameterValues(inVArr))
+			if !replyEmptyFlag {
+				inv.SetReply(reply.Interface())
+			}
 
 			for k, value := range p.attachments {
 				inv.SetAttachments(k, value)
@@ -215,8 +215,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
 				} else {
 					logger.Warnf("result err: %v", err)
 				}
-			} else {
-				logger.Debugf("[makeDubboCallProxy] result: %v, err: %v", result.Result(), err)
 			}
 			if len(outs) == 1 {
 				return []reflect.Value{reflect.ValueOf(&err).Elem()}
@@ -251,7 +249,7 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
 				continue
 			}
 
-			var funcOuts = make([]reflect.Type, outNum)
+			funcOuts := make([]reflect.Type, outNum)
 			for i := 0; i < outNum; i++ {
 				funcOuts[i] = t.Type.Out(i)
 			}
diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go
index dd8ce02..6a070bc 100644
--- a/common/proxy/proxy_factory/default.go
+++ b/common/proxy/proxy_factory/default.go
@@ -137,10 +137,10 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 
 	// prepare replyv
 	var replyv reflect.Value
-	if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
-		replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
-		in = append(in, replyv)
-	}
+	//if method.ReplyType() == nil && len(method.ArgsType()) > 0 {
+	//	replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem())
+	//	in = append(in, replyv)
+	//}
 
 	returnValues := method.Method().Func.Call(in)
 
diff --git a/common/rpc_service.go b/common/rpc_service.go
index f739e31..dc10e2a 100644
--- a/common/rpc_service.go
+++ b/common/rpc_service.go
@@ -34,14 +34,43 @@ import (
 	"github.com/apache/dubbo-go/common/logger"
 )
 
-// RPCService
+// RPCService the type alias of interface{}
+type RPCService = interface{}
+
+// ReferencedRPCService
 // rpc service interface
-type RPCService interface {
+type ReferencedRPCService interface {
 	// Reference:
 	// rpc service id or reference id
 	Reference() string
 }
 
+// GetReference returns the reference id of the service.
+// If the service implements the ReferencedRPCService interface,
+// the result of its Reference method is returned. Otherwise the struct's
+// type name is used (or, for a pointer to an unnamed struct, its first field's name).
+func GetReference(service RPCService) string {
+	if s, ok := service.(ReferencedRPCService); ok {
+		return s.Reference()
+	}
+
+	ref := ""
+	sType := reflect.TypeOf(service)
+	kind := sType.Kind()
+	switch kind {
+	case reflect.Struct:
+		ref = sType.Name()
+	case reflect.Ptr:
+		sName := sType.Elem().Name()
+		if sName != "" {
+			ref = sName
+		} else {
+			ref = sType.Elem().Field(0).Name
+		}
+	}
+	return ref
+}
+
 // AsyncCallbackService callback interface for async
 type AsyncCallbackService interface {
 	// Callback: callback
@@ -358,12 +387,6 @@ func suiteMethod(method reflect.Method) *MethodType {
 		return nil
 	}
 
-	if outNum != 1 && outNum != 2 {
-		logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
-			mname, mtype.String(), outNum)
-		return nil
-	}
-
 	// The latest return type of the method must be error.
 	if returnType := mtype.Out(outNum - 1); returnType != typeOfError {
 		if mname != METHOD_MAPPER {
@@ -372,7 +395,7 @@ func suiteMethod(method reflect.Method) *MethodType {
 		return nil
 	}
 
-	// replyType
+	// TODO: for methods with multiple reply values, replyType is left empty
 	if outNum == 2 {
 		replyType = mtype.Out(0)
 		if !isExportedOrBuiltinType(replyType) {
diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go
index e8bd393..5c04c9e 100644
--- a/common/rpc_service_test.go
+++ b/common/rpc_service_test.go
@@ -213,3 +213,36 @@ func TestSuiteMethod(t *testing.T) {
 	methodType = suiteMethod(method)
 	assert.Nil(t, methodType)
 }
+
+type ServiceWithoutRef struct{}
+
+func TestGetReference(t *testing.T) {
+	s0 := &TestService{}
+	ref0 := GetReference(s0)
+	assert.Equal(t, referenceTestPath, ref0)
+
+	//s1 := TestService{}
+	//ref1 := GetReference(s1)
+	//assert.Equal(t, referenceTestPath, ref1)
+
+	s2 := &struct {
+		TestService
+	}{}
+	ref2 := GetReference(s2)
+	assert.Equal(t, referenceTestPath, ref2)
+
+	expectedReference := "ServiceWithoutRef"
+	s3 := &ServiceWithoutRef{}
+	ref3 := GetReference(s3)
+	assert.Equal(t, expectedReference, ref3)
+
+	s4 := ServiceWithoutRef{}
+	ref4 := GetReference(s4)
+	assert.Equal(t, expectedReference, ref4)
+
+	s5 := &struct {
+		ServiceWithoutRef
+	}{}
+	ref5 := GetReference(s5)
+	assert.Equal(t, expectedReference, ref5)
+}
diff --git a/config/config_loader.go b/config/config_loader.go
index 14caf2c..6f7d6fa 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -132,7 +132,7 @@ func loadConsumerConfig() {
 	}
 
 	// start the metadata report if config set
-	if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
+	if err := StartMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
 		logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
 		return
 	}
@@ -220,7 +220,7 @@ func loadProviderConfig() {
 	}
 
 	// start the metadata report if config set
-	if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
+	if err := StartMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
 		logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
 		return
 	}
@@ -375,7 +375,8 @@ func GetRPCService(name string) common.RPCService {
 
 // RPCService create rpc service for consumer
 func RPCService(service common.RPCService) {
-	consumerConfig.References[service.Reference()].Implement(service)
+	ref := common.GetReference(service)
+	consumerConfig.References[ref].Implement(service)
 }
 
 // GetMetricConfig find the MetricConfig
diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go
index 6fb3fd2..602ba56 100644
--- a/config/metadata_report_config.go
+++ b/config/metadata_report_config.go
@@ -92,7 +92,7 @@ func (c *MetadataReportConfig) IsValid() bool {
 }
 
 // StartMetadataReport: The entry of metadata report start
-func startMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error {
+func StartMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error {
 	if metadataReportConfig == nil || !metadataReportConfig.IsValid() {
 		return nil
 	}
diff --git a/config/service.go b/config/service.go
index 6deff3b..3eac7d3 100644
--- a/config/service.go
+++ b/config/service.go
@@ -28,12 +28,14 @@ var (
 
 // SetConsumerService is called by init() of implement of RPCService
 func SetConsumerService(service common.RPCService) {
-	conServices[service.Reference()] = service
+	ref := common.GetReference(service)
+	conServices[ref] = service
 }
 
 // SetProviderService is called by init() of implement of RPCService
 func SetProviderService(service common.RPCService) {
-	proServices[service.Reference()] = service
+	ref := common.GetReference(service)
+	proServices[ref] = service
 }
 
 // GetConsumerService gets ConsumerService by @name
diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go
index a032313..015bdc0 100644
--- a/metadata/definition/definition.go
+++ b/metadata/definition/definition.go
@@ -21,7 +21,13 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"reflect"
 	"strings"
+	"time"
+)
+
+import (
+	hessian "github.com/apache/dubbo-go-hessian2"
 )
 
 import (
@@ -36,10 +42,10 @@ type ServiceDefiner interface {
 
 // ServiceDefinition is the describer of service definition
 type ServiceDefinition struct {
-	CanonicalName string
-	CodeSource    string
-	Methods       []MethodDefinition
-	Types         []TypeDefinition
+	CanonicalName string             `json:"canonicalName"`
+	CodeSource    string             `json:"codeSource"`
+	Methods       []MethodDefinition `json:"methods"`
+	Types         []TypeDefinition   `json:"types"`
 }
 
 // ToBytes convert ServiceDefinition to json string
@@ -76,20 +82,20 @@ type FullServiceDefinition struct {
 
 // MethodDefinition is the describer of method definition
 type MethodDefinition struct {
-	Name           string
-	ParameterTypes []string
-	ReturnType     string
-	Parameters     []TypeDefinition
+	Name           string           `json:"name"`
+	ParameterTypes []string         `json:"parameterTypes"`
+	ReturnType     string           `json:"returnType"`
+	Parameters     []TypeDefinition `json:"parameters"`
 }
 
 // TypeDefinition is the describer of type definition
 type TypeDefinition struct {
-	Id              string
-	Type            string
-	Items           []TypeDefinition
-	Enums           []string
-	Properties      map[string]TypeDefinition
-	TypeBuilderName string
+	Id              string                    `json:"id"`
+	Type            string                    `json:"type"`
+	Items           []TypeDefinition          `json:"items"`
+	Enums           []string                  `json:"enums"`
+	Properties      map[string]TypeDefinition `json:"properties"`
+	TypeBuilderName string                    `json:"typeBuilderName"`
 }
 
 // BuildServiceDefinition can build service definition which will be used to describe a service
@@ -99,15 +105,26 @@ func BuildServiceDefinition(service common.Service, url *common.URL) *ServiceDef
 
 	for k, m := range service.Method() {
 		var paramTypes []string
+		var param string
 		if len(m.ArgsType()) > 0 {
 			for _, t := range m.ArgsType() {
-				paramTypes = append(paramTypes, t.Kind().String())
+				if t.Kind() == reflect.Ptr {
+					param = getArgType(reflect.New(t).Interface())
+				} else {
+					param = t.Kind().String()
+				}
+				paramTypes = append(paramTypes, param)
 			}
 		}
 
 		var returnType string
+
 		if m.ReplyType() != nil {
-			returnType = m.ReplyType().Kind().String()
+			if m.ReplyType().Kind() == reflect.Ptr {
+				returnType = getArgType(reflect.New(m.ReplyType()).Interface())
+			} else {
+				returnType = m.ReplyType().Kind().String()
+			}
 		}
 
 		methodD := MethodDefinition{
@@ -135,3 +152,101 @@ func ServiceDescriperBuild(serviceName string, group string, version string) str
 	}
 	return buf.String()
 }
+
+func getArgType(v interface{}) string {
+	if v == nil {
+		return "V"
+	}
+
+	v = reflect.ValueOf(v).Elem().Interface()
+
+	switch v.(type) {
+	// Serialized tags for base types
+	case nil:
+		return "V"
+	case bool:
+		return "Z"
+	case []bool:
+		return "[Z"
+	case byte:
+		return "B"
+	case []byte:
+		return "[B"
+	case int8:
+		return "B"
+	case []int8:
+		return "[B"
+	case int16:
+		return "S"
+	case []int16:
+		return "[S"
+	case uint16: // Equivalent to Char of Java
+		return "C"
+	case []uint16:
+		return "[C"
+	// case rune:
+	//	return "C"
+	case int:
+		return "J"
+	case []int:
+		return "[J"
+	case int32:
+		return "I"
+	case []int32:
+		return "[I"
+	case int64:
+		return "J"
+	case []int64:
+		return "[J"
+	case time.Time:
+		return "java.util.Date"
+	case []time.Time:
+		return "[Ljava.util.Date"
+	case float32:
+		return "F"
+	case []float32:
+		return "[F"
+	case float64:
+		return "D"
+	case []float64:
+		return "[D"
+	case string:
+		return "java.lang.String"
+	case []string:
+		return "[Ljava.lang.String;"
+	case []hessian.Object:
+		return "[Ljava.lang.Object;"
+	case map[interface{}]interface{}:
+		// return  "java.util.HashMap"
+		return "java.util.Map"
+	case hessian.POJO:
+		return v.(hessian.POJO).JavaClassName()
+	//  Serialized tags for complex types
+	default:
+		t := reflect.TypeOf(v)
+		if reflect.Ptr == t.Kind() {
+			t = reflect.TypeOf(reflect.ValueOf(v).Elem())
+		}
+		switch t.Kind() {
+		case reflect.Struct:
+			v, ok := v.(hessian.POJO)
+			if ok {
+				return v.JavaClassName()
+			}
+			return "java.lang.Object"
+		case reflect.Slice, reflect.Array:
+			if t.Elem().Kind() == reflect.Struct {
+				return "[Ljava.lang.Object;"
+			}
+			// return "java.util.ArrayList"
+			return "java.util.List"
+		case reflect.Map: // Enter here, map may be map[string]int
+			return "java.util.Map"
+		default:
+			return ""
+		}
+	}
+
+	// unreachable
+	// return "java.lang.RuntimeException"
+}
diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go
index 8da78c3..36eefe0 100644
--- a/metadata/service/inmemory/service.go
+++ b/metadata/service/inmemory/service.go
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package inmemory
 
 import (
diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go
index e2a7a64..302ae32 100644
--- a/metadata/service/remote/service.go
+++ b/metadata/service/remote/service.go
@@ -132,22 +132,6 @@ func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
 			return nil
 		}
 		logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
-	} else {
-		params := make(map[string]string, len(url.GetParams()))
-		url.RangeParams(func(key, value string) bool {
-			params[key] = value
-			return true
-		})
-		id := &identifier.MetadataIdentifier{
-			BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
-				ServiceInterface: interfaceName,
-				Version:          url.GetParam(constant.VERSION_KEY, ""),
-				Group:            url.GetParam(constant.GROUP_KEY, constant.DUBBO),
-				Side:             url.GetParam(constant.SIDE_KEY, "consumer"),
-			},
-		}
-		mts.delegateReport.StoreConsumerMetadata(id, params)
-		return nil
 	}
 
 	return nil
diff --git a/metadata/service/service.go b/metadata/service/service.go
index 1d90f8a..f7a39bd 100644
--- a/metadata/service/service.go
+++ b/metadata/service/service.go
@@ -30,7 +30,7 @@ import (
 // MetadataService is used to define meta data related behaviors
 // usually the implementation should be singleton
 type MetadataService interface {
-	common.RPCService
+	common.ReferencedRPCService
 	// ServiceName will get the service's name in meta service , which is application name
 	ServiceName() (string, error)
 	// ExportURL will store the exported url in metadata
@@ -86,7 +86,7 @@ func (mts *BaseMetadataService) ServiceName() (string, error) {
 	return mts.serviceName, nil
 }
 
-// Version will return the version of metadata service
+// Reference will return the reference id of metadata service
 func (mts *BaseMetadataService) Reference() string {
 	return constant.SIMPLE_METADATA_SERVICE_NAME
 }
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index e50e2b9..0f37ac7 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -186,20 +186,22 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
 	serviceConfigurationListener.OverrideUrl(providerUrl)
 
 	var reg registry.Registry
-	if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
-		reg = getRegistry(registryUrl)
-		proto.registries.Store(registryUrl.Key(), reg)
-		logger.Infof("Export proto:%p registries address:%p", proto, proto.registries)
-	} else {
-		reg = regI.(registry.Registry)
-	}
 
-	registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl)
-	err := reg.Register(registeredProviderUrl)
-	if err != nil {
-		logger.Errorf("provider service %v register registry %v error, error message is %s",
-			providerUrl.Key(), registryUrl.Key(), err.Error())
-		return nil
+	if registryUrl.Protocol != "" {
+		if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
+			reg = getRegistry(registryUrl)
+			proto.registries.Store(registryUrl.Key(), reg)
+			logger.Infof("Export proto:%p registries address:%p", proto, proto.registries)
+		} else {
+			reg = regI.(registry.Registry)
+		}
+		registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl)
+		err := reg.Register(registeredProviderUrl)
+		if err != nil {
+			logger.Errorf("provider service %v register registry %v error, error message is %s",
+				providerUrl.Key(), registryUrl.Key(), err.Error())
+			return nil
+		}
 	}
 
 	key := getCacheKey(invoker)
@@ -214,11 +216,13 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
 		logger.Infof("The exporter has not been cached, and will return a new exporter!")
 	}
 
-	go func() {
-		if err = reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil {
-			logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v", overriderUrl, err)
-		}
-	}()
+	if registryUrl.Protocol != "" {
+		go func() {
+			if err := reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil {
+				logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v", overriderUrl, err)
+			}
+		}()
+	}
 	return cachedExporter.(protocol.Exporter)
 }