You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/05/21 02:05:15 UTC

[GitHub] [dubbo-go] pantianying commented on a change in pull request #495: Imp: refactor the network transport layer

pantianying commented on a change in pull request #495:
URL: https://github.com/apache/dubbo-go/pull/495#discussion_r428405517



##########
File path: remoting/exchange_client.go
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"sync"
+	"time"
+
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+var (
+	// store requestID and response
+	pendingResponses *sync.Map = new(sync.Map)
+)
+
+type SequenceType int64
+
+// It is interface of client for network communication.
+// If you use getty as network communication, you should define GettyClient that implements this interface.
+type Client interface {
+	SetExchangeClient(client *ExchangeClient)
+	// responseHandler is used to deal with msg
+	SetResponseHandler(responseHandler ResponseHandler)
+	// connect url
+	Connect(url common.URL) error
+	// close
+	Close()
+	// send request to server.
+	Request(request *Request, timeout time.Duration, response *PendingResponse) error
+}
+
+// This is abstraction level. it is like facade.
+type ExchangeClient struct {
+	ConnectTimeout time.Duration
+	address        string
+	client         Client
+}
+
+// handle the message from server
+type ResponseHandler interface {
+	Handler(response *Response)
+}
+
+// create ExchangeClient
+func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration) *ExchangeClient {
+	exchangeClient := &ExchangeClient{
+		ConnectTimeout: connectTimeout,
+		address:        url.Location,
+		client:         client,
+	}
+	client.SetExchangeClient(exchangeClient)
+	if client.Connect(url) != nil {
+		//retry for a while
+		time.Sleep(1 * time.Second)
+		if client.Connect(url) != nil {
+			return nil
+		}
+	}
+	client.SetResponseHandler(exchangeClient)
+	return exchangeClient
+}
+
+// two way request
+func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
+	result *protocol.RPCResult) error {
+	request := NewRequest("2.0.2")
+	request.Data = invocation
+	request.Event = false
+	request.TwoWay = true
+
+	rsp := NewPendingResponse(request.ID)
+	rsp.response = NewResponse(request.ID, "2.0.2")
+	rsp.Reply = (*invocation).Reply()
+	AddPendingResponse(rsp)
+
+	err := client.client.Request(request, timeout, rsp)
+	if err != nil {
+		result.Err = err
+		return err
+	}
+	result.Rest = rsp.response.Result
+	return nil
+}
+
+// async two way request
+func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
+	callback common.AsyncCallback, result *protocol.RPCResult) error {
+	request := NewRequest("2.0.2")
+	request.Data = invocation
+	request.Event = false
+	request.TwoWay = true
+
+	rsp := NewPendingResponse(request.ID)
+	rsp.response = NewResponse(request.ID, "2.0.2")
+	rsp.Callback = callback
+	rsp.Reply = (*invocation).Reply()
+	AddPendingResponse(rsp)
+
+	err := client.client.Request(request, timeout, rsp)
+	if err != nil {
+		result.Err = err
+		return err
+	}
+	result.Rest = rsp.response
+	return nil
+}
+
+// oneway request
+func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time.Duration) error {
+	request := NewRequest("2.0.2")
+	request.Data = invocation
+	request.Event = false
+	request.TwoWay = false
+
+	rsp := NewPendingResponse(request.ID)
+	rsp.response = NewResponse(request.ID, "2.0.2")
+
+	err := client.client.Request(request, timeout, rsp)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// close client
+func (client *ExchangeClient) Close() {
+	client.client.Close()
+}
+
+// handle the response from server
+func (client *ExchangeClient) Handler(response *Response) {
+
+	pendingResponse := removePendingResponse(SequenceType(response.ID))
+	if pendingResponse == nil {
+		logger.Errorf("failed to get pending response context for response package %s", *response)
+		return
+	}
+
+	pendingResponse.response = response
+
+	if pendingResponse.Callback == nil {
+		pendingResponse.Err = pendingResponse.response.Error
+		pendingResponse.Done <- struct{}{}

Review comment:
       一般chan 作为一次性关闭使用都是close,可以让全部协程<-Done 都触发。只发送一个信号,如果该Done在多个协程使用了,只能关闭一个协程。




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org