You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/20 12:31:37 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-766]Fix Go SDK
Codec Bug
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 9b2a943 [INLONG-766]Fix Go SDK Codec Bug
9b2a943 is described below
commit 9b2a943d899fb4764744020d8c66ac9fe8f31dbb
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Jul 20 15:24:28 2021 +0800
[INLONG-766]Fix Go SDK Codec Bug
---
tubemq-client-twins/tubemq-client-go/client/consumer_impl.go | 2 +-
tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go | 6 +++---
tubemq-client-twins/tubemq-client-go/rpc/client.go | 3 ++-
3 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index f01de93..a744911 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -85,7 +85,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
opts.TLSServerName = config.Net.TLS.TLSServerName
}
- client := rpc.New(pool, opts)
+ client := rpc.New(pool, opts, config)
r := remote.NewRmtDataCache()
r.SetConsumerInfo(clientID, config.Consumer.Group)
c := &consumer{
diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
index f805e08..db114b3 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -99,8 +99,8 @@ func (t *TubeMQDecoder) Decode() (Response, error) {
s := int(binary.BigEndian.Uint32(size))
if totalLen+s > len(t.msg) {
data := t.msg[:totalLen]
- t.msg = make([]byte, 0, int(math.Max(float64(2*len(t.msg)), float64(totalLen+s))))
- copy(t.msg, data[:])
+ t.msg = make([]byte, int(math.Max(float64(2*len(t.msg)), float64(totalLen+s))))
+ copy(t.msg, data)
}
if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil {
@@ -176,7 +176,7 @@ func (t *TubeMQRPCRequest) Marshal() ([]byte, error) {
return nil, err
}
- contentLen := int(dataLen) + int(dataLen) + int(dataLen) + len(data)
+ contentLen := len(data)
listSize := calcBlockCount(contentLen)
buf := bytes.NewBuffer(make([]byte, 0, int(frameHeadLen)+listSize*(RPCMaxBufferSize)))
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index ae0b469..52dae2f 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -60,10 +60,11 @@ type RPCClient interface {
}
// New returns a default TubeMQ rpc Client
-func New(pool *multiplexing.Pool, opts *transport.Options) RPCClient {
+func New(pool *multiplexing.Pool, opts *transport.Options, config *config.Config) RPCClient {
return &rpcClient{
pool: pool,
client: transport.New(opts, pool),
+ config: config,
}
}