You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/20 12:31:37 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-766]Fix Go SDK Codec Bug

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 9b2a943  [INLONG-766]Fix Go SDK Codec Bug
9b2a943 is described below

commit 9b2a943d899fb4764744020d8c66ac9fe8f31dbb
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Jul 20 15:24:28 2021 +0800

    [INLONG-766]Fix Go SDK Codec Bug
---
 tubemq-client-twins/tubemq-client-go/client/consumer_impl.go | 2 +-
 tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go   | 6 +++---
 tubemq-client-twins/tubemq-client-go/rpc/client.go           | 3 ++-
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index f01de93..a744911 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -85,7 +85,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
 		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
 		opts.TLSServerName = config.Net.TLS.TLSServerName
 	}
-	client := rpc.New(pool, opts)
+	client := rpc.New(pool, opts, config)
 	r := remote.NewRmtDataCache()
 	r.SetConsumerInfo(clientID, config.Consumer.Group)
 	c := &consumer{
diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
index f805e08..db114b3 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -99,8 +99,8 @@ func (t *TubeMQDecoder) Decode() (Response, error) {
 		s := int(binary.BigEndian.Uint32(size))
 		if totalLen+s > len(t.msg) {
 			data := t.msg[:totalLen]
-			t.msg = make([]byte, 0, int(math.Max(float64(2*len(t.msg)), float64(totalLen+s))))
-			copy(t.msg, data[:])
+			t.msg = make([]byte, int(math.Max(float64(2*len(t.msg)), float64(totalLen+s))))
+			copy(t.msg, data)
 		}
 
 		if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil {
@@ -176,7 +176,7 @@ func (t *TubeMQRPCRequest) Marshal() ([]byte, error) {
 		return nil, err
 	}
 
-	contentLen := int(dataLen) + int(dataLen) + int(dataLen) + len(data)
+	contentLen := len(data)
 	listSize := calcBlockCount(contentLen)
 
 	buf := bytes.NewBuffer(make([]byte, 0, int(frameHeadLen)+listSize*(RPCMaxBufferSize)))
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index ae0b469..52dae2f 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -60,10 +60,11 @@ type RPCClient interface {
 }
 
 // New returns a default TubeMQ rpc Client
-func New(pool *multiplexing.Pool, opts *transport.Options) RPCClient {
+func New(pool *multiplexing.Pool, opts *transport.Options, config *config.Config) RPCClient {
 	return &rpcClient{
 		pool:   pool,
 		client: transport.New(opts, pool),
+		config: config,
 	}
 }