You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/10/22 14:54:17 UTC
[dubbo-go] branch 1.5 updated: Fix: 1526 (#1527)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/1.5 by this push:
new 22ec982 Fix: 1526 (#1527)
22ec982 is described below
commit 22ec982bc6240ff8c5aa97d3d634f4473fa2173a
Author: ChangedenChan <ch...@gmail.com>
AuthorDate: Fri Oct 22 22:54:10 2021 +0800
Fix: 1526 (#1527)
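* 支持配置protocol.payload (support configuring protocol.payload)
* dubbo consumer支持配置protocol.payload (dubbo consumer supports configuring protocol.payload)
* 格式化代码 (format code)
* 格式化代码 (format code)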
* rename ProtocolConfig.Ip to ProtocolConfig.IP
* rollback rename ProtocolConfig.Ip
---
common/constant/default.go | 13 ++++++++++++-
common/constant/key.go | 5 +++++
config/config_loader.go | 1 +
config/consumer_config.go | 1 +
config/protocol_config.go | 7 ++++---
config/reference_config.go | 9 +++++++++
protocol/dubbo/dubbo_codec.go | 1 +
protocol/dubbo/dubbo_protocol.go | 3 +++
protocol/dubbo/impl/codec.go | 8 ++++++--
protocol/dubbo/impl/package.go | 1 +
remoting/exchange.go | 7 ++++---
remoting/getty/getty_client.go | 2 ++
12 files changed, 49 insertions(+), 9 deletions(-)
diff --git a/common/constant/default.go b/common/constant/default.go
index a6790f9..1ea9e1c 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -17,7 +17,10 @@
package constant
-import "time"
+import (
+ "strconv"
+ "time"
+)
const (
DUBBO = "dubbo"
@@ -90,3 +93,11 @@ const (
const (
SERVICE_DISCOVERY_DEFAULT_GROUP = "DEFAULT_GROUP"
)
+
+const (
+ DefaultProtocolPayload = 8388608
+)
+
+var (
+ DefaultProtocolPayloadStr = strconv.Itoa(DefaultProtocolPayload)
+)
diff --git a/common/constant/key.go b/common/constant/key.go
index bd161ab..d8a66c1 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -372,3 +372,8 @@ const (
AfterAllServicesListenCompleteFunctionName = "AfterAllServicesListenComplete"
BeforeShutdownFunctionName = "BeforeShutdown"
)
+
+// protocol config
+const (
+ ProtocolPayload = "protocol.payload"
+)
diff --git a/config/config_loader.go b/config/config_loader.go
index d9288f9..741137e 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -139,6 +139,7 @@ func loadConsumerConfig() {
checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
for key, ref := range consumerConfig.References {
+ ref.Protocols = consumerConfig.Protocols
if ref.Generic {
genericService := NewGenericService(key)
SetConsumerService(genericService)
diff --git a/config/consumer_config.go b/config/consumer_config.go
index c50c2a7..de05224 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -59,6 +59,7 @@ type ConsumerConfig struct {
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
+ Protocols map[string]*ProtocolConfig `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
diff --git a/config/protocol_config.go b/config/protocol_config.go
index cee5b7a..f83bece 100644
--- a/config/protocol_config.go
+++ b/config/protocol_config.go
@@ -27,9 +27,10 @@ import (
// ProtocolConfig is protocol configuration
type ProtocolConfig struct {
- Name string `required:"true" yaml:"name" json:"name,omitempty" property:"name"`
- Ip string `required:"true" yaml:"ip" json:"ip,omitempty" property:"ip"`
- Port string `required:"true" yaml:"port" json:"port,omitempty" property:"port"`
+ Name string `required:"true" yaml:"name" json:"name,omitempty" property:"name"`
+ Ip string `required:"true" yaml:"ip" json:"ip,omitempty" property:"ip"`
+ Port string `required:"true" yaml:"port" json:"port,omitempty" property:"port"`
+ Payload int `yaml:"payload" json:"payload,omitempty" property:"payload"`
}
// nolint
diff --git a/config/reference_config.go b/config/reference_config.go
index 97ac688..48198cf 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -66,6 +66,7 @@ type ReferenceConfig struct {
Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"`
RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"`
+ Protocols map[string]*ProtocolConfig
}
// nolint
@@ -103,6 +104,14 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
}
c.loadProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
c.postProcessConfig(cfgURL)
+ protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
+ payload := constant.DefaultProtocolPayload
+ if len(protocolConfigs) > 0 {
+ if pl := protocolConfigs[0].Payload; pl > 0 {
+ payload = pl
+ }
+ }
+ cfgURL.AddParam(constant.ProtocolPayload, strconv.Itoa(payload))
if c.URL != "" {
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 21376c3..0353424 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -96,6 +96,7 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
Body: impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()),
Err: nil,
Codec: impl.NewDubboCodec(nil),
+ Payload: request.Payload,
}
if err := impl.LoadSerializer(pkg); err != nil {
diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go
index 94afbb1..585c003 100644
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@ -20,6 +20,7 @@ package dubbo
import (
"context"
"fmt"
+ "strconv"
"sync"
"time"
)
@@ -198,9 +199,11 @@ func getExchangeClient(url *common.URL) *remoting.ExchangeClient {
return
}
// new ExchangeClient
+ payload, _ := strconv.Atoi(url.GetParam(constant.ProtocolPayload, constant.DefaultProtocolPayloadStr))
exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+ Payload: payload,
}), config.GetConsumerConfig().ConnectTimeout, false)
// input store
if exchangeClientTmp != nil {
diff --git a/protocol/dubbo/impl/codec.go b/protocol/dubbo/impl/codec.go
index 6c9816f..1b3cb36 100644
--- a/protocol/dubbo/impl/codec.go
+++ b/protocol/dubbo/impl/codec.go
@@ -219,6 +219,10 @@ func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
//////////////////////////////////////////
// body
//////////////////////////////////////////
+ payload := DEFAULT_LEN
+ if p.Payload > 0 {
+ payload = p.Payload
+ }
if p.IsHeartBeat() {
byteArray = append(byteArray, byte('N'))
pkgLen = 1
@@ -228,8 +232,8 @@ func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
return nil, err
}
pkgLen = len(body)
- if pkgLen > int(DEFAULT_LEN) { // 8M
- return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
+ if pkgLen > payload { // 8M
+ return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, payload)
}
byteArray = append(byteArray, body...)
}
diff --git a/protocol/dubbo/impl/package.go b/protocol/dubbo/impl/package.go
index 6f6d2ea..fe329c9 100644
--- a/protocol/dubbo/impl/package.go
+++ b/protocol/dubbo/impl/package.go
@@ -65,6 +65,7 @@ type DubboPackage struct {
Body interface{}
Err error
Codec *ProtocolCodec
+ Payload int
}
func (p DubboPackage) String() string {
diff --git a/remoting/exchange.go b/remoting/exchange.go
index ad136a7..8d59ff0 100644
--- a/remoting/exchange.go
+++ b/remoting/exchange.go
@@ -59,9 +59,10 @@ type Request struct {
// serial ID (ignore)
SerialID byte
// Data
- Data interface{}
- TwoWay bool
- Event bool
+ Data interface{}
+ TwoWay bool
+ Event bool
+ Payload int
}
// NewRequest aims to create Request.
diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go
index 7082773..145e1a9 100644
--- a/remoting/getty/getty_client.go
+++ b/remoting/getty/getty_client.go
@@ -119,6 +119,7 @@ type Options struct {
ConnectTimeout time.Duration
// request timeout
RequestTimeout time.Duration
+ Payload int
}
// Client : some configuration for network communication.
@@ -257,6 +258,7 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err
}
func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) (int, int, error) {
+ request.Payload = c.opts.Payload
totalLen, sendLen, err := session.WritePkg(request, timeout)
return totalLen, sendLen, perrors.WithStack(err)
}