You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/10/22 14:54:17 UTC

[dubbo-go] branch 1.5 updated: Fix: 1526 (#1527)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.5 by this push:
     new 22ec982  Fix: 1526 (#1527)
22ec982 is described below

commit 22ec982bc6240ff8c5aa97d3d634f4473fa2173a
Author: ChangedenChan <ch...@gmail.com>
AuthorDate: Fri Oct 22 22:54:10 2021 +0800

    Fix: 1526 (#1527)
    
    * 支持配置protocol.payload (support configuring protocol.payload)
    
    * dubbo consumer支持配置protocol.payload (dubbo consumer supports configuring protocol.payload)
    
    * 格式化代码 (format code)
    
    * 格式化代码 (format code)
    
    * rename ProtocolConfig.Ip to ProtocolConfig.IP
    
    * rollback rename ProtocolConfig.Ip
---
 common/constant/default.go       | 13 ++++++++++++-
 common/constant/key.go           |  5 +++++
 config/config_loader.go          |  1 +
 config/consumer_config.go        |  1 +
 config/protocol_config.go        |  7 ++++---
 config/reference_config.go       |  9 +++++++++
 protocol/dubbo/dubbo_codec.go    |  1 +
 protocol/dubbo/dubbo_protocol.go |  3 +++
 protocol/dubbo/impl/codec.go     |  8 ++++++--
 protocol/dubbo/impl/package.go   |  1 +
 remoting/exchange.go             |  7 ++++---
 remoting/getty/getty_client.go   |  2 ++
 12 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/common/constant/default.go b/common/constant/default.go
index a6790f9..1ea9e1c 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -17,7 +17,10 @@
 
 package constant
 
-import "time"
+import (
+	"strconv"
+	"time"
+)
 
 const (
 	DUBBO             = "dubbo"
@@ -90,3 +93,11 @@ const (
 const (
 	SERVICE_DISCOVERY_DEFAULT_GROUP = "DEFAULT_GROUP"
 )
+
+const (
+	DefaultProtocolPayload = 8388608
+)
+
+var (
+	DefaultProtocolPayloadStr = strconv.Itoa(DefaultProtocolPayload)
+)
diff --git a/common/constant/key.go b/common/constant/key.go
index bd161ab..d8a66c1 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -372,3 +372,8 @@ const (
 	AfterAllServicesListenCompleteFunctionName    = "AfterAllServicesListenComplete"
 	BeforeShutdownFunctionName                    = "BeforeShutdown"
 )
+
+// protocol config
+const (
+	ProtocolPayload = "protocol.payload"
+)
diff --git a/config/config_loader.go b/config/config_loader.go
index d9288f9..741137e 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -139,6 +139,7 @@ func loadConsumerConfig() {
 
 	checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
 	for key, ref := range consumerConfig.References {
+		ref.Protocols = consumerConfig.Protocols
 		if ref.Generic {
 			genericService := NewGenericService(key)
 			SetConsumerService(genericService)
diff --git a/config/consumer_config.go b/config/consumer_config.go
index c50c2a7..de05224 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -59,6 +59,7 @@ type ConsumerConfig struct {
 	Check           *bool  `yaml:"check"  json:"check,omitempty" property:"check"`
 
 	References     map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
+	Protocols      map[string]*ProtocolConfig  `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
 	ProtocolConf   interface{}                 `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
 	FilterConf     interface{}                 `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
 	ShutdownConfig *ShutdownConfig             `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
diff --git a/config/protocol_config.go b/config/protocol_config.go
index cee5b7a..f83bece 100644
--- a/config/protocol_config.go
+++ b/config/protocol_config.go
@@ -27,9 +27,10 @@ import (
 
 // ProtocolConfig is protocol configuration
 type ProtocolConfig struct {
-	Name string `required:"true" yaml:"name"  json:"name,omitempty" property:"name"`
-	Ip   string `required:"true" yaml:"ip"  json:"ip,omitempty" property:"ip"`
-	Port string `required:"true" yaml:"port"  json:"port,omitempty" property:"port"`
+	Name    string `required:"true" yaml:"name"  json:"name,omitempty" property:"name"`
+	Ip      string `required:"true" yaml:"ip"  json:"ip,omitempty" property:"ip"`
+	Port    string `required:"true" yaml:"port"  json:"port,omitempty" property:"port"`
+	Payload int    `yaml:"payload" json:"payload,omitempty" property:"payload"`
 }
 
 // nolint
diff --git a/config/reference_config.go b/config/reference_config.go
index 97ac688..48198cf 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -66,6 +66,7 @@ type ReferenceConfig struct {
 	Sticky         bool   `yaml:"sticky"   json:"sticky,omitempty" property:"sticky"`
 	RequestTimeout string `yaml:"timeout"  json:"timeout,omitempty" property:"timeout"`
 	ForceTag       bool   `yaml:"force.tag"  json:"force.tag,omitempty" property:"force.tag"`
+	Protocols      map[string]*ProtocolConfig
 }
 
 // nolint
@@ -103,6 +104,14 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
 	}
 	c.loadProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
 	c.postProcessConfig(cfgURL)
+	protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
+	payload := constant.DefaultProtocolPayload
+	if len(protocolConfigs) > 0 {
+		if pl := protocolConfigs[0].Payload; pl > 0 {
+			payload = pl
+		}
+	}
+	cfgURL.AddParam(constant.ProtocolPayload, strconv.Itoa(payload))
 	if c.URL != "" {
 		// 1. user specified URL, could be peer-to-peer address, or register center's address.
 		urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 21376c3..0353424 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -96,6 +96,7 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
 		Body:    impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()),
 		Err:     nil,
 		Codec:   impl.NewDubboCodec(nil),
+		Payload: request.Payload,
 	}
 
 	if err := impl.LoadSerializer(pkg); err != nil {
diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go
index 94afbb1..585c003 100644
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@ -20,6 +20,7 @@ package dubbo
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"sync"
 	"time"
 )
@@ -198,9 +199,11 @@ func getExchangeClient(url *common.URL) *remoting.ExchangeClient {
 				return
 			}
 			// new ExchangeClient
+			payload, _ := strconv.Atoi(url.GetParam(constant.ProtocolPayload, constant.DefaultProtocolPayloadStr))
 			exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
 				ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
 				RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+				Payload:        payload,
 			}), config.GetConsumerConfig().ConnectTimeout, false)
 			// input store
 			if exchangeClientTmp != nil {
diff --git a/protocol/dubbo/impl/codec.go b/protocol/dubbo/impl/codec.go
index 6c9816f..1b3cb36 100644
--- a/protocol/dubbo/impl/codec.go
+++ b/protocol/dubbo/impl/codec.go
@@ -219,6 +219,10 @@ func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
 	//////////////////////////////////////////
 	// body
 	//////////////////////////////////////////
+	payload := DEFAULT_LEN
+	if p.Payload > 0 {
+		payload = p.Payload
+	}
 	if p.IsHeartBeat() {
 		byteArray = append(byteArray, byte('N'))
 		pkgLen = 1
@@ -228,8 +232,8 @@ func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
 			return nil, err
 		}
 		pkgLen = len(body)
-		if pkgLen > int(DEFAULT_LEN) { // 8M
-			return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
+		if pkgLen > payload { // 8M
+			return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, payload)
 		}
 		byteArray = append(byteArray, body...)
 	}
diff --git a/protocol/dubbo/impl/package.go b/protocol/dubbo/impl/package.go
index 6f6d2ea..fe329c9 100644
--- a/protocol/dubbo/impl/package.go
+++ b/protocol/dubbo/impl/package.go
@@ -65,6 +65,7 @@ type DubboPackage struct {
 	Body    interface{}
 	Err     error
 	Codec   *ProtocolCodec
+	Payload int
 }
 
 func (p DubboPackage) String() string {
diff --git a/remoting/exchange.go b/remoting/exchange.go
index ad136a7..8d59ff0 100644
--- a/remoting/exchange.go
+++ b/remoting/exchange.go
@@ -59,9 +59,10 @@ type Request struct {
 	// serial ID (ignore)
 	SerialID byte
 	// Data
-	Data   interface{}
-	TwoWay bool
-	Event  bool
+	Data    interface{}
+	TwoWay  bool
+	Event   bool
+	Payload int
 }
 
 // NewRequest aims to create Request.
diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go
index 7082773..145e1a9 100644
--- a/remoting/getty/getty_client.go
+++ b/remoting/getty/getty_client.go
@@ -119,6 +119,7 @@ type Options struct {
 	ConnectTimeout time.Duration
 	// request timeout
 	RequestTimeout time.Duration
+	Payload        int
 }
 
 // Client : some configuration for network communication.
@@ -257,6 +258,7 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err
 }
 
 func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) (int, int, error) {
+	request.Payload = c.opts.Payload
 	totalLen, sendLen, err := session.WritePkg(request, timeout)
 	return totalLen, sendLen, perrors.WithStack(err)
 }