You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/12/21 06:15:09 UTC
[dubbo-go] branch 1.5 updated: support $invokeAsync (#1682)
This is an automated email from the ASF dual-hosted git repository.
laurence pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/1.5 by this push:
new ee7f479 support $invokeAsync (#1682)
ee7f479 is described below
commit ee7f479c5ccf366cc3c122d901476dbe7bc17aad
Author: 刘月财 <38...@users.noreply.github.com>
AuthorDate: Tue Dec 21 14:02:46 2021 +0800
support $invokeAsync (#1682)
* support $invokeAsync
* edit code annotation
---
cluster/cluster_impl/base_cluster_invoker.go | 3 +--
cluster/cluster_impl/failfast_cluster_invoker.go | 2 +-
cluster/cluster_impl/failover_cluster_invoker.go | 4 ++--
cluster/cluster_impl/forking_cluster_invoker.go | 2 +-
cluster/cluster_impl/zone_aware_cluster_invoker.go | 2 +-
common/constant/default.go | 1 +
filter/filter_impl/generic_filter.go | 5 +++--
protocol/invocation.go | 4 ++++
protocol/invocation/rpcinvocation.go | 17 +++++++++++++++++
9 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go
index 27ba9b0..24d11ce 100644
--- a/cluster/cluster_impl/base_cluster_invoker.go
+++ b/cluster/cluster_impl/base_cluster_invoker.go
@@ -177,10 +177,9 @@ func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) boo
return false
}
-func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
+func getLoadBalance(invoker protocol.Invoker, methodName string) cluster.LoadBalance {
url := invoker.GetURL()
- methodName := invocation.MethodName()
//Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
diff --git a/cluster/cluster_impl/failfast_cluster_invoker.go b/cluster/cluster_impl/failfast_cluster_invoker.go
index d71ef5f..018ec19 100644
--- a/cluster/cluster_impl/failfast_cluster_invoker.go
+++ b/cluster/cluster_impl/failfast_cluster_invoker.go
@@ -43,7 +43,7 @@ func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation pr
return &protocol.RPCResult{Err: err}
}
- loadbalance := getLoadBalance(invokers[0], invocation)
+ loadbalance := getLoadBalance(invokers[0], invocation.ActualMethodName())
err = invoker.checkWhetherDestroyed()
if err != nil {
diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go
index 1740bce..9e3503e 100644
--- a/cluster/cluster_impl/failover_cluster_invoker.go
+++ b/cluster/cluster_impl/failover_cluster_invoker.go
@@ -59,9 +59,9 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
return &protocol.RPCResult{Err: err}
}
- methodName := invocation.MethodName()
+ methodName := invocation.ActualMethodName()
retries := getRetries(invokers, methodName)
- loadBalance := getLoadBalance(invokers[0], invocation)
+ loadBalance := getLoadBalance(invokers[0], methodName)
for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
diff --git a/cluster/cluster_impl/forking_cluster_invoker.go b/cluster/cluster_impl/forking_cluster_invoker.go
index 6be6b36..2a16239 100644
--- a/cluster/cluster_impl/forking_cluster_invoker.go
+++ b/cluster/cluster_impl/forking_cluster_invoker.go
@@ -61,7 +61,7 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
if forks < 0 || forks > len(invokers) {
selected = invokers
} else {
- loadBalance := getLoadBalance(invokers[0], invocation)
+ loadBalance := getLoadBalance(invokers[0], invocation.ActualMethodName())
for i := 0; i < forks; i++ {
if ivk := invoker.doSelect(loadBalance, invocation, invokers, selected); ivk != nil {
selected = append(selected, ivk)
diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster_impl/zone_aware_cluster_invoker.go
index 8673d1e..a7523bc 100644
--- a/cluster/cluster_impl/zone_aware_cluster_invoker.go
+++ b/cluster/cluster_impl/zone_aware_cluster_invoker.go
@@ -84,7 +84,7 @@ func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation p
}
// load balance among all registries, with registry weight count in.
- loadBalance := getLoadBalance(invokers[0], invocation)
+ loadBalance := getLoadBalance(invokers[0], invocation.ActualMethodName())
ivk := invoker.doSelect(loadBalance, invocation, invokers, nil)
if ivk != nil && ivk.IsAvailable() {
return ivk.Invoke(ctx, invocation)
diff --git a/common/constant/default.go b/common/constant/default.go
index 1ea9e1c..b033a12 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -61,6 +61,7 @@ const (
DEFAULT_REFERENCE_FILTERS = "cshutdown"
GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke"
+ GENERIC_ASYNC = "$invokeAsync"
ECHO = "$echo"
)
diff --git a/filter/filter_impl/generic_filter.go b/filter/filter_impl/generic_filter.go
index facdfda..cfd4cb2 100644
--- a/filter/filter_impl/generic_filter.go
+++ b/filter/filter_impl/generic_filter.go
@@ -111,13 +111,14 @@ func GetGenericFilter() filter.Filter {
// isCallingToGenericService check if it calls to a generic service
func isCallingToGenericService(invoker protocol.Invoker, invocation protocol.Invocation) bool {
return isGeneric(invoker.GetURL().GetParam(constant.GENERIC_KEY, "")) &&
- invocation.MethodName() != constant.GENERIC
+ invocation.MethodName() != constant.GENERIC &&
+ invocation.MethodName() != constant.GENERIC_ASYNC
}
// isMakingAGenericCall check if it is making a generic call to a generic service
func isMakingAGenericCall(invoker protocol.Invoker, invocation protocol.Invocation) bool {
return isGeneric(invoker.GetURL().GetParam(constant.GENERIC_KEY, "")) &&
- invocation.MethodName() == constant.GENERIC &&
+ (invocation.MethodName() == constant.GENERIC || invocation.MethodName() == constant.GENERIC_ASYNC) &&
invocation.Arguments() != nil &&
len(invocation.Arguments()) == 3
}
diff --git a/protocol/invocation.go b/protocol/invocation.go
index 2ecc817..a2c3e83 100644
--- a/protocol/invocation.go
+++ b/protocol/invocation.go
@@ -25,6 +25,8 @@ import (
type Invocation interface {
// MethodName gets invocation method name.
MethodName() string
+ // ActualMethodName gets actual invocation method name. It returns the method name been called if it's a generic call
+ ActualMethodName() string
// ParameterTypeNames gets invocation parameter type names.
ParameterTypeNames() []string
// ParameterTypes gets invocation parameter types.
@@ -48,4 +50,6 @@ type Invocation interface {
SetAttachments(key string, value interface{})
// Invoker gets the invoker in current context.
Invoker() Invoker
+ // IsGenericInvocation checks if this is a generic invocation
+ IsGenericInvocation() bool
}
diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go
index 8edba62..b62c016 100644
--- a/protocol/invocation/rpcinvocation.go
+++ b/protocol/invocation/rpcinvocation.go
@@ -79,6 +79,15 @@ func (r *RPCInvocation) MethodName() string {
return r.methodName
}
+// ActualMethodName gets actual invocation method name. It returns the method name been called if it's a generic call
+func (r *RPCInvocation) ActualMethodName() string {
+ if r.IsGenericInvocation() {
+ return r.Arguments()[0].(string)
+ } else {
+ return r.MethodName()
+ }
+}
+
// ParameterTypes gets RPC invocation parameter types.
func (r *RPCInvocation) ParameterTypes() []reflect.Type {
return r.parameterTypes
@@ -282,3 +291,11 @@ func WithInvoker(invoker protocol.Invoker) option {
invo.invoker = invoker
}
}
+
+// IsGenericInvocation checks if this is a generic invocation
+func (r *RPCInvocation) IsGenericInvocation() bool {
+ return (r.MethodName() == constant.GENERIC ||
+ r.MethodName() == constant.GENERIC_ASYNC) &&
+ r.Arguments() != nil &&
+ len(r.Arguments()) == 3
+}