You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/19 12:56:24 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-764]Fix Go SDK RPC Request bug (#564)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 53e6d0a [INLONG-764]Fix Go SDK RPC Request bug (#564)
53e6d0a is described below
commit 53e6d0adbd7b203e371d173989835076939da515
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon Jul 19 20:56:16 2021 +0800
[INLONG-764]Fix Go SDK RPC Request bug (#564)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/rpc/broker.go | 30 ++++------------------
tubemq-client-twins/tubemq-client-go/rpc/client.go | 2 +-
tubemq-client-twins/tubemq-client-go/rpc/master.go | 18 +++----------
3 files changed, 9 insertions(+), 41 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index 5105fd5..face295 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -71,10 +71,6 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
if offset != util.InvalidValue {
reqC2B.CurrOffset = proto.Int64(offset)
}
- data, err := proto.Marshal(reqC2B)
- if err != nil {
- return nil, errs.New(errs.RetMarshalFailure, err.Error())
- }
req := codec.NewRPCRequest()
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
@@ -85,9 +81,9 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
}
req.RequestBody = &protocol.RequestBody{
Method: proto.Int32(brokerConsumerRegister),
- Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
+ req.Body = reqC2B
rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
@@ -117,19 +113,15 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata *metadata
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
- data, err := proto.Marshal(reqC2B)
- if err != nil {
- return nil, errs.New(errs.RetMarshalFailure, err.Error())
- }
req.RequestHeader = &protocol.RequestHeader{
ServiceType: proto.Int32(brokerReadService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
Method: proto.Int32(brokerConsumerRegister),
- Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
+ req.Body = reqC2B
rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
@@ -159,19 +151,15 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
- data, err := proto.Marshal(reqC2B)
- if err != nil {
- return nil, errs.New(errs.RetMarshalFailure, err.Error())
- }
req.RequestHeader = &protocol.RequestHeader{
ServiceType: proto.Int32(brokerReadService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
Method: proto.Int32(brokerConsumerGetMsg),
- Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
+ req.Body = reqC2B
rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
@@ -199,19 +187,15 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(10),
}
- data, err := proto.Marshal(reqC2B)
- if err != nil {
- return nil, errs.New(errs.RetMarshalFailure, err.Error())
- }
req.RequestHeader = &protocol.RequestHeader{
ServiceType: proto.Int32(brokerReadService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
Method: proto.Int32(brokerConsumerCommit),
- Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
+ req.Body = reqC2B
rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
@@ -240,10 +224,6 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.
for _, partition := range partitions {
reqC2B.PartitionInfo = append(reqC2B.PartitionInfo, partition.String())
}
- data, err := proto.Marshal(reqC2B)
- if err != nil {
- return nil, errs.New(errs.RetMarshalFailure, err.Error())
- }
req := codec.NewRPCRequest()
req.RequestHeader = &protocol.RequestHeader{
ServiceType: proto.Int32(brokerReadService),
@@ -251,12 +231,12 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.
}
req.RequestBody = &protocol.RequestBody{
Method: proto.Int32(brokerConsumerHeartbeat),
- Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
+ req.Body = reqC2B
rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 64bba71..ae0b469 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -33,8 +33,8 @@ import (
)
const (
+ masterService = 1
brokerReadService = 2
- masterService = 4
)
// RPCClient is the rpc level client to interact with TubeMQ.
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index c0d4bc3..726c8db 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -75,10 +75,6 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
AuthInfo: &protocol.AuthenticateInfo{},
}
- data, err := proto.Marshal(reqC2M)
- if err != nil {
- return nil, errs.New(errs.RetMarshalFailure, err.Error())
- }
req := codec.NewRPCRequest()
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
@@ -89,9 +85,9 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
}
req.RequestBody = &protocol.RequestBody{
Method: proto.Int32(masterConsumerRegister),
- Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
+ req.Body = reqC2M
rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
@@ -142,10 +138,6 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
reqC2M.AuthInfo = &protocol.MasterCertificateInfo{
AuthInfo: &protocol.AuthenticateInfo{},
}
- data, err := proto.Marshal(reqC2M)
- if err != nil {
- return nil, errs.New(errs.RetMarshalFailure, err.Error())
- }
req := codec.NewRPCRequest()
req.RequestHeader = &protocol.RequestHeader{
ServiceType: proto.Int32(masterService),
@@ -153,12 +145,12 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
}
req.RequestBody = &protocol.RequestBody{
Method: proto.Int32(masterConsumerHeartbeat),
- Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
+ req.Body = reqC2M
rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
@@ -180,10 +172,6 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Meta
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
AuthInfo: sub.GetMasterCertificateIInfo(),
}
- data, err := proto.Marshal(reqC2M)
- if err != nil {
- return nil, errs.New(errs.RetMarshalFailure, err.Error())
- }
req := codec.NewRPCRequest()
req.RequestHeader = &protocol.RequestHeader{
ServiceType: proto.Int32(masterService),
@@ -191,12 +179,12 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Meta
}
req.RequestBody = &protocol.RequestBody{
Method: proto.Int32(masterConsumerClose),
- Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
+ req.Body = reqC2M
rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {