You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/05/14 14:49:20 UTC

[GitHub] [dubbo-go] pantianying commented on a change in pull request #495: 重构网络模块

pantianying commented on a change in pull request #495:
URL: https://github.com/apache/dubbo-go/pull/495#discussion_r425187763



##########
File path: remoting/codec.go
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"bytes"
+)
+
+// codec for exchangeClient
+type Codec interface {
+	EncodeRequest(request *Request) (*bytes.Buffer, error)
+	EncodeResponse(response *Response) (*bytes.Buffer, error)
+	Decode(data []byte) (DecodeResult, int, error)
+}
+
+type DecodeResult struct {
+	IsRequest bool
+	Result    interface{}
+}
+
+var (
+	codec map[string]Codec
+)
+
+func init() {

Review comment:
       No need to initialize in init
   example:
   var codec=make( map[string]Codec)
   

##########
File path: remoting/codec.go
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"bytes"
+)
+
+// codec for exchangeClient
+type Codec interface {
+	EncodeRequest(request *Request) (*bytes.Buffer, error)
+	EncodeResponse(response *Response) (*bytes.Buffer, error)
+	Decode(data []byte) (DecodeResult, int, error)
+}
+
+type DecodeResult struct {
+	IsRequest bool
+	Result    interface{}
+}
+
+var (
+	codec map[string]Codec
+)
+
+func init() {
+	codec = make(map[string]Codec, 2)
+}
+
+func NewCodec(protocol string, codecTmp Codec) {

Review comment:
       maybe It would be better to replace new with register?

##########
File path: remoting/exchange_client.go
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"sync"
+	"time"
+
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+var (
+	// store requestID and response
+	pendingResponses *sync.Map = new(sync.Map)
+)
+
+type SequenceType int64
+
+// It is interface of client for network communication.
+// If you use getty as network communication, you should define GettyClient that implements this interface.
+type Client interface {
+	SetExchangeClient(client *ExchangeClient)
+	// responseHandler is used to deal with msg
+	SetResponseHandler(responseHandler ResponseHandler)
+	// connect url
+	Connect(url common.URL) error
+	// close
+	Close()
+	// send request to server.
+	Request(request *Request, timeout time.Duration, response *PendingResponse) error
+}
+
+// This is abstraction level. it is like facade.
+type ExchangeClient struct {
+	ConnectTimeout time.Duration
+	address        string
+	client         Client
+}
+
+// handle the message from server
+type ResponseHandler interface {
+	Handler(response *Response)
+}
+
+// create ExchangeClient
+func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration) *ExchangeClient {
+	exchangeClient := &ExchangeClient{
+		ConnectTimeout: connectTimeout,
+		address:        url.Location,
+		client:         client,
+	}
+	client.SetExchangeClient(exchangeClient)
+	if client.Connect(url) != nil {
+		//retry for a while
+		time.Sleep(1 * time.Second)
+		if client.Connect(url) != nil {
+			return nil
+		}
+	}
+	client.SetResponseHandler(exchangeClient)
+	return exchangeClient
+}
+
+// two way request
+func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
+	result *protocol.RPCResult) error {
+	request := NewRequest("2.0.2")
+	request.Data = invocation
+	request.Event = false
+	request.TwoWay = true
+
+	rsp := NewPendingResponse(request.ID)
+	rsp.response = NewResponse(request.ID, "2.0.2")
+	rsp.Reply = (*invocation).Reply()
+	AddPendingResponse(rsp)
+
+	err := client.client.Request(request, timeout, rsp)
+	if err != nil {
+		result.Err = err
+		return err
+	}
+	result.Rest = rsp.response.Result
+	return nil
+}
+
+// async two way request
+func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
+	callback common.AsyncCallback, result *protocol.RPCResult) error {
+	request := NewRequest("2.0.2")
+	request.Data = invocation
+	request.Event = false
+	request.TwoWay = true
+
+	rsp := NewPendingResponse(request.ID)
+	rsp.response = NewResponse(request.ID, "2.0.2")
+	rsp.Callback = callback
+	rsp.Reply = (*invocation).Reply()
+	AddPendingResponse(rsp)
+
+	err := client.client.Request(request, timeout, rsp)
+	if err != nil {
+		result.Err = err
+		return err
+	}
+	result.Rest = rsp.response
+	return nil
+}
+
+// oneway request
+func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time.Duration) error {
+	request := NewRequest("2.0.2")
+	request.Data = invocation
+	request.Event = false
+	request.TwoWay = false
+
+	rsp := NewPendingResponse(request.ID)
+	rsp.response = NewResponse(request.ID, "2.0.2")
+
+	err := client.client.Request(request, timeout, rsp)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// close client
+func (client *ExchangeClient) Close() {
+	client.client.Close()
+}
+
+// handle the response from server
+func (client *ExchangeClient) Handler(response *Response) {
+
+	pendingResponse := removePendingResponse(SequenceType(response.ID))
+	if pendingResponse == nil {
+		logger.Errorf("failed to get pending response context for response package %s", *response)
+		return
+	}
+
+	pendingResponse.response = response
+
+	if pendingResponse.Callback == nil {
+		pendingResponse.Err = pendingResponse.response.Error
+		pendingResponse.Done <- struct{}{}

Review comment:
       use close(pendingResponse.Done) 

##########
File path: remoting/getty/getty_client.go
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+	"math/rand"
+	"time"
+
+	"github.com/apache/dubbo-go/remoting"
+	"github.com/dubbogo/getty"
+	"gopkg.in/yaml.v2"
+
+	gxsync "github.com/dubbogo/gost/sync"
+
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/config"
+	perrors "github.com/pkg/errors"
+)
+
+var (
+	errInvalidCodecType  = perrors.New("illegal CodecType")
+	errInvalidAddress    = perrors.New("remote address invalid or empty")
+	errSessionNotExist   = perrors.New("session not exist")
+	errClientClosed      = perrors.New("client closed")
+	errClientReadTimeout = perrors.New("client read timeout")
+
+	clientConf   *ClientConfig
+	clientGrpool *gxsync.TaskPool
+)
+
+// it is init client for single protocol.
+func initClient(protocol string) {
+	if protocol == "" {
+		return
+	}
+
+	// load clientconfig from consumer_config
+	// default use dubbo
+	consumerConfig := config.GetConsumerConfig()
+	if consumerConfig.ApplicationConfig == nil {
+		return
+	}
+	protocolConf := config.GetConsumerConfig().ProtocolConf
+	defaultClientConfig := GetDefaultClientConfig()
+	if protocolConf == nil {
+		logger.Info("protocol_conf default use dubbo config")
+	} else {
+		dubboConf := protocolConf.(map[interface{}]interface{})[protocol]
+		if dubboConf == nil {
+			logger.Warnf("dubboConf is nil")
+			return
+		}
+		dubboConfByte, err := yaml.Marshal(dubboConf)
+		if err != nil {
+			panic(err)
+		}
+		err = yaml.Unmarshal(dubboConfByte, &defaultClientConfig)
+		if err != nil {
+			panic(err)
+		}
+	}
+	clientConf = &defaultClientConfig
+	if err := clientConf.CheckValidity(); err != nil {
+		logger.Warnf("[CheckValidity] error: %v", err)
+		return
+	}
+	setClientGrpool()
+
+	rand.Seed(time.Now().UnixNano())
+}
+
+// SetClientConf:  config ClientConf
+func SetClientConf(c ClientConfig) {
+	clientConf = &c
+	err := clientConf.CheckValidity()
+	if err != nil {
+		logger.Warnf("[ClientConfig CheckValidity] error: %v", err)
+		return
+	}
+	setClientGrpool()
+}
+
+func setClientGrpool() {
+	if clientConf.GrPoolSize > 1 {
+		clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
+			gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
+	}
+}
+
+// Options : param config
+type Options struct {
+	// connect timeout
+	// remove request timeout, it will be calulate for every request
+	ConnectTimeout time.Duration
+}
+
+// Client : some configuration for network communication.
+type Client struct {
+	addr            string
+	opts            Options
+	conf            ClientConfig
+	pool            *gettyRPCClientPool
+	codec           remoting.Codec
+	responseHandler remoting.ResponseHandler
+	ExchangeClient  *remoting.ExchangeClient
+}
+
+// create client
+func NewClient(opt Options) *Client {
+	switch {
+	case opt.ConnectTimeout == 0:
+		opt.ConnectTimeout = 3 * time.Second
+	}
+
+	c := &Client{
+		opts: opt,
+	}
+	return c
+}
+
+func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) {
+	c.ExchangeClient = client
+}
+func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) {
+	c.responseHandler = responseHandler
+}
+
+// init client and try to connection.
+func (c *Client) Connect(url common.URL) error {
+	initClient(url.Protocol)
+	c.conf = *clientConf
+	// new client
+	c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
+	// codec
+	c.codec = remoting.GetCodec(url.Protocol)
+	c.addr = url.Location
+	_, _, err := c.selectSession(c.addr)
+	logger.Errorf("try to connect server %v failed for : %v", url.Location, err)
+	return err
+}
+
+// close network connection
+func (c *Client) Close() {
+	if c.pool != nil {
+		c.pool.close()
+	}
+	c.pool = nil
+}
+
+// send request
+func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error {
+
+	var (
+		err     error
+		session getty.Session
+		conn    *gettyRPCClient
+	)
+	conn, session, err = c.selectSession(c.addr)
+	if err != nil {
+		return perrors.WithStack(err)
+	}
+	if session == nil {
+		return errSessionNotExist
+	}
+	defer func() {

Review comment:
       这样的写法有点奇怪,这里没有必要用defer吧。。

##########
File path: remoting/exchange_client.go
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remoting
+
+import (
+	"sync"
+	"time"
+
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+var (
+	// store requestID and response
+	pendingResponses *sync.Map = new(sync.Map)
+)
+
+type SequenceType int64
+
+// It is interface of client for network communication.
+// If you use getty as network communication, you should define GettyClient that implements this interface.
+type Client interface {
+	SetExchangeClient(client *ExchangeClient)
+	// responseHandler is used to deal with msg
+	SetResponseHandler(responseHandler ResponseHandler)
+	// connect url
+	Connect(url common.URL) error
+	// close
+	Close()
+	// send request to server.
+	Request(request *Request, timeout time.Duration, response *PendingResponse) error
+}
+
+// This is abstraction level. it is like facade.
+type ExchangeClient struct {
+	ConnectTimeout time.Duration
+	address        string
+	client         Client
+}
+
+// handle the message from server
+type ResponseHandler interface {
+	Handler(response *Response)
+}
+
+// create ExchangeClient
+func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration) *ExchangeClient {
+	exchangeClient := &ExchangeClient{
+		ConnectTimeout: connectTimeout,
+		address:        url.Location,
+		client:         client,
+	}
+	client.SetExchangeClient(exchangeClient)
+	if client.Connect(url) != nil {
+		//retry for a while
+		time.Sleep(1 * time.Second)

Review comment:
       If it's necessary to try again, there's no need to wait for a while




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org