You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/19 12:56:24 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-764]Fix Go SDK RPC Request bug (#564)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 53e6d0a  [INLONG-764]Fix Go SDK RPC Request bug (#564)
53e6d0a is described below

commit 53e6d0adbd7b203e371d173989835076939da515
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon Jul 19 20:56:16 2021 +0800

    [INLONG-764]Fix Go SDK RPC Request bug (#564)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/rpc/broker.go | 30 ++++------------------
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  2 +-
 tubemq-client-twins/tubemq-client-go/rpc/master.go | 18 +++----------
 3 files changed, 9 insertions(+), 41 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index 5105fd5..face295 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -71,10 +71,6 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
 	if offset != util.InvalidValue {
 		reqC2B.CurrOffset = proto.Int64(offset)
 	}
-	data, err := proto.Marshal(reqC2B)
-	if err != nil {
-		return nil, errs.New(errs.RetMarshalFailure, err.Error())
-	}
 	req := codec.NewRPCRequest()
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
@@ -85,9 +81,9 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
 	}
 	req.RequestBody = &protocol.RequestBody{
 		Method:  proto.Int32(brokerConsumerRegister),
-		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
+	req.Body = reqC2B
 
 	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
@@ -117,19 +113,15 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata *metadata
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
-	data, err := proto.Marshal(reqC2B)
-	if err != nil {
-		return nil, errs.New(errs.RetMarshalFailure, err.Error())
-	}
 	req.RequestHeader = &protocol.RequestHeader{
 		ServiceType: proto.Int32(brokerReadService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
 		Method:  proto.Int32(brokerConsumerRegister),
-		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
+	req.Body = reqC2B
 
 	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
@@ -159,19 +151,15 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
-	data, err := proto.Marshal(reqC2B)
-	if err != nil {
-		return nil, errs.New(errs.RetMarshalFailure, err.Error())
-	}
 	req.RequestHeader = &protocol.RequestHeader{
 		ServiceType: proto.Int32(brokerReadService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
 		Method:  proto.Int32(brokerConsumerGetMsg),
-		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
+	req.Body = reqC2B
 
 	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
@@ -199,19 +187,15 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(10),
 	}
-	data, err := proto.Marshal(reqC2B)
-	if err != nil {
-		return nil, errs.New(errs.RetMarshalFailure, err.Error())
-	}
 	req.RequestHeader = &protocol.RequestHeader{
 		ServiceType: proto.Int32(brokerReadService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
 		Method:  proto.Int32(brokerConsumerCommit),
-		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
+	req.Body = reqC2B
 
 	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
@@ -240,10 +224,6 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.
 	for _, partition := range partitions {
 		reqC2B.PartitionInfo = append(reqC2B.PartitionInfo, partition.String())
 	}
-	data, err := proto.Marshal(reqC2B)
-	if err != nil {
-		return nil, errs.New(errs.RetMarshalFailure, err.Error())
-	}
 	req := codec.NewRPCRequest()
 	req.RequestHeader = &protocol.RequestHeader{
 		ServiceType: proto.Int32(brokerReadService),
@@ -251,12 +231,12 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.
 	}
 	req.RequestBody = &protocol.RequestBody{
 		Method:  proto.Int32(brokerConsumerHeartbeat),
-		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
+	req.Body = reqC2B
 
 	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 64bba71..ae0b469 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -33,8 +33,8 @@ import (
 )
 
 const (
+	masterService     = 1
 	brokerReadService = 2
-	masterService     = 4
 )
 
 // RPCClient is the rpc level client to interact with TubeMQ.
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index c0d4bc3..726c8db 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -75,10 +75,6 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
 		AuthInfo: &protocol.AuthenticateInfo{},
 	}
 
-	data, err := proto.Marshal(reqC2M)
-	if err != nil {
-		return nil, errs.New(errs.RetMarshalFailure, err.Error())
-	}
 	req := codec.NewRPCRequest()
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
@@ -89,9 +85,9 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
 	}
 	req.RequestBody = &protocol.RequestBody{
 		Method:  proto.Int32(masterConsumerRegister),
-		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
+	req.Body = reqC2M
 
 	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
@@ -142,10 +138,6 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
 	reqC2M.AuthInfo = &protocol.MasterCertificateInfo{
 		AuthInfo: &protocol.AuthenticateInfo{},
 	}
-	data, err := proto.Marshal(reqC2M)
-	if err != nil {
-		return nil, errs.New(errs.RetMarshalFailure, err.Error())
-	}
 	req := codec.NewRPCRequest()
 	req.RequestHeader = &protocol.RequestHeader{
 		ServiceType: proto.Int32(masterService),
@@ -153,12 +145,12 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
 	}
 	req.RequestBody = &protocol.RequestBody{
 		Method:  proto.Int32(masterConsumerHeartbeat),
-		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
+	req.Body = reqC2M
 
 	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
@@ -180,10 +172,6 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Meta
 		GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		AuthInfo:  sub.GetMasterCertificateIInfo(),
 	}
-	data, err := proto.Marshal(reqC2M)
-	if err != nil {
-		return nil, errs.New(errs.RetMarshalFailure, err.Error())
-	}
 	req := codec.NewRPCRequest()
 	req.RequestHeader = &protocol.RequestHeader{
 		ServiceType: proto.Int32(masterService),
@@ -191,12 +179,12 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Meta
 	}
 	req.RequestBody = &protocol.RequestBody{
 		Method:  proto.Int32(masterConsumerClose),
-		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
+	req.Body = reqC2M
 
 	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {