You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/05/28 08:41:58 UTC

[GitHub] [dubbo-go] flycash commented on a change in pull request #495: Imp: refactor the network transport layer

flycash commented on a change in pull request #495:
URL: https://github.com/apache/dubbo-go/pull/495#discussion_r431671775



##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +141,74 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {
+		exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
+			ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+		}), config.GetConsumerConfig().ConnectTimeout, false)
+		if exchangeClientTmp != nil {
+			exchangeClientMap.Store(url.Location, exchangeClientTmp)
+		}
+
+		return exchangeClientTmp
+	}
+	exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
+	if !ok {

Review comment:
       什么情况下这里的类型断言会失败,从而执行到这个 `!ok` 分支里面?我有点没看懂……

##########
File path: protocol/jsonrpc/jsonrpc_invoker.go
##########
@@ -54,7 +53,7 @@ func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invoca
 	url := ji.GetUrl()
 	req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments())
 	ctxNew := context.WithValue(ctx, constant.DUBBOGO_CTX_KEY, map[string]string{
-		"X-Proxy-Id": "dubbogo",
+		"X-Proxy-ID": "dubbogo",

Review comment:
       这里修改之后,是否会出现新版本的jsonrpc客户端调过去老版本的jsonrpc服务端失败的问题?因为老版本的可能只能识别"X-Proxy-Id"。

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -131,3 +141,74 @@ func GetProtocol() protocol.Protocol {
 	}
 	return dubboProtocol
 }
+
+func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult {
+	exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey())
+	result := protocol.RPCResult{}
+	if exporter == nil {
+		err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey())
+		logger.Errorf(err.Error())
+		result.Err = err
+		//reply(session, p, hessian.PackageResponse)
+		return result
+	}
+	invoker := exporter.(protocol.Exporter).GetInvoker()
+	if invoker != nil {
+		// FIXME
+		ctx := rebuildCtx(rpcInvocation)
+
+		invokeResult := invoker.Invoke(ctx, rpcInvocation)
+		if err := invokeResult.Error(); err != nil {
+			result.Err = invokeResult.Error()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(nil, err, result.Attachments())
+		} else {
+			result.Rest = invokeResult.Result()
+			//p.Header.ResponseStatus = hessian.Response_OK
+			//p.Body = hessian.NewResponse(res, nil, result.Attachments())
+		}
+	} else {
+		result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
+	}
+	return result
+}
+
+func getExchangeClient(url common.URL) *remoting.ExchangeClient {
+	clientTmp, ok := exchangeClientMap.Load(url.Location)
+	if !ok {

Review comment:
       这一步实际上存在并发隐患。如果多个 goroutine 同时进来,都发现 map 里没有对应的 client,就会同时进入这个分支,各自创建一个 client,而后一次 `Store` 会覆盖前一次 `Store` 的结果。修改的建议是使用 double-check:在发现 `!ok` 之后先加锁,再尝试取一次,仍然取不到才创建。

##########
File path: protocol/dubbo/dubbo_protocol.go
##########
@@ -75,18 +84,11 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
 
 // Refer ...
 func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
-	//default requestTimeout
-	var requestTimeout = config.GetConsumerConfig().RequestTimeout
-
-	requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout)
-	if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
-		requestTimeout = t
+	exchangeClient := getExchangeClient(url)
+	if exchangeClient == nil {
+		return nil

Review comment:
       加一句日志会更好一点。不过我们后面应该考虑一下要不要修改这个返回值,支持返回error。

##########
File path: remoting/exchange.go
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"time"
+)
+
+import (
+	"go.uber.org/atomic"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+)
+
+var (
+	// generate request ID for global use
+	sequence atomic.Uint64
+)
+
+func init() {
+	// init request ID
+	sequence.Store(0)
+}
+
+func SequenceId() uint64 {
+	// increse 2 for every request.
+	return sequence.Add(2)
+}
+
+// Request ...
+type Request struct {
+	ID int64
+	// protocol version
+	Version string
+	// serial ID (ignore)
+	SerialID byte
+	// Data
+	Data   interface{}
+	TwoWay bool
+	Event  bool
+	// it is used to judge the request is unbroken
+	// broken bool
+}
+
+// NewRequest
+func NewRequest(version string) *Request {
+	return &Request{
+		ID:      int64(SequenceId()),

Review comment:
       从这里看,既然拿到 ID 之后还要再转成 int64,为什么 `SequenceId` 不直接返回 int64?

##########
File path: remoting/exchange.go
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"time"
+)
+
+import (
+	"go.uber.org/atomic"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+)
+
+var (
+	// generate request ID for global use
+	sequence atomic.Uint64
+)
+
+func init() {
+	// init request ID
+	sequence.Store(0)
+}
+
+func SequenceId() uint64 {
+	// increse 2 for every request.

Review comment:
       最好把为什么+2而不是+1这个讲清楚。这是一个很tricky的事情=。=




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org