You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/12/21 06:15:09 UTC

[dubbo-go] branch 1.5 updated: support $invokeAsync (#1682)

This is an automated email from the ASF dual-hosted git repository.

laurence pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.5 by this push:
     new ee7f479  support $invokeAsync (#1682)
ee7f479 is described below

commit ee7f479c5ccf366cc3c122d901476dbe7bc17aad
Author: 刘月财 <38...@users.noreply.github.com>
AuthorDate: Tue Dec 21 14:02:46 2021 +0800

    support $invokeAsync (#1682)
    
    * support $invokeAsync
    
    * edit code annotation
---
 cluster/cluster_impl/base_cluster_invoker.go       |  3 +--
 cluster/cluster_impl/failfast_cluster_invoker.go   |  2 +-
 cluster/cluster_impl/failover_cluster_invoker.go   |  4 ++--
 cluster/cluster_impl/forking_cluster_invoker.go    |  2 +-
 cluster/cluster_impl/zone_aware_cluster_invoker.go |  2 +-
 common/constant/default.go                         |  1 +
 filter/filter_impl/generic_filter.go               |  5 +++--
 protocol/invocation.go                             |  4 ++++
 protocol/invocation/rpcinvocation.go               | 17 +++++++++++++++++
 9 files changed, 31 insertions(+), 9 deletions(-)

diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go
index 27ba9b0..24d11ce 100644
--- a/cluster/cluster_impl/base_cluster_invoker.go
+++ b/cluster/cluster_impl/base_cluster_invoker.go
@@ -177,10 +177,9 @@ func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) boo
 	return false
 }
 
-func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
+func getLoadBalance(invoker protocol.Invoker, methodName string) cluster.LoadBalance {
 	url := invoker.GetURL()
 
-	methodName := invocation.MethodName()
 	//Get the service loadbalance config
 	lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
 
diff --git a/cluster/cluster_impl/failfast_cluster_invoker.go b/cluster/cluster_impl/failfast_cluster_invoker.go
index d71ef5f..018ec19 100644
--- a/cluster/cluster_impl/failfast_cluster_invoker.go
+++ b/cluster/cluster_impl/failfast_cluster_invoker.go
@@ -43,7 +43,7 @@ func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation pr
 		return &protocol.RPCResult{Err: err}
 	}
 
-	loadbalance := getLoadBalance(invokers[0], invocation)
+	loadbalance := getLoadBalance(invokers[0], invocation.ActualMethodName())
 
 	err = invoker.checkWhetherDestroyed()
 	if err != nil {
diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go
index 1740bce..9e3503e 100644
--- a/cluster/cluster_impl/failover_cluster_invoker.go
+++ b/cluster/cluster_impl/failover_cluster_invoker.go
@@ -59,9 +59,9 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
 		return &protocol.RPCResult{Err: err}
 	}
 
-	methodName := invocation.MethodName()
+	methodName := invocation.ActualMethodName()
 	retries := getRetries(invokers, methodName)
-	loadBalance := getLoadBalance(invokers[0], invocation)
+	loadBalance := getLoadBalance(invokers[0], methodName)
 
 	for i := 0; i <= retries; i++ {
 		//Reselect before retry to avoid a change of candidate `invokers`.
diff --git a/cluster/cluster_impl/forking_cluster_invoker.go b/cluster/cluster_impl/forking_cluster_invoker.go
index 6be6b36..2a16239 100644
--- a/cluster/cluster_impl/forking_cluster_invoker.go
+++ b/cluster/cluster_impl/forking_cluster_invoker.go
@@ -61,7 +61,7 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
 	if forks < 0 || forks > len(invokers) {
 		selected = invokers
 	} else {
-		loadBalance := getLoadBalance(invokers[0], invocation)
+		loadBalance := getLoadBalance(invokers[0], invocation.ActualMethodName())
 		for i := 0; i < forks; i++ {
 			if ivk := invoker.doSelect(loadBalance, invocation, invokers, selected); ivk != nil {
 				selected = append(selected, ivk)
diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster_impl/zone_aware_cluster_invoker.go
index 8673d1e..a7523bc 100644
--- a/cluster/cluster_impl/zone_aware_cluster_invoker.go
+++ b/cluster/cluster_impl/zone_aware_cluster_invoker.go
@@ -84,7 +84,7 @@ func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation p
 	}
 
 	// load balance among all registries, with registry weight count in.
-	loadBalance := getLoadBalance(invokers[0], invocation)
+	loadBalance := getLoadBalance(invokers[0], invocation.ActualMethodName())
 	ivk := invoker.doSelect(loadBalance, invocation, invokers, nil)
 	if ivk != nil && ivk.IsAvailable() {
 		return ivk.Invoke(ctx, invocation)
diff --git a/common/constant/default.go b/common/constant/default.go
index 1ea9e1c..b033a12 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -61,6 +61,7 @@ const (
 	DEFAULT_REFERENCE_FILTERS = "cshutdown"
 	GENERIC_REFERENCE_FILTERS = "generic"
 	GENERIC                   = "$invoke"
+	GENERIC_ASYNC             = "$invokeAsync"
 	ECHO                      = "$echo"
 )
 
diff --git a/filter/filter_impl/generic_filter.go b/filter/filter_impl/generic_filter.go
index facdfda..cfd4cb2 100644
--- a/filter/filter_impl/generic_filter.go
+++ b/filter/filter_impl/generic_filter.go
@@ -111,13 +111,14 @@ func GetGenericFilter() filter.Filter {
 // isCallingToGenericService check if it calls to a generic service
 func isCallingToGenericService(invoker protocol.Invoker, invocation protocol.Invocation) bool {
 	return isGeneric(invoker.GetURL().GetParam(constant.GENERIC_KEY, "")) &&
-		invocation.MethodName() != constant.GENERIC
+		invocation.MethodName() != constant.GENERIC &&
+		invocation.MethodName() != constant.GENERIC_ASYNC
 }
 
 // isMakingAGenericCall check if it is making a generic call to a generic service
 func isMakingAGenericCall(invoker protocol.Invoker, invocation protocol.Invocation) bool {
 	return isGeneric(invoker.GetURL().GetParam(constant.GENERIC_KEY, "")) &&
-		invocation.MethodName() == constant.GENERIC &&
+		(invocation.MethodName() == constant.GENERIC || invocation.MethodName() == constant.GENERIC_ASYNC) &&
 		invocation.Arguments() != nil &&
 		len(invocation.Arguments()) == 3
 }
diff --git a/protocol/invocation.go b/protocol/invocation.go
index 2ecc817..a2c3e83 100644
--- a/protocol/invocation.go
+++ b/protocol/invocation.go
@@ -25,6 +25,8 @@ import (
 type Invocation interface {
 	// MethodName gets invocation method name.
 	MethodName() string
+	// ActualMethodName gets the actual invocation method name. For a generic call it returns the name of the method being called.
+	ActualMethodName() string
 	// ParameterTypeNames gets invocation parameter type names.
 	ParameterTypeNames() []string
 	// ParameterTypes gets invocation parameter types.
@@ -48,4 +50,6 @@ type Invocation interface {
 	SetAttachments(key string, value interface{})
 	// Invoker gets the invoker in current context.
 	Invoker() Invoker
+	// IsGenericInvocation checks if this is a generic invocation
+	IsGenericInvocation() bool
 }
diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go
index 8edba62..b62c016 100644
--- a/protocol/invocation/rpcinvocation.go
+++ b/protocol/invocation/rpcinvocation.go
@@ -79,6 +79,15 @@ func (r *RPCInvocation) MethodName() string {
 	return r.methodName
 }
 
+// ActualMethodName gets the actual invocation method name. For a generic call it returns the name of the method being called (the first argument); otherwise it returns MethodName().
+func (r *RPCInvocation) ActualMethodName() string {
+	if r.IsGenericInvocation() {
+		return r.Arguments()[0].(string)
+	} else {
+		return r.MethodName()
+	}
+}
+
 // ParameterTypes gets RPC invocation parameter types.
 func (r *RPCInvocation) ParameterTypes() []reflect.Type {
 	return r.parameterTypes
@@ -282,3 +291,11 @@ func WithInvoker(invoker protocol.Invoker) option {
 		invo.invoker = invoker
 	}
 }
+
+// IsGenericInvocation checks if this is a generic invocation
+func (r *RPCInvocation) IsGenericInvocation() bool {
+	return (r.MethodName() == constant.GENERIC ||
+		r.MethodName() == constant.GENERIC_ASYNC) &&
+		r.Arguments() != nil &&
+		len(r.Arguments()) == 3
+}